1use crate::constants::*;
2use crate::stats::Statistics;
3use crate::utils::hash::murmur3_32;
4use bytes::Bytes;
5use parking_lot::{Mutex, RwLock};
6use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
7use std::sync::Arc;
8
9pub struct ClockCache {
12 buckets: Vec<RwLock<Vec<CacheEntry>>>,
14
15 clock_hand: AtomicUsize,
17
18 high_watermark: AtomicUsize,
20
21 low_watermark: AtomicUsize,
23
24 eviction_lock: Mutex<()>,
26
27 stats: Arc<Statistics>,
29}
30
31#[derive(Clone)]
32struct CacheEntry {
33 key: Vec<u8>,
34 value: Bytes,
35
36 reference_bit: Arc<AtomicBool>,
38
39 size: usize,
41
42 access_count: Arc<AtomicU32>,
44}
45
46impl ClockCache {
47 pub fn new(stats: Arc<Statistics>) -> Self {
48 let buckets = (0..CACHE_BUCKETS)
49 .map(|_| RwLock::new(Vec::new()))
50 .collect();
51
52 Self {
53 buckets,
54 clock_hand: AtomicUsize::new(0),
55 high_watermark: AtomicUsize::new(CACHE_HIGH_WATERMARK_MB * MB),
56 low_watermark: AtomicUsize::new(CACHE_LOW_WATERMARK_MB * MB),
57 eviction_lock: Mutex::new(()),
58 stats,
59 }
60 }
61
62 pub fn get(&self, key: &[u8]) -> Option<Bytes> {
64 let hash = murmur3_32(key, 0);
65 let bucket_idx = (hash as usize) % CACHE_BUCKETS;
66
67 let bucket = self.buckets[bucket_idx].read();
68
69 for entry in bucket.iter() {
70 if entry.key == key {
71 entry.reference_bit.store(true, Ordering::Release);
73 entry.access_count.fetch_add(1, Ordering::Relaxed);
74 return Some(entry.value.clone());
75 }
76 }
77
78 None
79 }
80
81 pub fn insert(&self, key: Vec<u8>, value: Bytes) {
83 let size = key.len() + value.len() + std::mem::size_of::<CacheEntry>();
84
85 let high_watermark = self.high_watermark.load(Ordering::Acquire);
87 if size > high_watermark / 4 {
88 return;
89 }
90
91 let current_usage = self.stats.cache_memory.load(Ordering::Acquire);
93 let high_watermark = self.high_watermark.load(Ordering::Acquire);
94 if current_usage + size > high_watermark {
95 self.evict_entries();
96 }
97
98 let hash = murmur3_32(&key, 0);
99 let bucket_idx = (hash as usize) % CACHE_BUCKETS;
100
101 let mut bucket = self.buckets[bucket_idx].write();
102
103 for entry in bucket.iter_mut() {
105 if entry.key == key {
106 let old_size = entry.size;
107 entry.value = value;
108 entry.size = size;
109 entry.reference_bit.store(true, Ordering::Release);
110
111 if size > old_size {
113 self.stats
114 .cache_memory
115 .fetch_add(size - old_size, Ordering::AcqRel);
116 } else {
117 self.stats
118 .cache_memory
119 .fetch_sub(old_size - size, Ordering::AcqRel);
120 }
121 return;
122 }
123 }
124
125 let entry = CacheEntry {
127 key,
128 value,
129 reference_bit: Arc::new(AtomicBool::new(true)),
130 size,
131 access_count: Arc::new(AtomicU32::new(1)),
132 };
133
134 bucket.push(entry);
135 self.stats.cache_memory.fetch_add(size, Ordering::AcqRel);
136 }
137
138 pub fn remove(&self, key: &[u8]) {
140 let hash = murmur3_32(key, 0);
141 let bucket_idx = (hash as usize) % CACHE_BUCKETS;
142
143 let mut bucket = self.buckets[bucket_idx].write();
144
145 if let Some(pos) = bucket.iter().position(|e| e.key == key) {
146 let entry = bucket.remove(pos);
147 self.stats
148 .cache_memory
149 .fetch_sub(entry.size, Ordering::AcqRel);
150 }
151 }
152
153 pub fn evict_entries(&self) {
155 let _lock = match self.eviction_lock.try_lock() {
157 Some(lock) => lock,
158 None => return,
159 };
160
161 let target_usage = self.low_watermark.load(Ordering::Acquire);
162 let mut current_usage = self.stats.cache_memory.load(Ordering::Acquire);
163
164 if current_usage <= target_usage {
165 return;
166 }
167
168 let mut scans = 0;
169 const MAX_SCANS: usize = 3; while current_usage > target_usage && scans < MAX_SCANS {
172 let mut entries_checked = 0;
173 let mut bucket_count = 0;
174 for bucket in &self.buckets {
175 bucket_count += bucket.read().len();
176 }
177 let total_entries = bucket_count;
178
179 while entries_checked < total_entries && current_usage > target_usage {
181 let hand = self.clock_hand.fetch_add(1, Ordering::AcqRel) % CACHE_BUCKETS;
182
183 let mut bucket = self.buckets[hand].write();
184 let mut i = 0;
185
186 while i < bucket.len() {
187 let entry = &bucket[i];
188
189 if entry.reference_bit.load(Ordering::Acquire) {
191 entry.reference_bit.store(false, Ordering::Release);
193 std::sync::atomic::fence(Ordering::Release);
194 i += 1;
195 } else {
196 let removed = bucket.remove(i);
198 self.stats
199 .cache_memory
200 .fetch_sub(removed.size, Ordering::AcqRel);
201 self.stats.record_eviction(1);
202 current_usage -= removed.size;
203 }
205
206 entries_checked += 1;
207
208 if current_usage <= target_usage {
209 break;
210 }
211 }
212 }
213
214 scans += 1;
215 }
216 }
217
218 pub fn clear(&self) {
220 for bucket in &self.buckets {
221 bucket.write().clear();
222 }
223
224 self.stats.cache_memory.store(0, Ordering::Release);
225 self.clock_hand.store(0, Ordering::Release);
226 }
227
228 pub fn stats(&self) -> CacheStats {
230 CacheStats {
231 entries: 0, memory_usage: self.stats.cache_memory.load(Ordering::Acquire),
233 high_watermark: self.high_watermark.load(Ordering::Acquire),
234 low_watermark: self.low_watermark.load(Ordering::Acquire),
235 }
236 }
237
238 pub fn adjust_watermarks(&self, high_mb: usize, low_mb: usize) {
240 let high = high_mb * MB;
241 let low = low_mb * MB;
242
243 if high > low && high <= CACHE_MAX_SIZE {
244 self.high_watermark.store(high, Ordering::Release);
247 self.low_watermark.store(low, Ordering::Release);
248
249 let current_usage = self.stats.cache_memory.load(Ordering::Acquire);
251 if current_usage > high {
252 self.evict_entries();
253 }
254 }
255 }
256}
257
258#[derive(Debug, Clone)]
259pub struct CacheStats {
260 pub entries: u32,
261 pub memory_usage: usize,
262 pub high_watermark: usize,
263 pub low_watermark: usize,
264}