/home/runner/work/feoxdb/feoxdb/src/core/cache.rs
Line | Count | Source |
1 | | use crate::constants::*; |
2 | | use crate::stats::Statistics; |
3 | | use crate::utils::hash::murmur3_32; |
4 | | use bytes::Bytes; |
5 | | use parking_lot::{Mutex, RwLock}; |
6 | | use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}; |
7 | | use std::sync::Arc; |
8 | | |
/// CLOCK (second-chance) algorithm cache implementation.
/// Each entry carries a reference bit set on access; eviction scans the
/// buckets circularly, clearing bits and evicting unreferenced entries.
pub struct ClockCache {
    /// Cache entries organized in hash-indexed buckets for better locality
    buckets: Vec<RwLock<Vec<CacheEntry>>>,

    /// Global CLOCK hand position for eviction scanning (bucket index, wraps via modulo)
    clock_hand: AtomicUsize,

    /// High watermark for triggering eviction (bytes)
    high_watermark: AtomicUsize,

    /// Low watermark to evict down to (bytes)
    low_watermark: AtomicUsize,

    /// Lock ensuring only one thread runs the eviction scan at a time
    eviction_lock: Mutex<()>,

    /// Shared statistics (tracks cache memory usage and eviction counts)
    stats: Arc<Statistics>,
}
30 | | |
/// A single cached key/value pair plus CLOCK bookkeeping.
#[derive(Clone)]
struct CacheEntry {
    // Owned copy of the key bytes; compared by value on lookup.
    key: Vec<u8>,
    // Cached value; `Bytes` makes `clone()` on a hit cheap (refcounted).
    value: Bytes,

    /// Reference bit for CLOCK algorithm: set on access, cleared by the
    /// eviction hand to grant a "second chance" before removal.
    reference_bit: Arc<AtomicBool>,

    /// Size of this entry in bytes (key + value + struct overhead)
    size: usize,

