Coverage Report

Created: 2026-01-04 13:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/home/runner/work/feoxdb/feoxdb/src/core/cache.rs
Line
Count
Source
1
use crate::constants::*;
2
use crate::stats::Statistics;
3
use crate::utils::hash::murmur3_32;
4
use bytes::Bytes;
5
use parking_lot::{Mutex, RwLock};
6
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
7
use std::sync::Arc;
8
9
/// CLOCK algorithm cache implementation
10
/// Uses reference bits and circular scanning for eviction
11
pub struct ClockCache {
12
    /// Cache entries organized in buckets for better locality
13
    buckets: Vec<RwLock<Vec<CacheEntry>>>,
14
15
    /// Global CLOCK hand position for eviction scanning
16
    clock_hand: AtomicUsize,
17
18
    /// High watermark for triggering eviction (bytes)
19
    high_watermark: AtomicUsize,
20
21
    /// Low watermark to evict down to (bytes)
22
    low_watermark: AtomicUsize,
23
24
    /// Lock for eviction process
25
    eviction_lock: Mutex<()>,
26
27
    /// Shared statistics
28
    stats: Arc<Statistics>,
29
}
30
31
#[derive(Clone)]
32
struct CacheEntry {
33
    key: Vec<u8>,
34
    value: Bytes,
35
36
    /// Reference bit for CLOCK algorithm (accessed recently)
37
    reference_bit: Arc<AtomicBool>,
38
39
    /// Size of this entry in bytes
40
    size: usize,
41
42
    /// Access count for statistics
43
    access_count: Arc<AtomicU32>,
44
}
45
46
impl ClockCache {
47
26
    pub fn new(stats: Arc<Statistics>) -> Self {
48
26
        let buckets = (0..CACHE_BUCKETS)
49
425k
            .map(|_| RwLock::new(Vec::new()))   [region count: 26]
50
26
            .collect();
51
52
26
        Self {
53
26
            buckets,
54
26
            clock_hand: AtomicUsize::new(0),
55
26
            high_watermark: AtomicUsize::new(CACHE_HIGH_WATERMARK_MB * MB),
56
26
            low_watermark: AtomicUsize::new(CACHE_LOW_WATERMARK_MB * MB),
57
26
            eviction_lock: Mutex::new(()),
58
26
            stats,
59
26
        }
60
26
    }
61
62
    /// Get value from cache, setting reference bit on access
63
1.06k
    pub fn get(&self, key: &[u8]) -> Option<Bytes> {
64
1.06k
        let hash = murmur3_32(key, 0);
65
1.06k
        let bucket_idx = (hash as usize) % CACHE_BUCKETS;
66
67
1.06k
        let bucket = self.buckets[bucket_idx].read();
68
69
1.06k
        for entry in bucket.iter() {   [region count: 889]
70
889
            if entry.key == key {
71
                // Set reference bit on access (CLOCK algorithm)
72
853
                entry.reference_bit.store(true, Ordering::Release);
73
853
                entry.access_count.fetch_add(1, Ordering::Relaxed);
74
853
                return Some(entry.value.clone());
75
36
            }
76
        }
77
78
216
        None
79
1.06k
    }
80
81
    /// Insert value into cache, triggering eviction if needed
82
11.1k
    pub fn insert(&self, key: Vec<u8>, value: Bytes) {
83
11.1k
        let size = key.len() + value.len() + std::mem::size_of::<CacheEntry>();
84
85
        // Don't cache very large values
86
11.1k
        let high_watermark = self.high_watermark.load(Ordering::Acquire);
87
11.1k
        if size > high_watermark / 4 {
88
1
            return;
89
11.1k
        }
90
91
        // Check if we need to evict before inserting
92
11.1k
        let current_usage = self.stats.cache_memory.load(Ordering::Acquire);
93
11.1k
        let high_watermark = self.high_watermark.load(Ordering::Acquire);
94
11.1k
        if current_usage + size > high_watermark {
95
10
            self.evict_entries();
96
11.1k
        }
97
98
11.1k
        let hash = murmur3_32(&key, 0);
99
11.1k
        let bucket_idx = (hash as usize) % CACHE_BUCKETS;
100
101
11.1k
        let mut bucket = self.buckets[bucket_idx].write();
102
103
        // Check if key already exists and update
104
11.1k
        for entry in bucket.iter_mut() {   [region count: 352]
105
352
            if entry.key == key {
106
1
                let old_size = entry.size;
107
1
                entry.value = value;
108
1
                entry.size = size;
109
1
                entry.reference_bit.store(true, Ordering::Release);
110
111
                // Update memory usage
112
1
                if size > old_size {
113
1
                    self.stats
114
1
                        .cache_memory
115
1
                        .fetch_add(size - old_size, Ordering::AcqRel);
116
1
                } else {
117
0
                    self.stats
118
0
                        .cache_memory
119
0
                        .fetch_sub(old_size - size, Ordering::AcqRel);
120
0
                }
121
1
                return;
122
351
            }
123
        }
124
125
        // Add new entry
126
11.1k
        let entry = CacheEntry {
127
11.1k
            key,
128
11.1k
            value,
129
11.1k
            reference_bit: Arc::new(AtomicBool::new(true)),
130
11.1k
            size,
131
11.1k
            access_count: Arc::new(AtomicU32::new(1)),
132
11.1k
        };
133
134
11.1k
        bucket.push(entry);
135
11.1k
        self.stats.cache_memory.fetch_add(size, Ordering::AcqRel);
136
11.1k
    }
137
138
    /// Remove specific key from cache
139
107
    pub fn remove(&self, key: &[u8]) {
140
107
        let hash = murmur3_32(key, 0);
141
107
        let bucket_idx = (hash as usize) % CACHE_BUCKETS;
142
143
107
        let mut bucket = self.buckets[bucket_idx].write();
144
145
107
        if let Some(pos) = bucket.iter().position(|e| e.key == key) {   [region counts: 1]
146
1
            let entry = bucket.remove(pos);
147
1
            self.stats
148
1
                .cache_memory
149
1
                .fetch_sub(entry.size, Ordering::AcqRel);
150
106
        }
151
107
    }
152
153
    /// CLOCK algorithm eviction - scan entries circularly, evicting those without reference bit
154
10
    pub fn evict_entries(&self) {
155
        // Try to acquire eviction lock, return if already evicting
156
10
        let _lock = match self.eviction_lock.try_lock() {
157
10
            Some(lock) => lock,
158
0
            None => return,
159
        };
160
161
10
        let target_usage = self.low_watermark.load(Ordering::Acquire);
162
10
        let mut current_usage = self.stats.cache_memory.load(Ordering::Acquire);
163
164
10
        if current_usage <= target_usage {
165
0
            return;
166
10
        }
167
168
10
        let mut scans = 0;
169
        const MAX_SCANS: usize = 3; // Maximum passes through cache
170
171
30
        while current_usage > target_usage && scans < MAX_SCANS {   [region count: 20]
172
20
            let mut entries_checked = 0;
173
20
            let mut bucket_count = 0;
174
327k
            for bucket in &self.buckets {   [region count: 327k]
175
327k
                bucket_count += bucket.read().len();
176
327k
            }
177
20
            let total_entries = bucket_count;
178
179
            // Scan through buckets using CLOCK hand
180
327k
            while entries_checked < total_entries && current_usage > target_usage {   [region count: 327k]
181
327k
                let hand = self.clock_hand.fetch_add(1, Ordering::AcqRel) % CACHE_BUCKETS;
182
183
327k
                let mut bucket = self.buckets[hand].write();
184
327k
                let mut i = 0;
185
186
346k
                while i < bucket.len() {
187
18.8k
                    let entry = &bucket[i];
188
189
                    // Check reference bit
190
18.8k
                    if entry.reference_bit.load(Ordering::Acquire) {
191
9.42k
                        // Clear reference bit and give second chance with barrier
192
9.42k
                        entry.reference_bit.store(false, Ordering::Release);
193
9.42k
                        std::sync::atomic::fence(Ordering::Release);
194
9.42k
                        i += 1;
195
9.42k
                    } else {
196
9.42k
                        // No reference bit - evict this entry
197
9.42k
                        let removed = bucket.remove(i);
198
9.42k
                        self.stats
199
9.42k
                            .cache_memory
200
9.42k
                            .fetch_sub(removed.size, Ordering::AcqRel);
201
9.42k
                        self.stats.record_eviction(1);
202
9.42k
                        current_usage -= removed.size;
203
9.42k
                        // Don't increment i since we removed an element
204
9.42k
                    }
205
206
18.8k
                    entries_checked += 1;
207
208
18.8k
                    if current_usage <= target_usage {
209
10
                        break;
210
18.8k
                    }
211
                }
212
            }
213
214
20
            scans += 1;
215
        }
216
10
    }
217
218
    /// Clear all cache entries
219
1
    pub fn clear(&self) {
220
16.3k
        for bucket in &self.buckets {   [region count: 16.3k]
221
16.3k
            bucket.write().clear();
222
16.3k
        }
223
224
1
        self.stats.cache_memory.store(0, Ordering::Release);
225
1
        self.clock_hand.store(0, Ordering::Release);
226
1
    }
227
228
    /// Get current cache statistics
229
4
    pub fn stats(&self) -> CacheStats {
230
4
        CacheStats {
231
4
            entries: 0, // Calculate from buckets if needed
232
4
            memory_usage: self.stats.cache_memory.load(Ordering::Acquire),
233
4
            high_watermark: self.high_watermark.load(Ordering::Acquire),
234
4
            low_watermark: self.low_watermark.load(Ordering::Acquire),
235
4
        }
236
4
    }
237
238
    /// Adjust cache watermarks dynamically
239
3
    pub fn adjust_watermarks(&self, high_mb: usize, low_mb: usize) {
240
3
        let high = high_mb * MB;
241
3
        let low = low_mb * MB;
242
243
3
        if high > low && high <= CACHE_MAX_SIZE {   [region count: 2]
244
            // Max 1GB for cache
245
            // Update watermarks atomically
246
2
            self.high_watermark.store(high, Ordering::Release);
247
2
            self.low_watermark.store(low, Ordering::Release);
248
249
            // Trigger eviction if we're over the new high watermark
250
2
            let current_usage = self.stats.cache_memory.load(Ordering::Acquire);
251
2
            if current_usage > high {
252
0
                self.evict_entries();
253
2
            }
254
1
        }
255
3
    }
256
}
257
258
#[derive(Debug, Clone)]
259
pub struct CacheStats {
260
    pub entries: u32,
261
    pub memory_usage: usize,
262
    pub high_watermark: usize,
263
    pub low_watermark: usize,
264
}