Coverage Report

Created: 2025-09-19 09:35

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/home/gh-runner/action-runner3/_work/feoxdb/feoxdb/src/core/ttl_sweep.rs
Line
Count
Source
1
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2
use std::sync::{Arc, Weak};
3
use std::thread::{self, JoinHandle};
4
use std::time::{Duration, Instant};
5
6
use rand::rngs::ThreadRng;
7
use rand::Rng;
8
9
use crate::constants::Operation;
10
use crate::core::store::FeoxStore;
11
12
/// Tuning knobs for the background thread that sweeps expired TTL keys.
#[derive(Debug, Clone)]
pub struct TtlConfig {
    /// How many keys each sampling batch attempts to draw.
    pub sample_size: usize,
    /// Expiry-rate cutoff in the range 0.0-1.0: sampling continues with
    /// another batch only while the last batch's expiry rate reaches it.
    pub expiry_threshold: f32,
    /// Upper bound on the number of batches in a single cleaning run.
    pub max_iterations: usize,
    /// Time budget for a single cleaning run.
    pub max_time_per_run: Duration,
    /// Pause between two consecutive cleaning runs.
    pub sleep_interval: Duration,
    /// Switches the TTL cleaner on or off.
    pub enabled: bool,
}
28
29
impl Default for TtlConfig {
30
10
    fn default() -> Self {
31
10
        Self {
32
10
            sample_size: 100,
33
10
            expiry_threshold: 0.25,
34
10
            max_iterations: 16,
35
10
            max_time_per_run: Duration::from_millis(1),
36
10
            sleep_interval: Duration::from_millis(1000),
37
10
            enabled: false,
38
10
        }
39
10
    }
40
}
41
42
impl TtlConfig {
43
    /// Create a default configuration for persistent stores
44
0
    pub fn default_persistent() -> Self {
45
0
        Self {
46
0
            enabled: true,
47
0
            ..Default::default()
48
0
        }
49
0
    }
50
51
    /// Create a default configuration for memory-only stores
52
0
    pub fn default_memory() -> Self {
53
0
        Self {
54
0
            enabled: true,
55
0
            ..Default::default()
56
0
        }
57
0
    }
58
}
59
60
/// Background thread that periodically sweeps expired TTL keys
61
pub struct TtlSweeper {
62
    /// Weak reference to the store to avoid circular references
63
    store: Weak<FeoxStore>,
64
    /// Configuration
65
    config: TtlConfig,
66
    /// Shutdown flag
67
    shutdown: Arc<AtomicBool>,
68
    /// Thread handle
69
    handle: Option<JoinHandle<()>>,
70
    /// Statistics
71
    stats: TtlSweeperStats,
72
}
73
74
/// Shared counters describing the work done by the TTL sweeper.
pub struct TtlSweeperStats {
    /// Running total of keys sampled.
    pub total_sampled: Arc<AtomicU64>,
    /// Running total of keys expired.
    pub total_expired: Arc<AtomicU64>,
    /// Number of cleaning runs recorded.
    pub total_runs: Arc<AtomicU64>,
    /// Timestamp of the most recent recorded run, in nanoseconds.
    pub last_run: Arc<AtomicU64>,
}
85
86
impl TtlSweeperStats {
87
0
    fn new() -> Self {
88
0
        Self {
89
0
            total_sampled: Arc::new(AtomicU64::new(0)),
90
0
            total_expired: Arc::new(AtomicU64::new(0)),
91
0
            total_runs: Arc::new(AtomicU64::new(0)),
92
0
            last_run: Arc::new(AtomicU64::new(0)),
93
0
        }
94
0
    }
95
}
96
97
impl TtlSweeper {
98
    /// Create a new TTL sweeper
99
0
    pub fn new(store: Weak<FeoxStore>, config: TtlConfig) -> Self {
100
0
        Self {
101
0
            store,
102
0
            config,
103
0
            shutdown: Arc::new(AtomicBool::new(false)),
104
0
            handle: None,
105
0
            stats: TtlSweeperStats::new(),
106
0
        }
107
0
    }
108
109
    /// Start the background sweeper thread
110
0
    pub fn start(&mut self) {
111
0
        if !self.config.enabled {
112
0
            return;
113
0
        }
114
115
0
        let store = self.store.clone();
116
0
        let config = self.config.clone();
117
0
        let shutdown = self.shutdown.clone();
118
0
        let stats = TtlSweeperStats {
119
0
            total_sampled: self.stats.total_sampled.clone(),
120
0
            total_expired: self.stats.total_expired.clone(),
121
0
            total_runs: self.stats.total_runs.clone(),
122
0
            last_run: self.stats.last_run.clone(),
123
0
        };
124
125
0
        let handle = thread::spawn(move || {
126
0
            run_sweeper_loop(store, config, shutdown, stats);
127
0
        });
128
129
0
        self.handle = Some(handle);
130
0
    }
131
132
    /// Stop the background sweeper thread
133
0
    pub fn stop(&mut self) {
134
0
        self.shutdown.store(true, Ordering::Release);
135
136
0
        if let Some(handle) = self.handle.take() {
137
0
            let _ = handle.join();
138
0
        }
139
0
    }
140
141
    /// Get sweeper statistics
142
0
    pub fn stats(&self) -> SweeperSnapshot {
143
0
        SweeperSnapshot {
144
0
            total_sampled: self.stats.total_sampled.load(Ordering::Relaxed),
145
0
            total_expired: self.stats.total_expired.load(Ordering::Relaxed),
146
0
            total_runs: self.stats.total_runs.load(Ordering::Relaxed),
147
0
            last_run: self.stats.last_run.load(Ordering::Relaxed),
148
0
        }
149
0
    }
150
}
151
152
impl Drop for TtlSweeper {
153
0
    fn drop(&mut self) {
154
0
        self.stop();
155
0
    }
156
}
157
158
/// Plain-value copy of the sweeper counters taken at one point in time.
#[derive(Clone, Debug)]
pub struct SweeperSnapshot {
    /// Keys sampled so far.
    pub total_sampled: u64,
    /// Keys expired so far.
    pub total_expired: u64,
    /// Cleaning runs recorded so far.
    pub total_runs: u64,
    /// Timestamp of the most recent recorded run.
    pub last_run: u64,
}
166
167
/// Main sweeper loop that runs in the background thread
168
0
fn run_sweeper_loop(
169
0
    store: Weak<FeoxStore>,
170
0
    config: TtlConfig,
171
0
    shutdown: Arc<AtomicBool>,
172
0
    stats: TtlSweeperStats,
173
0
) {
174
0
    while !shutdown.load(Ordering::Acquire) {
175
        // Sleep between runs
176
0
        thread::sleep(config.sleep_interval);
177
178
        // Try to get strong reference to store
179
0
        let Some(store) = store.upgrade() else {
180
            // Store has been dropped, exit
181
0
            break;
182
        };
183
184
        // Perform sweeping run
185
0
        let start = Instant::now();
186
0
        let mut iterations = 0;
187
0
        let mut total_sampled = 0;
188
0
        let mut total_expired = 0;
189
190
        loop {
191
            // Sample and expire a batch
192
0
            let (sampled, expired) = sample_and_expire_batch(&store, &config);
193
0
            total_sampled += sampled;
194
0
            total_expired += expired;
195
0
            iterations += 1;
196
197
            // Calculate expiry rate
198
0
            let expiry_rate = if sampled > 0 {
199
0
                expired as f32 / sampled as f32
200
            } else {
201
0
                0.0
202
            };
203
204
            // Check stop conditions
205
0
            if expiry_rate < config.expiry_threshold {
206
0
                break; // Few expired keys, we're done
207
0
            }
208
0
            if iterations >= config.max_iterations {
209
0
                break; // Bounded iterations
210
0
            }
211
0
            if start.elapsed() > config.max_time_per_run {
212
0
                break; // Bounded time
213
0
            }
214
        }
215
216
        // Update statistics
217
0
        if total_sampled > 0 {
218
0
            stats
219
0
                .total_sampled
220
0
                .fetch_add(total_sampled, Ordering::Relaxed);
221
0
            stats
222
0
                .total_expired
223
0
                .fetch_add(total_expired, Ordering::Relaxed);
224
0
            stats.total_runs.fetch_add(1, Ordering::Relaxed);
225
0
            stats.last_run.store(
226
0
                std::time::SystemTime::now()
227
0
                    .duration_since(std::time::UNIX_EPOCH)
228
0
                    .unwrap()
229
0
                    .as_nanos() as u64,
230
0
                Ordering::Relaxed,
231
0
            );
232
0
        }
233
234
        // Check shutdown flag again
235
0
        if shutdown.load(Ordering::Acquire) {
236
0
            break;
237
0
        }
238
    }
239
0
}
240
241
/// Sample keys and expire those that have exceeded their TTL
242
0
fn sample_and_expire_batch(store: &Arc<FeoxStore>, config: &TtlConfig) -> (u64, u64) {
243
0
    let now = store.get_timestamp_pub();
244
0
    let mut sampled = 0;
245
0
    let mut expired = 0;
246
0
    let mut rng = rand::rng();
247
248
    // Get access to the hash table
249
0
    let hash_table = store.get_hash_table();
250
251
    // Sample random entries directly from the hash table
252
0
    for _ in 0..config.sample_size {
253
        // Try to get a random entry with TTL
254
0
        if let Some((key, record)) = get_random_ttl_entry(hash_table, &mut rng) {
255
0
            sampled += 1;
256
257
0
            let ttl_expiry = record.ttl_expiry.load(Ordering::Relaxed);
258
259
            // Check if expired
260
0
            if ttl_expiry > 0 && ttl_expiry < now {
261
                // Remove expired entry
262
0
                hash_table.remove(&key);
263
0
                store.remove_from_tree(&key);
264
265
0
                expired += 1;
266
267
                // Queue disk cleanup if needed
268
0
                if record.sector.load(Ordering::Relaxed) > 0 {
269
                    // Add to write buffer for disk cleanup
270
0
                    if let Some(wb) = store.get_write_buffer() {
271
0
                        let _ = wb.add_write(Operation::Delete, record, 0);
272
0
                    }
273
0
                }
274
0
            }
275
0
        }
276
    }
277
278
0
    (sampled, expired)
279
0
}
280
281
/// Get a random entry with TTL using sampling
282
0
fn get_random_ttl_entry(
283
0
    hash_table: &scc::HashMap<Vec<u8>, Arc<crate::core::record::Record>, ahash::RandomState>,
284
0
    rng: &mut ThreadRng,
285
0
) -> Option<(Vec<u8>, Arc<crate::core::record::Record>)> {
286
    // Sample up to 100 entries and pick one with TTL
287
0
    let mut candidates = Vec::new();
288
0
    let mut count = 0;
289
290
0
    hash_table.scan(|key: &Vec<u8>, value: &Arc<crate::core::record::Record>| {
291
0
        if count >= 100 {
292
0
            return; // Stop iteration
293
0
        }
294
0
        count += 1;
295
296
0
        if value.ttl_expiry.load(Ordering::Relaxed) > 0 {
297
0
            candidates.push((key.clone(), value.clone()));
298
0
        }
299
0
    });
300
301
0
    if candidates.is_empty() {
302
0
        None
303
    } else {
304
        // Pick a random candidate
305
0
        let idx = rng.random_range(0..candidates.len());
306
0
        Some(candidates.into_iter().nth(idx).unwrap())
307
    }
308
0
}