feoxdb/core/
cache.rs

1use crate::constants::*;
2use crate::stats::Statistics;
3use crate::utils::hash::murmur3_32;
4use bytes::Bytes;
5use parking_lot::{Mutex, RwLock};
6use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
7use std::sync::Arc;
8
/// CLOCK algorithm cache implementation
/// Uses reference bits and circular scanning for eviction
pub struct ClockCache {
    /// Cache entries organized in buckets for better locality.
    /// The bucket index is derived from a murmur3 hash of the key,
    /// reduced modulo `CACHE_BUCKETS`.
    buckets: Vec<RwLock<Vec<CacheEntry>>>,

    /// Global CLOCK hand position for eviction scanning.
    /// Monotonically incremented; taken modulo the bucket count on use,
    /// so successive evictions resume where the previous sweep stopped.
    clock_hand: AtomicUsize,

    /// High watermark for triggering eviction (bytes)
    high_watermark: AtomicUsize,

    /// Low watermark to evict down to (bytes)
    low_watermark: AtomicUsize,

    /// Lock for eviction process. Acquired with `try_lock` so at most one
    /// thread runs the sweep while concurrent callers skip eviction.
    eviction_lock: Mutex<()>,

    /// Shared statistics; this cache maintains `cache_memory` accounting
    /// and records evictions through it.
    stats: Arc<Statistics>,
}
30
/// A single cached key/value pair.
///
/// The atomics are wrapped in `Arc` because `AtomicBool`/`AtomicU32` are
/// not `Clone`, which the derived `Clone` requires; as a consequence,
/// clones of an entry share the same reference bit and access counter.
#[derive(Clone)]
struct CacheEntry {
    // Exact key bytes; looked up by equality comparison within a bucket.
    key: Vec<u8>,
    value: Bytes,

    /// Reference bit for CLOCK algorithm (accessed recently).
    /// Set on every hit and on insert/update; cleared by the eviction
    /// sweep to grant a second chance.
    reference_bit: Arc<AtomicBool>,

    /// Size of this entry in bytes (key + value + struct overhead),
    /// as accounted against the shared cache-memory statistic.
    size: usize,

