1use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
2use std::sync::{Arc, Weak};
3use std::thread::{self, JoinHandle};
4use std::time::{Duration, Instant};
5
6use rand::rngs::ThreadRng;
7use rand::Rng;
8
9use crate::constants::Operation;
10use crate::core::store::FeoxStore;
11
/// Tuning knobs for the background TTL sweeper.
#[derive(Clone, Debug)]
pub struct TtlConfig {
    /// Number of sampling attempts made in each batch.
    pub sample_size: usize,
    /// A run stops as soon as a batch's expired/sampled ratio falls below
    /// this value.
    pub expiry_threshold: f32,
    /// Upper bound on the number of batches executed in a single run.
    pub max_iterations: usize,
    /// Time budget for a single run; checked after every batch.
    pub max_time_per_run: Duration,
    /// How long the sweeper thread pauses before each run.
    pub sleep_interval: Duration,
    /// When `false`, starting the sweeper does not spawn a thread.
    pub enabled: bool,
}

impl Default for TtlConfig {
    /// Returns the baseline settings, with the sweeper disabled.
    fn default() -> Self {
        let one_second = Duration::from_secs(1);
        let one_millisecond = Duration::from_millis(1);

        TtlConfig {
            enabled: false,
            sample_size: 100,
            expiry_threshold: 0.25,
            max_iterations: 16,
            max_time_per_run: one_millisecond,
            sleep_interval: one_second,
        }
    }
}

impl TtlConfig {
    /// Baseline settings with the sweeper switched on.
    ///
    /// Currently yields the same values as [`TtlConfig::default_memory`].
    pub fn default_persistent() -> Self {
        Self::enabled_defaults()
    }

    /// Baseline settings with the sweeper switched on.
    ///
    /// Currently yields the same values as [`TtlConfig::default_persistent`].
    pub fn default_memory() -> Self {
        Self::enabled_defaults()
    }

    /// Shared implementation of the two "enabled" presets.
    fn enabled_defaults() -> Self {
        TtlConfig {
            enabled: true,
            ..TtlConfig::default()
        }
    }
}
59
/// Owner of the background thread that samples the store for entries with an
/// elapsed TTL and removes them.
pub struct TtlSweeper {
    /// Non-owning reference to the store; the worker upgrades it for each run
    /// and exits when the upgrade fails.
    store: Weak<FeoxStore>,
    /// Settings handed (by clone) to the worker thread when it is started.
    config: TtlConfig,
    /// Flag shared with the worker; set to `true` to ask it to exit.
    shutdown: Arc<AtomicBool>,
    /// Join handle of the worker thread, `None` while no thread is tracked.
    handle: Option<JoinHandle<()>>,
    /// Counters shared with the worker thread.
    stats: TtlSweeperStats,
}
73
/// Cumulative sweeper counters.
///
/// Every field is an `Arc`, so cloning this struct yields a second handle to
/// the *same* counters rather than an independent copy.
#[derive(Debug, Clone)]
pub struct TtlSweeperStats {
    /// Total number of entries sampled across all recorded runs.
    pub total_sampled: Arc<AtomicU64>,
    /// Total number of entries removed because their TTL had elapsed.
    pub total_expired: Arc<AtomicU64>,
    /// Number of recorded runs.
    pub total_runs: Arc<AtomicU64>,
    /// Time of the most recent recorded run, as written by the sweeper loop.
    pub last_run: Arc<AtomicU64>,
}

