use crate::constants::*;
use crate::stats::Statistics;
use crate::utils::hash::murmur3_32;
use bytes::Bytes;
use parking_lot::{Mutex, RwLock};
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;

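/// A sharded in-memory cache that evicts entries with the CLOCK
/// (second-chance) policy. Keys are hashed into a fixed number of buckets,
/// each guarded by its own `RwLock`, so lookups on different buckets do not
/// contend.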
pub struct ClockCache {
    /// Hash-partitioned shards; each shard's entries sit behind an `RwLock`.
    buckets: Vec<RwLock<Vec<CacheEntry>>>,

    /// Position of the clock hand, advanced across buckets during eviction.
    clock_hand: AtomicUsize,

    /// Memory usage (bytes) above which an insert triggers eviction.
    high_watermark: AtomicUsize,

    /// Memory usage (bytes) that an eviction pass tries to reach.
    low_watermark: AtomicUsize,

    /// Ensures at most one thread runs an eviction pass at a time.
    eviction_lock: Mutex<()>,

    /// Shared statistics, including the global `cache_memory` counter.
    stats: Arc<Statistics>,
}

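/// A single cached key/value pair plus the metadata the CLOCK policy needs.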
#[derive(Clone)]
struct CacheEntry {
    key: Vec<u8>,
    value: Bytes,

    /// Set on every access; cleared by the clock hand to grant one
    /// "second chance" before eviction.
    reference_bit: Arc<AtomicBool>,

    /// Approximate memory footprint of this entry in bytes.
    size: usize,

    /// Number of times this entry has been read.
    access_count: Arc<AtomicU32>,
}

impl ClockCache {
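    /// Creates an empty cache using the compile-time default watermarks.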
    pub fn new(stats: Arc<Statistics>) -> Self {
        let buckets = (0..CACHE_BUCKETS)
            .map(|_| RwLock::new(Vec::new()))
            .collect();

        Self {
            buckets,
            clock_hand: AtomicUsize::new(0),
            high_watermark: AtomicUsize::new(CACHE_HIGH_WATERMARK_MB * MB),
            low_watermark: AtomicUsize::new(CACHE_LOW_WATERMARK_MB * MB),
            eviction_lock: Mutex::new(()),
            stats,
        }
    }

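    /// Looks up `key`, marking the entry as recently used on a hit.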
    pub fn get(&self, key: &[u8]) -> Option<Bytes> {
        let hash = murmur3_32(key, 0);
        let bucket_idx = (hash as usize) % CACHE_BUCKETS;

        let bucket = self.buckets[bucket_idx].read();

        for entry in bucket.iter() {
            if entry.key == key {
                // Flag the entry so the clock hand spares it once.
                entry.reference_bit.store(true, Ordering::Release);
                entry.access_count.fetch_add(1, Ordering::Relaxed);
                return Some(entry.value.clone());
            }
        }

        None
    }

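    /// Inserts or updates `key`, evicting first if the cache would exceed
    /// the high watermark. Oversized values are silently rejected.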
    pub fn insert(&self, key: Vec<u8>, value: Bytes) {
        let size = key.len() + value.len() + std::mem::size_of::<CacheEntry>();

        // Reject entries larger than a quarter of the high watermark:
        // caching them would evict a disproportionate share of the cache.
        let high_watermark = self.high_watermark.load(Ordering::Acquire);
        if size > high_watermark / 4 {
            return;
        }

        // Make room before inserting if this entry would push usage past
        // the high watermark.
        let current_usage = self.stats.cache_memory.load(Ordering::Acquire);
        if current_usage + size > high_watermark {
            self.evict_entries();
        }

        let hash = murmur3_32(&key, 0);
        let bucket_idx = (hash as usize) % CACHE_BUCKETS;

        let mut bucket = self.buckets[bucket_idx].write();

        // Update in place if the key is already cached, adjusting the
        // memory accounting by the size delta.
        for entry in bucket.iter_mut() {
            if entry.key == key {
                let old_size = entry.size;
                entry.value = value;
                entry.size = size;
                entry.reference_bit.store(true, Ordering::Release);

                if size > old_size {
                    self.stats
                        .cache_memory
                        .fetch_add(size - old_size, Ordering::AcqRel);
                } else {
                    self.stats
                        .cache_memory
                        .fetch_sub(old_size - size, Ordering::AcqRel);
                }
                return;
            }
        }

        // Not present: append a fresh entry, born referenced.
        let entry = CacheEntry {
            key,
            value,
            reference_bit: Arc::new(AtomicBool::new(true)),
            size,
            access_count: Arc::new(AtomicU32::new(1)),
        };

        bucket.push(entry);
        self.stats.cache_memory.fetch_add(size, Ordering::AcqRel);
    }

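    /// Removes `key` if present and releases its memory accounting.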
    pub fn remove(&self, key: &[u8]) {
        let hash = murmur3_32(key, 0);
        let bucket_idx = (hash as usize) % CACHE_BUCKETS;

        let mut bucket = self.buckets[bucket_idx].write();

        if let Some(pos) = bucket.iter().position(|e| e.key == key) {
            let entry = bucket.remove(pos);
            self.stats
                .cache_memory
                .fetch_sub(entry.size, Ordering::AcqRel);
        }
    }

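    /// Runs a CLOCK eviction pass: the hand sweeps across buckets, clearing
    /// reference bits on recently used entries and evicting the rest, until
    /// usage drops to the low watermark or the scan budget is exhausted.
    /// Returns immediately if another thread is already evicting.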
    pub fn evict_entries(&self) {
        // Only one evictor at a time; other threads skip rather than block.
        let _lock = match self.eviction_lock.try_lock() {
            Some(lock) => lock,
            None => return,
        };

        let target_usage = self.low_watermark.load(Ordering::Acquire);
        let mut current_usage = self.stats.cache_memory.load(Ordering::Acquire);

        if current_usage <= target_usage {
            return;
        }

        let mut evicted_bytes = 0;
        let mut scans = 0;
        const MAX_SCANS: usize = 3;

        while current_usage > target_usage && scans < MAX_SCANS {
            let mut entries_checked = 0;
            let total_entries: usize = self.buckets.iter().map(|b| b.read().len()).sum();

            while entries_checked < total_entries && current_usage > target_usage {
                let hand = self.clock_hand.fetch_add(1, Ordering::AcqRel) % CACHE_BUCKETS;

                let mut bucket = self.buckets[hand].write();
                let mut i = 0;

                while i < bucket.len() {
                    let entry = &bucket[i];

                    if entry.reference_bit.load(Ordering::Acquire) {
                        // Recently used: clear the bit and grant a second
                        // chance until the hand comes around again.
                        entry.reference_bit.store(false, Ordering::Release);
                        i += 1;
                    } else {
                        // Not referenced since the last sweep: evict.
                        let removed = bucket.remove(i);
                        evicted_bytes += removed.size;
                        self.stats
                            .cache_memory
                            .fetch_sub(removed.size, Ordering::AcqRel);
                        self.stats.record_eviction(1);
                        // Saturate: concurrent inserts can make the local
                        // usage snapshot stale.
                        current_usage = current_usage.saturating_sub(removed.size);
                    }

                    entries_checked += 1;

                    if current_usage <= target_usage {
                        break;
                    }
                }
            }

            scans += 1;
        }

        if evicted_bytes > 0 {
            #[cfg(debug_assertions)]
            eprintln!(
                "Cache eviction: freed {} bytes in {} scans, usage now: {} bytes",
                evicted_bytes, scans, current_usage
            );
        }
    }

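    /// Drops every entry and resets usage accounting and the clock hand.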
    pub fn clear(&self) {
        for bucket in &self.buckets {
            bucket.write().clear();
        }

        self.stats.cache_memory.store(0, Ordering::Release);
        self.clock_hand.store(0, Ordering::Release);
    }

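    /// Returns a point-in-time snapshot of cache occupancy and watermarks.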
    pub fn stats(&self) -> CacheStats {
        CacheStats {
            entries: self.buckets.iter().map(|b| b.read().len() as u32).sum(),
            memory_usage: self.stats.cache_memory.load(Ordering::Acquire),
            high_watermark: self.high_watermark.load(Ordering::Acquire),
            low_watermark: self.low_watermark.load(Ordering::Acquire),
        }
    }

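    /// Reconfigures both watermarks (in megabytes), rejecting inverted or
    /// oversized values, and evicts immediately if the new ceiling is
    /// already exceeded.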
    pub fn adjust_watermarks(&self, high_mb: usize, low_mb: usize) {
        let high = high_mb * MB;
        let low = low_mb * MB;

        if high > low && high <= CACHE_MAX_SIZE {
            self.high_watermark.store(high, Ordering::Release);
            self.low_watermark.store(low, Ordering::Release);

            // Shrink right away if current usage exceeds the new ceiling.
            let current_usage = self.stats.cache_memory.load(Ordering::Acquire);
            if current_usage > high {
                self.evict_entries();
            }
        }
    }
}

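/// Snapshot of cache state as reported by [`ClockCache::stats`].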
#[derive(Debug, Clone)]
pub struct CacheStats {
    pub entries: u32,
    pub memory_usage: usize,
    pub high_watermark: usize,
    pub low_watermark: usize,
}