use crossbeam_channel::{bounded, Receiver, Sender};
use crossbeam_utils::CachePadded;
use parking_lot::{Mutex, RwLock};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

use crate::constants::*;
use crate::core::record::Record;
use crate::error::{FeoxError, Result};
use crate::stats::Statistics;
use crate::storage::free_space::FreeSpaceManager;
use crate::storage::io::DiskIO;

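/// A single shard of the write buffer: a mutex-protected queue of pending
/// entries plus atomic counters tracking how many entries and how many bytes
/// are buffered. Aligned to a cache line (and wrapped in `CachePadded` by the
/// owner) to avoid false sharing between shards.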
#[repr(align(64))]
pub struct ShardedWriteBuffer {
    /// Queue of writes waiting to be flushed.
    buffer: Mutex<VecDeque<WriteEntry>>,
    /// Number of buffered entries.
    count: AtomicUsize,
    /// Total size in bytes of the buffered records.
    size: AtomicUsize,
}

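/// A buffered write operation awaiting flush to disk.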
pub struct WriteEntry {
    pub op: Operation,
    pub record: Arc<Record>,
    pub old_value_len: usize,
    pub work_status: AtomicU32,
    pub retry_count: AtomicU32,
    pub timestamp: Instant,
}

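/// Buffered, sharded write path in front of [`DiskIO`].
///
/// Writes are appended to a per-thread shard and flushed by background worker
/// threads, either when a shard fills up or on a periodic timer.
///
/// Typical call sequence (sketch only; constructing `DiskIO`,
/// `FreeSpaceManager`, and `Statistics` is outside this module):
///
/// ```ignore
/// let mut wb = WriteBuffer::new(disk_io, free_space, stats);
/// wb.start_workers(4);
/// wb.add_write(Operation::Insert, record, 0)?;
/// wb.force_flush()?;
/// wb.complete_shutdown();
/// ```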
pub struct WriteBuffer {
    /// Per-thread shards holding buffered writes.
    sharded_buffers: Arc<Vec<CachePadded<ShardedWriteBuffer>>>,
    /// Handle used to write batches and deletion markers to disk.
    disk_io: Arc<RwLock<DiskIO>>,
    /// Sector allocator for on-disk space.
    free_space: Arc<RwLock<FreeSpaceManager>>,
    /// Flush-request senders, one per worker thread.
    worker_channels: Vec<Sender<FlushRequest>>,
    /// Join handles for the flush worker threads.
    worker_handles: Vec<JoinHandle<()>>,
    /// Join handle for the periodic flush thread.
    periodic_flush_handle: Option<JoinHandle<()>>,
    /// Shutdown flag shared with all background threads.
    shutdown: Arc<AtomicBool>,
    /// Shared statistics counters.
    stats: Arc<Statistics>,
}

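/// Message sent to a worker asking it to flush its shard. The optional
/// channel, when present, is used to report the flush result back to the
/// caller.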
#[derive(Debug)]
struct FlushRequest {
    response: Option<Sender<Result<()>>>,
}

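/// Everything a flush worker thread needs, captured at spawn time.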
struct WorkerContext {
    worker_id: usize,
    disk_io: Arc<RwLock<DiskIO>>,
    free_space: Arc<RwLock<FreeSpaceManager>>,
    sharded_buffers: Arc<Vec<CachePadded<ShardedWriteBuffer>>>,
    shutdown: Arc<AtomicBool>,
    stats: Arc<Statistics>,
}

impl ShardedWriteBuffer {
    fn new(_shard_id: usize) -> Self {
        Self {
            buffer: Mutex::new(VecDeque::new()),
            count: AtomicUsize::new(0),
            size: AtomicUsize::new(0),
        }
    }

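    /// Appends an entry to this shard and updates the count/size counters.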
    fn add_entry(&self, entry: WriteEntry) -> Result<()> {
        let entry_size = entry.record.calculate_size();

        let mut buffer = self.buffer.lock();
        buffer.push_back(entry);

        self.count.fetch_add(1, Ordering::AcqRel);
        self.size.fetch_add(entry_size, Ordering::AcqRel);

        Ok(())
    }

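    /// Removes and returns all buffered entries, resetting the counters.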
    fn drain_entries(&self) -> Vec<WriteEntry> {
        let mut buffer = self.buffer.lock();
        let entries: Vec<_> = buffer.drain(..).collect();

        self.count.store(0, Ordering::Release);
        self.size.store(0, Ordering::Release);

        entries
    }

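    /// Returns true once the shard exceeds either the entry-count or
    /// byte-size threshold and should be flushed.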
    fn is_full(&self) -> bool {
        self.count.load(Ordering::Acquire) >= WRITE_BUFFER_SIZE
            || self.size.load(Ordering::Acquire) >= FEOX_WRITE_BUFFER_SIZE
    }
}

impl WriteBuffer {
    pub fn new(
        disk_io: Arc<RwLock<DiskIO>>,
        free_space: Arc<RwLock<FreeSpaceManager>>,
        stats: Arc<Statistics>,
    ) -> Self {
        let num_shards = (num_cpus::get() / 2).max(1);

        let sharded_buffers = Arc::new(
            (0..num_shards)
                .map(|shard_id| CachePadded::new(ShardedWriteBuffer::new(shard_id)))
                .collect(),
        );

        Self {
            sharded_buffers,
            disk_io,
            free_space,
            worker_channels: Vec::new(),
            worker_handles: Vec::new(),
            periodic_flush_handle: None,
            shutdown: Arc::new(AtomicBool::new(false)),
            stats,
        }
    }

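    /// Buffers a write on the calling thread's shard. If the shard is now
    /// full, a best-effort flush request is sent to the matching worker.
    /// Fails with [`FeoxError::ShuttingDown`] once shutdown has started.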
    pub fn add_write(
        &self,
        op: Operation,
        record: Arc<Record>,
        old_value_len: usize,
    ) -> Result<()> {
        if self.shutdown.load(Ordering::Acquire) {
            return Err(FeoxError::ShuttingDown);
        }

        let entry = WriteEntry {
            op,
            record,
            old_value_len,
            work_status: AtomicU32::new(0),
            retry_count: AtomicU32::new(0),
            timestamp: Instant::now(),
        };

        let shard_id = self.get_shard_id();
        let buffer = &self.sharded_buffers[shard_id];

        buffer.add_entry(entry)?;
        self.stats.record_write_buffered();

        if buffer.is_full() && shard_id < self.worker_channels.len() {
            let req = FlushRequest { response: None };
            let _ = self.worker_channels[shard_id].try_send(req);
        }

        Ok(())
    }

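    /// Spawns up to `num_workers` flush workers (capped at the shard count),
    /// plus a periodic thread that nudges workers whose shards hold data.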
    pub fn start_workers(&mut self, num_workers: usize) {
        let num_shards = self.sharded_buffers.len();
        let actual_workers = num_workers.min(num_shards);

        let mut receivers = Vec::new();
        for _ in 0..actual_workers {
            let (tx, rx) = bounded(2);
            self.worker_channels.push(tx);
            receivers.push(rx);
        }

        // Hand receivers out in order so worker `i` listens on
        // `worker_channels[i]` and drains shard `i`.
        for (worker_id, flush_rx) in receivers.into_iter().enumerate() {
            let ctx = WorkerContext {
                worker_id,
                disk_io: self.disk_io.clone(),
                free_space: self.free_space.clone(),
                sharded_buffers: self.sharded_buffers.clone(),
                shutdown: self.shutdown.clone(),
                stats: self.stats.clone(),
            };

            let handle = thread::spawn(move || {
                write_buffer_worker(ctx, flush_rx);
            });

            self.worker_handles.push(handle);
        }

        let worker_channels = self.worker_channels.clone();
        let shutdown = self.shutdown.clone();
        let sharded_buffers = self.sharded_buffers.clone();

        let periodic_handle = thread::spawn(move || {
            let interval = WRITE_BUFFER_FLUSH_INTERVAL;

            while !shutdown.load(Ordering::Acquire) {
                thread::sleep(interval);

                for (shard_id, buffer) in sharded_buffers.iter().enumerate() {
                    if buffer.count.load(Ordering::Relaxed) > 0
                        && shard_id < worker_channels.len()
                    {
                        let req = FlushRequest { response: None };
                        let _ = worker_channels[shard_id].try_send(req);
                    }
                }
            }
        });

        self.periodic_flush_handle = Some(periodic_handle);
    }

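    /// Asks every worker to flush its shard and waits for each one to report
    /// back, propagating any error.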
    pub fn force_flush(&self) -> Result<()> {
        let mut responses = Vec::new();

        for worker_tx in &self.worker_channels {
            let (tx, rx) = bounded(1);
            let req = FlushRequest { response: Some(tx) };

            worker_tx.send(req).map_err(|_| FeoxError::ChannelError)?;
            responses.push(rx);
        }

        for rx in responses {
            rx.recv().map_err(|_| FeoxError::ChannelError)??;
        }

        Ok(())
    }

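    /// Signals shutdown; workers observe the flag, drain their shards one
    /// last time, and exit.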
    pub fn initiate_shutdown(&self) {
        self.shutdown.store(true, Ordering::Release);
    }

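    /// Sets the shutdown flag and joins the background threads, waiting at
    /// most five seconds for the periodic flush thread.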
    pub fn complete_shutdown(&mut self) {
        self.shutdown.store(true, Ordering::Release);

        if let Some(handle) = self.periodic_flush_handle.take() {
            // Join the periodic flush thread from a helper thread so the wait
            // can be bounded; if it does not exit within five seconds we
            // continue shutting down without it.
            let (tx, rx) = crossbeam_channel::bounded(1);
            thread::spawn(move || {
                let _ = handle.join();
                let _ = tx.send(());
            });

            let _ = rx.recv_timeout(Duration::from_secs(5));
        }

        for handle in self.worker_handles.drain(..) {
            let _ = handle.join();
        }
    }

    /// Convenience wrapper that performs a full shutdown.
    pub fn shutdown(&mut self) {
        self.complete_shutdown();
    }

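    /// Picks a shard for the current thread by hashing its thread ID once
    /// and caching the result in a thread-local.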
    #[inline]
    fn get_shard_id(&self) -> usize {
        thread_local! {
            static THREAD_SHARD_ID: std::cell::Cell<Option<usize>> = const { std::cell::Cell::new(None) };
        }

        THREAD_SHARD_ID.with(|id| {
            if let Some(shard_id) = id.get() {
                shard_id
            } else {
                use std::collections::hash_map::RandomState;
                use std::hash::BuildHasher;
                let shard_id = RandomState::new().hash_one(std::thread::current().id()) as usize
                    % self.sharded_buffers.len();
                id.set(Some(shard_id));
                shard_id
            }
        })
    }
}

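/// Worker loop: waits for flush requests for its shard, drains and writes
/// the batch, and performs a final drain when shutdown is observed.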
fn write_buffer_worker(ctx: WorkerContext, flush_rx: Receiver<FlushRequest>) {
    let worker_id = ctx.worker_id;

    loop {
        if ctx.shutdown.load(Ordering::Acquire) {
            break;
        }

        let req = match flush_rx.recv_timeout(Duration::from_millis(500)) {
            Ok(req) => req,
            Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
                continue;
            }
            Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
                break;
            }
        };

        if worker_id < ctx.sharded_buffers.len() {
            let buffer = &ctx.sharded_buffers[worker_id];
            let entries = buffer.drain_entries();

            if !entries.is_empty() {
                let result =
                    process_write_batch(&ctx.disk_io, &ctx.free_space, entries, &ctx.stats);

                ctx.stats.flush_count.fetch_add(1, Ordering::Relaxed);

                if let Some(tx) = req.response {
                    let _ = tx.send(result);
                }
            } else if let Some(tx) = req.response {
                let _ = tx.send(Ok(()));
            }
        }
    }

    if ctx.shutdown.load(Ordering::Acquire) && worker_id < ctx.sharded_buffers.len() {
        let buffer = &ctx.sharded_buffers[worker_id];
        let final_entries = buffer.drain_entries();

        if !final_entries.is_empty() {
            let _ = process_write_batch(&ctx.disk_io, &ctx.free_space, final_entries, &ctx.stats);
        }
    }
}

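/// Writes a drained batch to disk: allocates sectors for inserts/updates,
/// writes deletion markers and releases sectors for deletes, and retries the
/// batch write with exponential backoff and jitter before giving up.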
fn process_write_batch(
    disk_io: &Arc<RwLock<DiskIO>>,
    free_space: &Arc<RwLock<FreeSpaceManager>>,
    entries: Vec<WriteEntry>,
    stats: &Arc<Statistics>,
) -> Result<()> {
    let mut batch_writes = Vec::new();
    let mut delete_operations = Vec::new();
    let mut records_to_clear = Vec::new();

    for entry in entries {
        match entry.op {
            Operation::Insert | Operation::Update => {
                if entry.record.refcount.load(Ordering::Acquire) > 0
                    && entry.record.sector.load(Ordering::Acquire) == 0
                {
                    let data = prepare_record_data(&entry.record)?;
                    let sectors_needed = data.len().div_ceil(FEOX_BLOCK_SIZE);
                    let sector = free_space.write().allocate_sectors(sectors_needed as u64)?;

                    stats
                        .disk_usage
                        .fetch_add((sectors_needed * FEOX_BLOCK_SIZE) as u64, Ordering::Relaxed);

                    batch_writes.push((sector, data));
                    records_to_clear.push((sector, sectors_needed as u64, entry.record.clone()));
                }
            }
            Operation::Delete => {
                let sector = entry.record.sector.load(Ordering::Acquire);
                if sector != 0 {
                    delete_operations.push((sector, entry.record.key.len(), entry.old_value_len));
                }
            }
            _ => {}
        }
    }

    for (sector, key_len, value_len) in delete_operations {
        let mut deletion_marker = vec![0u8; FEOX_BLOCK_SIZE];
        deletion_marker[..8].copy_from_slice(b"\0DELETED");

        let _ = disk_io.write().write_sectors_sync(sector, &deletion_marker);

        let total_size = SECTOR_HEADER_SIZE + 2 + key_len + 8 + 8 + value_len;
        let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE);

        let _ = free_space
            .write()
            .release_sectors(sector, sectors_needed as u64);

        stats
            .disk_usage
            .fetch_sub((sectors_needed * FEOX_BLOCK_SIZE) as u64, Ordering::Relaxed);
    }

    if !batch_writes.is_empty() {
        let mut retries = 3;
        let mut delay_us = 100;

        while retries > 0 {
            let result = disk_io.write().batch_write(batch_writes.clone());

            match result {
                Ok(()) => {
                    for (sector, _, record) in &records_to_clear {
                        record.sector.store(*sector, Ordering::Release);
                        std::sync::atomic::fence(Ordering::Release);
                        record.clear_value();
                    }
                    stats.record_write_flushed(records_to_clear.len() as u64);
                    break;
                }
                Err(e) => {
                    retries -= 1;
                    if retries > 0 {
                        let jitter = {
                            use rand::Rng;
                            let mut rng = rand::rng();
                            (delay_us * rng.random_range(-10..=10)) / 100
                        };
                        let actual_delay = (delay_us + jitter).max(1);
                        thread::sleep(Duration::from_micros(actual_delay as u64));
                        delay_us *= 2;
                    } else {
                        stats.record_write_failed();
                        // Give back everything allocated for this batch so the
                        // sectors are not leaked on a failed flush.
                        for (sector, sectors, _) in &records_to_clear {
                            free_space.write().release_sectors(*sector, *sectors)?;
                        }
                        return Err(e);
                    }
                }
            }
        }
    }

    Ok(())
}

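/// Serializes a record into a sector-aligned buffer: sector marker, a zero
/// `u16`, key length, key bytes, value length, timestamp, value bytes, then
/// zero padding up to a whole number of blocks.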
fn prepare_record_data(record: &Record) -> Result<Vec<u8>> {
    let total_size = SECTOR_HEADER_SIZE + 2 + record.key.len() + 8 + 8 + record.value_len;

    let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE);
    let padded_size = sectors_needed * FEOX_BLOCK_SIZE;

    let mut data = Vec::with_capacity(padded_size);

    data.extend_from_slice(&SECTOR_MARKER.to_le_bytes());
    data.extend_from_slice(&0u16.to_le_bytes());
    data.extend_from_slice(&(record.key.len() as u16).to_le_bytes());
    data.extend_from_slice(&record.key);
    data.extend_from_slice(&(record.value_len as u64).to_le_bytes());
    data.extend_from_slice(&record.timestamp.to_le_bytes());

    if let Some(value) = record.get_value() {
        data.extend_from_slice(value.as_ref());
    }

    data.resize(padded_size, 0);

    Ok(data)
}

impl Drop for WriteBuffer {
    fn drop(&mut self) {
        if !self.shutdown.load(Ordering::Acquire) {
            self.complete_shutdown();
        }
    }
}