impl TtlSweeperStats {
    /// Creates a fresh set of counters, all starting at zero.
    fn new() -> Self {
        Self {
            total_sampled: Arc::new(AtomicU64::new(0)),
            total_expired: Arc::new(AtomicU64::new(0)),
            total_runs: Arc::new(AtomicU64::new(0)),
            last_run: Arc::new(AtomicU64::new(0)),
        }
    }
}
96
97impl TtlSweeper {
98 pub fn new(store: Weak<FeoxStore>, config: TtlConfig) -> Self {
100 Self {
101 store,
102 config,
103 shutdown: Arc::new(AtomicBool::new(false)),
104 handle: None,
105 stats: TtlSweeperStats::new(),
106 }
107 }
108
109 pub fn start(&mut self) {
111 if !self.config.enabled {
112 return;
113 }
114
115 let store = self.store.clone();
116 let config = self.config.clone();
117 let shutdown = self.shutdown.clone();
118 let stats = TtlSweeperStats {
119 total_sampled: self.stats.total_sampled.clone(),
120 total_expired: self.stats.total_expired.clone(),
121 total_runs: self.stats.total_runs.clone(),
122 last_run: self.stats.last_run.clone(),
123 };
124
125 let handle = thread::spawn(move || {
126 run_sweeper_loop(store, config, shutdown, stats);
127 });
128
129 self.handle = Some(handle);
130 }
131
132 pub fn stop(&mut self) {
134 self.shutdown.store(true, Ordering::Release);
135
136 if let Some(handle) = self.handle.take() {
137 let _ = handle.join();
138 }
139 }
140
141 pub fn stats(&self) -> SweeperSnapshot {
143 SweeperSnapshot {
144 total_sampled: self.stats.total_sampled.load(Ordering::Relaxed),
145 total_expired: self.stats.total_expired.load(Ordering::Relaxed),
146 total_runs: self.stats.total_runs.load(Ordering::Relaxed),
147 last_run: self.stats.last_run.load(Ordering::Relaxed),
148 }
149 }
150}
151
152impl Drop for TtlSweeper {
153 fn drop(&mut self) {
154 self.stop();
155 }
156}
157
/// Plain-value copy of the sweeper counters at one point in time.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct SweeperSnapshot {
    /// Total number of entries sampled across all recorded runs.
    pub total_sampled: u64,
    /// Total number of entries removed because their TTL had elapsed.
    pub total_expired: u64,
    /// Number of recorded runs.
    pub total_runs: u64,
    /// Time of the most recent recorded run (0 if none was recorded).
    pub last_run: u64,
}
166
167fn run_sweeper_loop(
169 store: Weak<FeoxStore>,
170 config: TtlConfig,
171 shutdown: Arc<AtomicBool>,
172 stats: TtlSweeperStats,
173) {
174 while !shutdown.load(Ordering::Acquire) {
175 thread::sleep(config.sleep_interval);
177
178 let Some(store) = store.upgrade() else {
180 break;
182 };
183
184 let start = Instant::now();
186 let mut iterations = 0;
187 let mut total_sampled = 0;
188 let mut total_expired = 0;
189
190 loop {
191 let (sampled, expired) = sample_and_expire_batch(&store, &config);
193 total_sampled += sampled;
194 total_expired += expired;
195 iterations += 1;
196
197 let expiry_rate = if sampled > 0 {
199 expired as f32 / sampled as f32
200 } else {
201 0.0
202 };
203
204 if expiry_rate < config.expiry_threshold {
206 break; }
208 if iterations >= config.max_iterations {
209 break; }
211 if start.elapsed() > config.max_time_per_run {
212 break; }
214 }
215
216 if total_sampled > 0 {
218 stats
219 .total_sampled
220 .fetch_add(total_sampled, Ordering::Relaxed);
221 stats
222 .total_expired
223 .fetch_add(total_expired, Ordering::Relaxed);
224 stats.total_runs.fetch_add(1, Ordering::Relaxed);
225 stats.last_run.store(
226 std::time::SystemTime::now()
227 .duration_since(std::time::UNIX_EPOCH)
228 .unwrap()
229 .as_nanos() as u64,
230 Ordering::Relaxed,
231 );
232 }
233
234 if shutdown.load(Ordering::Acquire) {
236 break;
237 }
238 }
239}
240
241fn sample_and_expire_batch(store: &Arc<FeoxStore>, config: &TtlConfig) -> (u64, u64) {
243 let now = store.get_timestamp_pub();
244 let mut sampled = 0;
245 let mut expired = 0;
246 let mut rng = rand::rng();
247
248 let hash_table = store.get_hash_table();
250
251 for _ in 0..config.sample_size {
253 if let Some((key, record)) = get_random_ttl_entry(hash_table, &mut rng) {
255 sampled += 1;
256
257 let ttl_expiry = record.ttl_expiry.load(Ordering::Relaxed);
258
259 if ttl_expiry > 0 && ttl_expiry < now {
261 hash_table.remove(&key);
263 store.remove_from_tree(&key);
264
265 expired += 1;
266
267 if record.sector.load(Ordering::Relaxed) > 0 {
269 if let Some(wb) = store.get_write_buffer() {
271 let _ = wb.add_write(Operation::Delete, record, 0);
272 }
273 }
274 }
275 }
276 }
277
278 (sampled, expired)
279}
280
281fn get_random_ttl_entry(
283 hash_table: &scc::HashMap<Vec<u8>, Arc<crate::core::record::Record>, ahash::RandomState>,
284 rng: &mut ThreadRng,
285) -> Option<(Vec<u8>, Arc<crate::core::record::Record>)> {
286 let mut candidates = Vec::new();
288 let mut count = 0;
289
290 hash_table.scan(|key: &Vec<u8>, value: &Arc<crate::core::record::Record>| {
291 if count >= 100 {
292 return; }
294 count += 1;
295
296 if value.ttl_expiry.load(Ordering::Relaxed) > 0 {
297 candidates.push((key.clone(), value.clone()));
298 }
299 });
300
301 if candidates.is_empty() {
302 None
303 } else {
304 let idx = rng.random_range(0..candidates.len());
306 Some(candidates.into_iter().nth(idx).unwrap())
307 }
308}