feoxdb/core/
ttl_sweep.rs

1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2use std::sync::{Arc, Weak};
3use std::thread::{self, JoinHandle};
4use std::time::{Duration, Instant};
5
6use rand::rngs::ThreadRng;
7use rand::Rng;
8
9use crate::constants::Operation;
10use crate::core::store::FeoxStore;
11
/// Tuning knobs for the background TTL sweeper thread.
///
/// A "run" is one wake-up of the sweeper; within a run it processes one or
/// more batches until one of the limits below tells it to stop.
#[derive(Clone, Debug)]
pub struct TtlConfig {
    /// How many sampling attempts are made in a single batch.
    pub sample_size: usize,
    /// Fraction (0.0-1.0) of sampled keys that must be expired for the run to
    /// go on to another batch; a batch below this rate ends the run.
    pub expiry_threshold: f32,
    /// Hard cap on the number of batches processed in one run.
    pub max_iterations: usize,
    /// Time budget for one run. It is checked after each batch, so a run can
    /// exceed it by the duration of the batch in flight.
    pub max_time_per_run: Duration,
    /// Idle time between two consecutive runs.
    pub sleep_interval: Duration,
    /// When `false`, starting the sweeper is a no-op.
    pub enabled: bool,
}
28
29impl Default for TtlConfig {
30    fn default() -> Self {
31        Self {
32            sample_size: 100,
33            expiry_threshold: 0.25,
34            max_iterations: 16,
35            max_time_per_run: Duration::from_millis(1),
36            sleep_interval: Duration::from_millis(1000),
37            enabled: false,
38        }
39    }
40}
41
42impl TtlConfig {
43    /// Create a default configuration for persistent stores
44    pub fn default_persistent() -> Self {
45        Self {
46            enabled: true,
47            ..Default::default()
48        }
49    }
50
51    /// Create a default configuration for memory-only stores
52    pub fn default_memory() -> Self {
53        Self {
54            enabled: true,
55            ..Default::default()
56        }
57    }
58}
59
60/// Background thread that periodically sweeps expired TTL keys
61pub struct TtlSweeper {
62    /// Weak reference to the store to avoid circular references
63    store: Weak<FeoxStore>,
64    /// Configuration
65    config: TtlConfig,
66    /// Shutdown flag
67    shutdown: Arc<AtomicBool>,
68    /// Thread handle
69    handle: Option<JoinHandle<()>>,
70    /// Statistics
71    stats: TtlSweeperStats,
72}
73
/// Shared counters describing what the TTL sweeper has done so far.
///
/// Every field is an `Arc`, so cloning this struct yields another handle to
/// the *same* counters rather than an independent copy.
#[derive(Debug, Clone)]
pub struct TtlSweeperStats {
    /// Total number of keys sampled across all runs.
    pub total_sampled: Arc<AtomicU64>,
    /// Total number of keys removed because their TTL had passed.
    pub total_expired: Arc<AtomicU64>,
    /// Number of cleaning runs that sampled at least one key.
    pub total_runs: Arc<AtomicU64>,
    /// Wall-clock time of the most recent counted run, in nanoseconds since
    /// the UNIX epoch (0 until the first such run).
    pub last_run: Arc<AtomicU64>,
}
85
86impl TtlSweeperStats {
87    fn new() -> Self {
88        Self {
89            total_sampled: Arc::new(AtomicU64::new(0)),
90            total_expired: Arc::new(AtomicU64::new(0)),
91            total_runs: Arc::new(AtomicU64::new(0)),
92            last_run: Arc::new(AtomicU64::new(0)),
93        }
94    }
95}
96
97impl TtlSweeper {
98    /// Create a new TTL sweeper
99    pub fn new(store: Weak<FeoxStore>, config: TtlConfig) -> Self {
100        Self {
101            store,
102            config,
103            shutdown: Arc::new(AtomicBool::new(false)),
104            handle: None,
105            stats: TtlSweeperStats::new(),
106        }
107    }
108
109    /// Start the background sweeper thread
110    pub fn start(&mut self) {
111        if !self.config.enabled {
112            return;
113        }
114
115        let store = self.store.clone();
116        let config = self.config.clone();
117        let shutdown = self.shutdown.clone();
118        let stats = TtlSweeperStats {
119            total_sampled: self.stats.total_sampled.clone(),
120            total_expired: self.stats.total_expired.clone(),
121            total_runs: self.stats.total_runs.clone(),
122            last_run: self.stats.last_run.clone(),
123        };
124
125        let handle = thread::spawn(move || {
126            run_sweeper_loop(store, config, shutdown, stats);
127        });
128
129        self.handle = Some(handle);
130    }
131
132    /// Stop the background sweeper thread
133    pub fn stop(&mut self) {
134        self.shutdown.store(true, Ordering::Release);
135
136        if let Some(handle) = self.handle.take() {
137            let _ = handle.join();
138        }
139    }
140
141    /// Get sweeper statistics
142    pub fn stats(&self) -> SweeperSnapshot {
143        SweeperSnapshot {
144            total_sampled: self.stats.total_sampled.load(Ordering::Relaxed),
145            total_expired: self.stats.total_expired.load(Ordering::Relaxed),
146            total_runs: self.stats.total_runs.load(Ordering::Relaxed),
147            last_run: self.stats.last_run.load(Ordering::Relaxed),
148        }
149    }
150}
151
152impl Drop for TtlSweeper {
153    fn drop(&mut self) {
154        self.stop();
155    }
156}
157
/// Plain-value snapshot of the sweeper statistics at one point in time.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SweeperSnapshot {
    /// Total number of keys sampled so far.
    pub total_sampled: u64,
    /// Total number of keys removed because they had expired.
    pub total_expired: u64,
    /// Number of cleaning runs recorded so far.
    pub total_runs: u64,
    /// Timestamp of the most recent recorded run, in nanoseconds.
    pub last_run: u64,
}
166
167/// Main sweeper loop that runs in the background thread
168fn run_sweeper_loop(
169    store: Weak<FeoxStore>,
170    config: TtlConfig,
171    shutdown: Arc<AtomicBool>,
172    stats: TtlSweeperStats,
173) {
174    while !shutdown.load(Ordering::Acquire) {
175        // Sleep between runs
176        thread::sleep(config.sleep_interval);
177
178        // Try to get strong reference to store
179        let Some(store) = store.upgrade() else {
180            // Store has been dropped, exit
181            break;
182        };
183
184        // Perform sweeping run
185        let start = Instant::now();
186        let mut iterations = 0;
187        let mut total_sampled = 0;
188        let mut total_expired = 0;
189
190        loop {
191            // Sample and expire a batch
192            let (sampled, expired) = sample_and_expire_batch(&store, &config);
193            total_sampled += sampled;
194            total_expired += expired;
195            iterations += 1;
196
197            // Calculate expiry rate
198            let expiry_rate = if sampled > 0 {
199                expired as f32 / sampled as f32
200            } else {
201                0.0
202            };
203
204            // Check stop conditions
205            if expiry_rate < config.expiry_threshold {
206                break; // Few expired keys, we're done
207            }
208            if iterations >= config.max_iterations {
209                break; // Bounded iterations
210            }
211            if start.elapsed() > config.max_time_per_run {
212                break; // Bounded time
213            }
214        }
215
216        // Update statistics
217        if total_sampled > 0 {
218            stats
219                .total_sampled
220                .fetch_add(total_sampled, Ordering::Relaxed);
221            stats
222                .total_expired
223                .fetch_add(total_expired, Ordering::Relaxed);
224            stats.total_runs.fetch_add(1, Ordering::Relaxed);
225            stats.last_run.store(
226                std::time::SystemTime::now()
227                    .duration_since(std::time::UNIX_EPOCH)
228                    .unwrap()
229                    .as_nanos() as u64,
230                Ordering::Relaxed,
231            );
232        }
233
234        // Check shutdown flag again
235        if shutdown.load(Ordering::Acquire) {
236            break;
237        }
238    }
239}
240
241/// Sample keys and expire those that have exceeded their TTL
242fn sample_and_expire_batch(store: &Arc<FeoxStore>, config: &TtlConfig) -> (u64, u64) {
243    let now = store.get_timestamp_pub();
244    let mut sampled = 0;
245    let mut expired = 0;
246    let mut rng = rand::rng();
247
248    // Get access to the hash table
249    let hash_table = store.get_hash_table();
250
251    // Sample random entries directly from the hash table
252    for _ in 0..config.sample_size {
253        // Try to get a random entry with TTL
254        if let Some((key, record)) = get_random_ttl_entry(hash_table, &mut rng) {
255            sampled += 1;
256
257            let ttl_expiry = record.ttl_expiry.load(Ordering::Relaxed);
258
259            // Check if expired
260            if ttl_expiry > 0 && ttl_expiry < now {
261                // Remove expired entry
262                hash_table.remove(&key);
263                store.remove_from_tree(&key);
264
265                expired += 1;
266
267                // Queue disk cleanup if needed
268                if record.sector.load(Ordering::Relaxed) > 0 {
269                    // Add to write buffer for disk cleanup
270                    if let Some(wb) = store.get_write_buffer() {
271                        let _ = wb.add_write(Operation::Delete, record, 0);
272                    }
273                }
274            }
275        }
276    }
277
278    (sampled, expired)
279}
280
281/// Get a random entry with TTL using sampling
282fn get_random_ttl_entry(
283    hash_table: &scc::HashMap<Vec<u8>, Arc<crate::core::record::Record>, ahash::RandomState>,
284    rng: &mut ThreadRng,
285) -> Option<(Vec<u8>, Arc<crate::core::record::Record>)> {
286    // Sample up to 100 entries and pick one with TTL
287    let mut candidates = Vec::new();
288    let mut count = 0;
289
290    hash_table.scan(|key: &Vec<u8>, value: &Arc<crate::core::record::Record>| {
291        if count >= 100 {
292            return; // Stop iteration
293        }
294        count += 1;
295
296        if value.ttl_expiry.load(Ordering::Relaxed) > 0 {
297            candidates.push((key.clone(), value.clone()));
298        }
299    });
300
301    if candidates.is_empty() {
302        None
303    } else {
304        // Pick a random candidate
305        let idx = rng.random_range(0..candidates.len());
306        Some(candidates.into_iter().nth(idx).unwrap())
307    }
308}