    /// Access count for statistics
    access_count: Arc<AtomicU32>,
}
45 | | |
46 | | impl ClockCache { |
47 | 26 | pub fn new(stats: Arc<Statistics>) -> Self { |
48 | 26 | let buckets = (0..CACHE_BUCKETS) |
49 | 425k | .map26 (|_| RwLock::new(Vec::new())) |
50 | 26 | .collect(); |
51 | | |
52 | 26 | Self { |
53 | 26 | buckets, |
54 | 26 | clock_hand: AtomicUsize::new(0), |
55 | 26 | high_watermark: AtomicUsize::new(CACHE_HIGH_WATERMARK_MB * MB), |
56 | 26 | low_watermark: AtomicUsize::new(CACHE_LOW_WATERMARK_MB * MB), |
57 | 26 | eviction_lock: Mutex::new(()), |
58 | 26 | stats, |
59 | 26 | } |
60 | 26 | } |
61 | | |
62 | | /// Get value from cache, setting reference bit on access |
63 | 1.06k | pub fn get(&self, key: &[u8]) -> Option<Bytes> { |
64 | 1.06k | let hash = murmur3_32(key, 0); |
65 | 1.06k | let bucket_idx = (hash as usize) % CACHE_BUCKETS; |
66 | | |
67 | 1.06k | let bucket = self.buckets[bucket_idx].read(); |
68 | | |
69 | 1.06k | for entry889 in bucket.iter() { |
70 | 889 | if entry.key == key { |
71 | | // Set reference bit on access (CLOCK algorithm) |
72 | 853 | entry.reference_bit.store(true, Ordering::Release); |
73 | 853 | entry.access_count.fetch_add(1, Ordering::Relaxed); |
74 | 853 | return Some(entry.value.clone()); |
75 | 36 | } |
76 | | } |
77 | | |
78 | 216 | None |
79 | 1.06k | } |
80 | | |
81 | | /// Insert value into cache, triggering eviction if needed |
82 | 11.1k | pub fn insert(&self, key: Vec<u8>, value: Bytes) { |
83 | 11.1k | let size = key.len() + value.len() + std::mem::size_of::<CacheEntry>(); |
84 | | |
85 | | // Don't cache very large values |
86 | 11.1k | let high_watermark = self.high_watermark.load(Ordering::Acquire); |
87 | 11.1k | if size > high_watermark / 4 { |
88 | 1 | return; |
89 | 11.1k | } |
90 | | |
91 | | // Check if we need to evict before inserting |
92 | 11.1k | let current_usage = self.stats.cache_memory.load(Ordering::Acquire); |
93 | 11.1k | let high_watermark = self.high_watermark.load(Ordering::Acquire); |
94 | 11.1k | if current_usage + size > high_watermark { |
95 | 10 | self.evict_entries(); |
96 | 11.1k | } |
97 | | |
98 | 11.1k | let hash = murmur3_32(&key, 0); |
99 | 11.1k | let bucket_idx = (hash as usize) % CACHE_BUCKETS; |
100 | | |
101 | 11.1k | let mut bucket = self.buckets[bucket_idx].write(); |
102 | | |
103 | | // Check if key already exists and update |
104 | 11.1k | for entry352 in bucket.iter_mut() { |
105 | 352 | if entry.key == key { |
106 | 1 | let old_size = entry.size; |
107 | 1 | entry.value = value; |
108 | 1 | entry.size = size; |
109 | 1 | entry.reference_bit.store(true, Ordering::Release); |
110 | | |
111 | | // Update memory usage |
112 | 1 | if size > old_size { |
113 | 1 | self.stats |
114 | 1 | .cache_memory |
115 | 1 | .fetch_add(size - old_size, Ordering::AcqRel); |
116 | 1 | } else { |
117 | 0 | self.stats |
118 | 0 | .cache_memory |
119 | 0 | .fetch_sub(old_size - size, Ordering::AcqRel); |
120 | 0 | } |
121 | 1 | return; |
122 | 351 | } |
123 | | } |
124 | | |
125 | | // Add new entry |
126 | 11.1k | let entry = CacheEntry { |
127 | 11.1k | key, |
128 | 11.1k | value, |
129 | 11.1k | reference_bit: Arc::new(AtomicBool::new(true)), |
130 | 11.1k | size, |
131 | 11.1k | access_count: Arc::new(AtomicU32::new(1)), |
132 | 11.1k | }; |
133 | | |
134 | 11.1k | bucket.push(entry); |
135 | 11.1k | self.stats.cache_memory.fetch_add(size, Ordering::AcqRel); |
136 | 11.1k | } |
137 | | |
138 | | /// Remove specific key from cache |
139 | 107 | pub fn remove(&self, key: &[u8]) { |
140 | 107 | let hash = murmur3_32(key, 0); |
141 | 107 | let bucket_idx = (hash as usize) % CACHE_BUCKETS; |
142 | | |
143 | 107 | let mut bucket = self.buckets[bucket_idx].write(); |
144 | | |
145 | 107 | if let Some(pos1 ) = bucket.iter().position(|e| e.key1 == key1 ) { |
146 | 1 | let entry = bucket.remove(pos); |
147 | 1 | self.stats |
148 | 1 | .cache_memory |
149 | 1 | .fetch_sub(entry.size, Ordering::AcqRel); |
150 | 106 | } |
151 | 107 | } |
152 | | |
153 | | /// CLOCK algorithm eviction - scan entries circularly, evicting those without reference bit |
154 | 10 | pub fn evict_entries(&self) { |
155 | | // Try to acquire eviction lock, return if already evicting |
156 | 10 | let _lock = match self.eviction_lock.try_lock() { |
157 | 10 | Some(lock) => lock, |
158 | 0 | None => return, |
159 | | }; |
160 | | |
161 | 10 | let target_usage = self.low_watermark.load(Ordering::Acquire); |
162 | 10 | let mut current_usage = self.stats.cache_memory.load(Ordering::Acquire); |
163 | | |
164 | 10 | if current_usage <= target_usage { |
165 | 0 | return; |
166 | 10 | } |
167 | | |
168 | 10 | let mut scans = 0; |
169 | | const MAX_SCANS: usize = 3; // Maximum passes through cache |
170 | | |
171 | 30 | while current_usage > target_usage && scans < MAX_SCANS20 { |
172 | 20 | let mut entries_checked = 0; |
173 | 20 | let mut bucket_count = 0; |
174 | 327k | for bucket327k in &self.buckets { |
175 | 327k | bucket_count += bucket.read().len(); |
176 | 327k | } |
177 | 20 | let total_entries = bucket_count; |
178 | | |
179 | | // Scan through buckets using CLOCK hand |
180 | 327k | while entries_checked < total_entries && current_usage > target_usage327k { |
181 | 327k | let hand = self.clock_hand.fetch_add(1, Ordering::AcqRel) % CACHE_BUCKETS; |
182 | | |
183 | 327k | let mut bucket = self.buckets[hand].write(); |
184 | 327k | let mut i = 0; |
185 | | |
186 | 346k | while i < bucket.len() { |
187 | 18.8k | let entry = &bucket[i]; |
188 | | |
189 | | // Check reference bit |
190 | 18.8k | if entry.reference_bit.load(Ordering::Acquire) { |
191 | 9.42k | // Clear reference bit and give second chance with barrier |
192 | 9.42k | entry.reference_bit.store(false, Ordering::Release); |
193 | 9.42k | std::sync::atomic::fence(Ordering::Release); |
194 | 9.42k | i += 1; |
195 | 9.42k | } else { |
196 | 9.42k | // No reference bit - evict this entry |
197 | 9.42k | let removed = bucket.remove(i); |
198 | 9.42k | self.stats |
199 | 9.42k | .cache_memory |
200 | 9.42k | .fetch_sub(removed.size, Ordering::AcqRel); |
201 | 9.42k | self.stats.record_eviction(1); |
202 | 9.42k | current_usage -= removed.size; |
203 | 9.42k | // Don't increment i since we removed an element |
204 | 9.42k | } |
205 | | |
206 | 18.8k | entries_checked += 1; |
207 | | |
208 | 18.8k | if current_usage <= target_usage { |
209 | 10 | break; |
210 | 18.8k | } |
211 | | } |
212 | | } |
213 | | |
214 | 20 | scans += 1; |
215 | | } |
216 | 10 | } |
217 | | |
218 | | /// Clear all cache entries |
219 | 1 | pub fn clear(&self) { |
220 | 16.3k | for bucket16.3k in &self.buckets { |
221 | 16.3k | bucket.write().clear(); |
222 | 16.3k | } |
223 | | |
224 | 1 | self.stats.cache_memory.store(0, Ordering::Release); |
225 | 1 | self.clock_hand.store(0, Ordering::Release); |
226 | 1 | } |
227 | | |
228 | | /// Get current cache statistics |
229 | 4 | pub fn stats(&self) -> CacheStats { |
230 | 4 | CacheStats { |
231 | 4 | entries: 0, // Calculate from buckets if needed |
232 | 4 | memory_usage: self.stats.cache_memory.load(Ordering::Acquire), |
233 | 4 | high_watermark: self.high_watermark.load(Ordering::Acquire), |
234 | 4 | low_watermark: self.low_watermark.load(Ordering::Acquire), |
235 | 4 | } |
236 | 4 | } |
237 | | |
238 | | /// Adjust cache watermarks dynamically |
239 | 3 | pub fn adjust_watermarks(&self, high_mb: usize, low_mb: usize) { |
240 | 3 | let high = high_mb * MB; |
241 | 3 | let low = low_mb * MB; |
242 | | |
243 | 3 | if high > low && high <= CACHE_MAX_SIZE2 { |
244 | | // Max 1GB for cache |
245 | | // Update watermarks atomically |
246 | 2 | self.high_watermark.store(high, Ordering::Release); |
247 | 2 | self.low_watermark.store(low, Ordering::Release); |
248 | | |
249 | | // Trigger eviction if we're over the new high watermark |
250 | 2 | let current_usage = self.stats.cache_memory.load(Ordering::Acquire); |
251 | 2 | if current_usage > high { |
252 | 0 | self.evict_entries(); |
253 | 2 | } |
254 | 1 | } |
255 | 3 | } |
256 | | } |
257 | | |
/// Point-in-time snapshot of cache state, as returned by `ClockCache::stats`.
#[derive(Debug, Clone)]
pub struct CacheStats {
    /// Number of cached entries
    pub entries: u32,
    /// Current accounted memory usage in bytes
    pub memory_usage: usize,
    /// Eviction trigger threshold in bytes
    pub high_watermark: usize,
    /// Eviction target threshold in bytes
    pub low_watermark: usize,
}