feoxdb/core/
cache.rs

use crate::constants::*;
use crate::stats::Statistics;
use crate::utils::hash::murmur3_32;
use bytes::Bytes;
use parking_lot::{Mutex, RwLock};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;

/// CLOCK algorithm cache implementation
/// Uses reference bits and circular scanning for eviction
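///
/// # Example
///
/// A minimal usage sketch, marked `ignore` because it depends on how this
/// crate constructs `Statistics` (`Statistics::new()` below is an assumption
/// for illustration only):
///
/// ```ignore
/// use std::sync::Arc;
/// use bytes::Bytes;
///
/// let stats = Arc::new(Statistics::new()); // assumed constructor
/// let cache = ClockCache::new(stats);
///
/// cache.insert(b"key".to_vec(), Bytes::from_static(b"value"));
/// assert_eq!(cache.get(b"key"), Some(Bytes::from_static(b"value")));
///
/// cache.remove(b"key");
/// assert!(cache.get(b"key").is_none());
/// ```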
pub struct ClockCache {
    /// Cache entries organized in buckets for better locality
    buckets: Vec<RwLock<Vec<CacheEntry>>>,

    /// Global CLOCK hand position for eviction scanning
    clock_hand: AtomicUsize,

    /// High watermark for triggering eviction (bytes)
    high_watermark: AtomicUsize,

    /// Low watermark to evict down to (bytes)
    low_watermark: AtomicUsize,

    /// Lock for eviction process
    eviction_lock: Mutex<()>,

    /// Shared statistics
    stats: Arc<Statistics>,
}

#[derive(Clone)]
struct CacheEntry {
    key: Vec<u8>,
    value: Bytes,

    /// Reference bit for CLOCK algorithm (accessed recently)
    reference_bit: Arc<AtomicBool>,

    /// Size of this entry in bytes
    size: usize,

    /// Access count for statistics
    access_count: Arc<AtomicU32>,
}

impl ClockCache {
    pub fn new(stats: Arc<Statistics>) -> Self {
        let buckets = (0..CACHE_BUCKETS)
            .map(|_| RwLock::new(Vec::new()))
            .collect();

        Self {
            buckets,
            clock_hand: AtomicUsize::new(0),
            high_watermark: AtomicUsize::new(CACHE_HIGH_WATERMARK_MB * MB),
            low_watermark: AtomicUsize::new(CACHE_LOW_WATERMARK_MB * MB),
            eviction_lock: Mutex::new(()),
            stats,
        }
    }

    /// Get value from cache, setting the reference bit on access
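    ///
    /// A hit sets the entry's reference bit, so the CLOCK sweep gives it a
    /// second chance before eviction. Sketch (`ignore`: setup elided):
    ///
    /// ```ignore
    /// assert!(cache.get(b"missing").is_none());               // miss
    /// cache.insert(b"k".to_vec(), Bytes::from_static(b"v"));
    /// assert!(cache.get(b"k").is_some());                     // hit marks the entry as referenced
    /// ```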
    pub fn get(&self, key: &[u8]) -> Option<Bytes> {
        let hash = murmur3_32(key, 0);
        let bucket_idx = (hash as usize) % CACHE_BUCKETS;

        let bucket = self.buckets[bucket_idx].read();

        for entry in bucket.iter() {
            if entry.key == key {
                // Set reference bit on access (CLOCK algorithm)
                entry.reference_bit.store(true, Ordering::Release);
                entry.access_count.fetch_add(1, Ordering::Relaxed);
                return Some(entry.value.clone());
            }
        }

        None
    }

    /// Insert value into cache, triggering eviction if needed
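    ///
    /// Re-inserting an existing key updates the entry in place rather than
    /// adding a duplicate, and values larger than a quarter of the high
    /// watermark are skipped entirely. Sketch (`ignore`: setup elided):
    ///
    /// ```ignore
    /// cache.insert(b"k".to_vec(), Bytes::from_static(b"v1"));
    /// cache.insert(b"k".to_vec(), Bytes::from_static(b"v2")); // update, not duplicate
    /// assert_eq!(cache.get(b"k"), Some(Bytes::from_static(b"v2")));
    /// ```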
    pub fn insert(&self, key: Vec<u8>, value: Bytes) {
        let size = key.len() + value.len() + std::mem::size_of::<CacheEntry>();

        // Don't cache values larger than a quarter of the high watermark
        let high_watermark = self.high_watermark.load(Ordering::Acquire);
        if size > high_watermark / 4 {
            return;
        }

        // Evict before inserting if the new entry would push usage past the high watermark
        let current_usage = self.stats.cache_memory.load(Ordering::Acquire);
        if current_usage + size > high_watermark {
            self.evict_entries();
        }

        let hash = murmur3_32(&key, 0);
        let bucket_idx = (hash as usize) % CACHE_BUCKETS;

        let mut bucket = self.buckets[bucket_idx].write();

        // Check if key already exists and update
        for entry in bucket.iter_mut() {
            if entry.key == key {
                let old_size = entry.size;
                entry.value = value;
                entry.size = size;
                entry.reference_bit.store(true, Ordering::Release);

                // Update memory usage by the size delta
                if size > old_size {
                    self.stats
                        .cache_memory
                        .fetch_add(size - old_size, Ordering::AcqRel);
                } else {
                    self.stats
                        .cache_memory
                        .fetch_sub(old_size - size, Ordering::AcqRel);
                }
                return;
            }
        }

        // Add new entry
        let entry = CacheEntry {
            key,
            value,
            reference_bit: Arc::new(AtomicBool::new(true)),
            size,
            access_count: Arc::new(AtomicU32::new(1)),
        };

        bucket.push(entry);
        self.stats.cache_memory.fetch_add(size, Ordering::AcqRel);
    }

    /// Remove specific key from cache
    pub fn remove(&self, key: &[u8]) {
        let hash = murmur3_32(key, 0);
        let bucket_idx = (hash as usize) % CACHE_BUCKETS;

        let mut bucket = self.buckets[bucket_idx].write();

        if let Some(pos) = bucket.iter().position(|e| e.key == key) {
            let entry = bucket.remove(pos);
            self.stats
                .cache_memory
                .fetch_sub(entry.size, Ordering::AcqRel);
        }
    }

    /// CLOCK algorithm eviction: scan entries circularly, evicting those whose reference bit is clear
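    ///
    /// An entry whose reference bit is set gets a second chance: the bit is
    /// cleared and the hand moves on, so an entry is evicted only after it
    /// survives a full sweep without being accessed. Sketch (`ignore`: setup
    /// elided, watermark values illustrative only):
    ///
    /// ```ignore
    /// cache.adjust_watermarks(64, 48); // evict down to 48 MB once past 64 MB
    /// cache.get(b"hot");               // sets the reference bit: survives one sweep
    /// cache.evict_entries();           // unreferenced entries are reclaimed first
    /// ```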
    pub fn evict_entries(&self) {
        // Try to acquire eviction lock, return if already evicting
        let _lock = match self.eviction_lock.try_lock() {
            Some(lock) => lock,
            None => return,
        };

        let target_usage = self.low_watermark.load(Ordering::Acquire);
        let mut current_usage = self.stats.cache_memory.load(Ordering::Acquire);

        if current_usage <= target_usage {
            return;
        }

        let mut scans = 0;
        const MAX_SCANS: usize = 3; // Maximum passes through cache

        while current_usage > target_usage && scans < MAX_SCANS {
            let mut entries_checked = 0;
            let total_entries: usize = self.buckets.iter().map(|b| b.read().len()).sum();

            // Scan through buckets using CLOCK hand
            while entries_checked < total_entries && current_usage > target_usage {
                let hand = self.clock_hand.fetch_add(1, Ordering::AcqRel) % CACHE_BUCKETS;

                let mut bucket = self.buckets[hand].write();
                let mut i = 0;

                while i < bucket.len() {
                    let entry = &bucket[i];

                    // Check reference bit
                    if entry.reference_bit.load(Ordering::Acquire) {
                        // Recently accessed: clear the bit and give a second chance
                        // (the Release store already publishes this; no extra fence needed)
                        entry.reference_bit.store(false, Ordering::Release);
                        i += 1;
                    } else {
                        // No reference bit - evict this entry
                        let removed = bucket.remove(i);
                        self.stats
                            .cache_memory
                            .fetch_sub(removed.size, Ordering::AcqRel);
                        self.stats.record_eviction(1);
                        current_usage -= removed.size;
                        // Don't increment i since we removed an element
                    }

                    entries_checked += 1;

                    if current_usage <= target_usage {
                        break;
                    }
                }
            }

            scans += 1;
        }
    }

    /// Clear all cache entries
    pub fn clear(&self) {
        for bucket in &self.buckets {
            bucket.write().clear();
        }

        self.stats.cache_memory.store(0, Ordering::Release);
        self.clock_hand.store(0, Ordering::Release);
    }

    /// Get current cache statistics
    pub fn stats(&self) -> CacheStats {
        let entries: u32 = self.buckets.iter().map(|b| b.read().len() as u32).sum();

        CacheStats {
            entries,
            memory_usage: self.stats.cache_memory.load(Ordering::Acquire),
            high_watermark: self.high_watermark.load(Ordering::Acquire),
            low_watermark: self.low_watermark.load(Ordering::Acquire),
        }
    }

    /// Adjust cache watermarks dynamically
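    ///
    /// Values are in megabytes; calls where `high <= low` or `high` exceeds
    /// `CACHE_MAX_SIZE` are ignored. Sketch (`ignore`: setup elided):
    ///
    /// ```ignore
    /// // Shrink the cache; eviction runs immediately if usage exceeds the new high mark
    /// cache.adjust_watermarks(128, 96);
    /// ```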
    pub fn adjust_watermarks(&self, high_mb: usize, low_mb: usize) {
        let high = high_mb * MB;
        let low = low_mb * MB;

        // Reject inverted ranges and sizes above CACHE_MAX_SIZE (1 GB cap)
        if high > low && high <= CACHE_MAX_SIZE {
            self.high_watermark.store(high, Ordering::Release);
            self.low_watermark.store(low, Ordering::Release);

            // Trigger eviction if we're over the new high watermark
            let current_usage = self.stats.cache_memory.load(Ordering::Acquire);
            if current_usage > high {
                self.evict_entries();
            }
        }
    }
}

#[derive(Debug, Clone)]
pub struct CacheStats {
    pub entries: u32,
    pub memory_usage: usize,
    pub high_watermark: usize,
    pub low_watermark: usize,
}
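
// A minimal smoke-test sketch for the roundtrip paths above. It assumes
// `Statistics::new()` exists; swap in this crate's real constructor if it
// differs.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn insert_get_remove_roundtrip() {
        let stats = Arc::new(Statistics::new()); // assumed constructor
        let cache = ClockCache::new(stats);

        cache.insert(b"alpha".to_vec(), Bytes::from_static(b"1"));
        assert_eq!(cache.get(b"alpha"), Some(Bytes::from_static(b"1")));

        // Re-inserting the same key updates in place instead of duplicating
        cache.insert(b"alpha".to_vec(), Bytes::from_static(b"2"));
        assert_eq!(cache.stats().entries, 1);

        cache.remove(b"alpha");
        assert!(cache.get(b"alpha").is_none());
        assert_eq!(cache.stats().memory_usage, 0);
    }
}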