    /// Access count for statistics
    access_count: Arc<AtomicU32>,
}
45
46impl ClockCache {
47    pub fn new(stats: Arc<Statistics>) -> Self {
48        let buckets = (0..CACHE_BUCKETS)
49            .map(|_| RwLock::new(Vec::new()))
50            .collect();
51
52        Self {
53            buckets,
54            clock_hand: AtomicUsize::new(0),
55            high_watermark: AtomicUsize::new(CACHE_HIGH_WATERMARK_MB * MB),
56            low_watermark: AtomicUsize::new(CACHE_LOW_WATERMARK_MB * MB),
57            eviction_lock: Mutex::new(()),
58            stats,
59        }
60    }
61
62    /// Get value from cache, setting reference bit on access
63    pub fn get(&self, key: &[u8]) -> Option<Bytes> {
64        let hash = murmur3_32(key, 0);
65        let bucket_idx = (hash as usize) % CACHE_BUCKETS;
66
67        let bucket = self.buckets[bucket_idx].read();
68
69        for entry in bucket.iter() {
70            if entry.key == key {
71                // Set reference bit on access (CLOCK algorithm)
72                entry.reference_bit.store(true, Ordering::Release);
73                entry.access_count.fetch_add(1, Ordering::Relaxed);
74                return Some(entry.value.clone());
75            }
76        }
77
78        None
79    }
80
81    /// Insert value into cache, triggering eviction if needed
82    pub fn insert(&self, key: Vec<u8>, value: Bytes) {
83        let size = key.len() + value.len() + std::mem::size_of::<CacheEntry>();
84
85        // Don't cache very large values
86        let high_watermark = self.high_watermark.load(Ordering::Acquire);
87        if size > high_watermark / 4 {
88            return;
89        }
90
91        // Check if we need to evict before inserting
92        let current_usage = self.stats.cache_memory.load(Ordering::Acquire);
93        let high_watermark = self.high_watermark.load(Ordering::Acquire);
94        if current_usage + size > high_watermark {
95            self.evict_entries();
96        }
97
98        let hash = murmur3_32(&key, 0);
99        let bucket_idx = (hash as usize) % CACHE_BUCKETS;
100
101        let mut bucket = self.buckets[bucket_idx].write();
102
103        // Check if key already exists and update
104        for entry in bucket.iter_mut() {
105            if entry.key == key {
106                let old_size = entry.size;
107                entry.value = value;
108                entry.size = size;
109                entry.reference_bit.store(true, Ordering::Release);
110
111                // Update memory usage
112                if size > old_size {
113                    self.stats
114                        .cache_memory
115                        .fetch_add(size - old_size, Ordering::AcqRel);
116                } else {
117                    self.stats
118                        .cache_memory
119                        .fetch_sub(old_size - size, Ordering::AcqRel);
120                }
121                return;
122            }
123        }
124
125        // Add new entry
126        let entry = CacheEntry {
127            key,
128            value,
129            reference_bit: Arc::new(AtomicBool::new(true)),
130            size,
131            access_count: Arc::new(AtomicU32::new(1)),
132        };
133
134        bucket.push(entry);
135        self.stats.cache_memory.fetch_add(size, Ordering::AcqRel);
136    }
137
138    /// Remove specific key from cache
139    pub fn remove(&self, key: &[u8]) {
140        let hash = murmur3_32(key, 0);
141        let bucket_idx = (hash as usize) % CACHE_BUCKETS;
142
143        let mut bucket = self.buckets[bucket_idx].write();
144
145        if let Some(pos) = bucket.iter().position(|e| e.key == key) {
146            let entry = bucket.remove(pos);
147            self.stats
148                .cache_memory
149                .fetch_sub(entry.size, Ordering::AcqRel);
150        }
151    }
152
153    /// CLOCK algorithm eviction - scan entries circularly, evicting those without reference bit
154    pub fn evict_entries(&self) {
155        // Try to acquire eviction lock, return if already evicting
156        let _lock = match self.eviction_lock.try_lock() {
157            Some(lock) => lock,
158            None => return,
159        };
160
161        let target_usage = self.low_watermark.load(Ordering::Acquire);
162        let mut current_usage = self.stats.cache_memory.load(Ordering::Acquire);
163
164        if current_usage <= target_usage {
165            return;
166        }
167
168        let mut evicted_bytes = 0;
169        let mut scans = 0;
170        const MAX_SCANS: usize = 3; // Maximum passes through cache
171
172        while current_usage > target_usage && scans < MAX_SCANS {
173            let mut entries_checked = 0;
174            let mut bucket_count = 0;
175            for bucket in &self.buckets {
176                bucket_count += bucket.read().len();
177            }
178            let total_entries = bucket_count;
179
180            // Scan through buckets using CLOCK hand
181            while entries_checked < total_entries && current_usage > target_usage {
182                let hand = self.clock_hand.fetch_add(1, Ordering::AcqRel) % CACHE_BUCKETS;
183
184                let mut bucket = self.buckets[hand].write();
185                let mut i = 0;
186
187                while i < bucket.len() {
188                    let entry = &bucket[i];
189
190                    // Check reference bit
191                    if entry.reference_bit.load(Ordering::Acquire) {
192                        // Clear reference bit and give second chance with barrier
193                        entry.reference_bit.store(false, Ordering::Release);
194                        std::sync::atomic::fence(Ordering::Release);
195                        i += 1;
196                    } else {
197                        // No reference bit - evict this entry
198                        let removed = bucket.remove(i);
199                        evicted_bytes += removed.size;
200                        self.stats
201                            .cache_memory
202                            .fetch_sub(removed.size, Ordering::AcqRel);
203                        self.stats.record_eviction(1);
204                        current_usage -= removed.size;
205                        // Don't increment i since we removed an element
206                    }
207
208                    entries_checked += 1;
209
210                    if current_usage <= target_usage {
211                        break;
212                    }
213                }
214            }
215
216            scans += 1;
217        }
218
219        if evicted_bytes > 0 {
220            // Log eviction statistics
221            #[cfg(debug_assertions)]
222            eprintln!(
223                "Cache eviction: freed {} bytes in {} scans, usage now: {} bytes",
224                evicted_bytes, scans, current_usage
225            );
226        }
227    }
228
229    /// Clear all cache entries
230    pub fn clear(&self) {
231        for bucket in &self.buckets {
232            bucket.write().clear();
233        }
234
235        self.stats.cache_memory.store(0, Ordering::Release);
236        self.clock_hand.store(0, Ordering::Release);
237    }
238
239    /// Get current cache statistics
240    pub fn stats(&self) -> CacheStats {
241        CacheStats {
242            entries: 0, // Calculate from buckets if needed
243            memory_usage: self.stats.cache_memory.load(Ordering::Acquire),
244            high_watermark: self.high_watermark.load(Ordering::Acquire),
245            low_watermark: self.low_watermark.load(Ordering::Acquire),
246        }
247    }
248
249    /// Adjust cache watermarks dynamically
250    pub fn adjust_watermarks(&self, high_mb: usize, low_mb: usize) {
251        let high = high_mb * MB;
252        let low = low_mb * MB;
253
254        if high > low && high <= CACHE_MAX_SIZE {
255            // Max 1GB for cache
256            // Update watermarks atomically
257            self.high_watermark.store(high, Ordering::Release);
258            self.low_watermark.store(low, Ordering::Release);
259
260            // Trigger eviction if we're over the new high watermark
261            let current_usage = self.stats.cache_memory.load(Ordering::Acquire);
262            if current_usage > high {
263                self.evict_entries();
264            }
265        }
266    }
267}
268
/// Snapshot of cache state as returned by `ClockCache::stats`.
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Number of cached entries.
    pub entries: u32,
    /// Bytes currently accounted to cached entries.
    pub memory_usage: usize,
    /// Eviction trigger threshold, in bytes.
    pub high_watermark: usize,
    /// Eviction target (sweep stops below this), in bytes.
    pub low_watermark: usize,
}