use crossbeam_channel::{bounded, Receiver, Sender};
use crossbeam_utils::CachePadded;
use parking_lot::{Mutex, RwLock};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

use crate::constants::*;
use crate::core::record::Record;
use crate::error::{FeoxError, Result};
use crate::stats::Statistics;
use crate::storage::format::{get_format, RecordFormat};
use crate::storage::free_space::FreeSpaceManager;
use crate::storage::io::DiskIO;

/// One shard of the write buffer: a mutex-protected queue of pending
/// entries plus atomic counters so fullness checks can skip the lock.
#[repr(align(64))]
pub struct ShardedWriteBuffer {
    /// Pending write entries for this shard.
    buffer: Mutex<VecDeque<WriteEntry>>,

    /// Number of buffered entries.
    count: AtomicUsize,

    /// Total size in bytes of the buffered records.
    size: AtomicUsize,
}

/// A single buffered operation awaiting flush to disk.
pub struct WriteEntry {
    pub op: Operation,
    pub record: Arc<Record>,
    pub old_value_len: usize,
    pub work_status: AtomicU32,
    pub retry_count: AtomicU32,
    pub timestamp: Instant,
}

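/// Sharded, multi-threaded write buffer that batches record writes and
/// flushes them to disk from background worker threads.
///
/// A minimal lifecycle sketch is shown below. It is marked `ignore`
/// because the constructors for `DiskIO`, `FreeSpaceManager`,
/// `Statistics`, and the `Record` value are placeholders assumed for
/// illustration (their real signatures live elsewhere in the crate);
/// only the `WriteBuffer` calls mirror this module.
///
/// ```ignore
/// let disk_io = Arc::new(RwLock::new(DiskIO::open("/path/to/device")?)); // assumed constructor
/// let free_space = Arc::new(RwLock::new(FreeSpaceManager::new()));       // assumed constructor
/// let stats = Arc::new(Statistics::default());                           // assumed constructor
///
/// let mut wb = WriteBuffer::new(disk_io, free_space, stats, 1);
/// wb.start_workers(2);
///
/// // `record` is an Arc<Record> obtained from the store's in-memory index.
/// wb.add_write(Operation::Insert, record, 0)?;
///
/// wb.force_flush()?;        // synchronously drain all shards
/// wb.complete_shutdown();   // stop workers and join their threads
/// ```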
pub struct WriteBuffer {
    /// Per-shard buffers, cache-padded to avoid false sharing.
    sharded_buffers: Arc<Vec<CachePadded<ShardedWriteBuffer>>>,

    /// Disk I/O backend.
    disk_io: Arc<RwLock<DiskIO>>,

    /// Sector allocator.
    free_space: Arc<RwLock<FreeSpaceManager>>,

    /// One flush-request channel per worker thread.
    worker_channels: Vec<Sender<FlushRequest>>,

    /// Handles of the worker threads.
    worker_handles: Vec<JoinHandle<()>>,

    /// Handle of the periodic flush thread.
    periodic_flush_handle: Option<JoinHandle<()>>,

    /// Set when shutdown begins; rejects new writes and stops workers.
    shutdown: Arc<AtomicBool>,

    /// Shared statistics counters.
    stats: Arc<Statistics>,

    /// On-disk record format version.
    format_version: u32,
}

/// Request asking a worker to flush its shard. When `response` is set, the
/// worker reports the flush result back over that channel.
#[derive(Debug)]
struct FlushRequest {
    response: Option<Sender<Result<()>>>,
}

/// Everything a worker thread needs, cloned out of `WriteBuffer`.
struct WorkerContext {
    worker_id: usize,
    disk_io: Arc<RwLock<DiskIO>>,
    free_space: Arc<RwLock<FreeSpaceManager>>,
    sharded_buffers: Arc<Vec<CachePadded<ShardedWriteBuffer>>>,
    shutdown: Arc<AtomicBool>,
    stats: Arc<Statistics>,
    format_version: u32,
}

impl ShardedWriteBuffer {
    fn new(_shard_id: usize) -> Self {
        Self {
            buffer: Mutex::new(VecDeque::new()),
            count: AtomicUsize::new(0),
            size: AtomicUsize::new(0),
        }
    }

    fn add_entry(&self, entry: WriteEntry) -> Result<()> {
        let entry_size = entry.record.calculate_size();

        let mut buffer = self.buffer.lock();
        buffer.push_back(entry);

        self.count.fetch_add(1, Ordering::AcqRel);
        self.size.fetch_add(entry_size, Ordering::AcqRel);

        Ok(())
    }

    fn drain_entries(&self) -> Vec<WriteEntry> {
        let mut buffer = self.buffer.lock();
        let entries: Vec<_> = buffer.drain(..).collect();

        self.count.store(0, Ordering::Release);
        self.size.store(0, Ordering::Release);

        entries
    }

    fn is_full(&self) -> bool {
        self.count.load(Ordering::Acquire) >= WRITE_BUFFER_SIZE
            || self.size.load(Ordering::Acquire) >= FEOX_WRITE_BUFFER_SIZE
    }
}

impl WriteBuffer {
    pub fn new(
        disk_io: Arc<RwLock<DiskIO>>,
        free_space: Arc<RwLock<FreeSpaceManager>>,
        stats: Arc<Statistics>,
        format_version: u32,
    ) -> Self {
        // One shard per two logical CPUs, with at least one shard.
        let num_shards = (num_cpus::get() / 2).max(1);

        let sharded_buffers = Arc::new(
            (0..num_shards)
                .map(|shard_id| CachePadded::new(ShardedWriteBuffer::new(shard_id)))
                .collect(),
        );

        Self {
            sharded_buffers,
            disk_io,
            free_space,
            worker_channels: Vec::new(),
            worker_handles: Vec::new(),
            periodic_flush_handle: None,
            shutdown: Arc::new(AtomicBool::new(false)),
            stats,
            format_version,
        }
    }

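    /// Queues an operation on the calling thread's shard and, when that
    /// shard crosses its entry-count or byte-size threshold, nudges the
    /// worker that owns it. Returns `FeoxError::ShuttingDown` once
    /// shutdown has begun.
    ///
    /// A hedged sketch of a caller (how `record` and the previous value
    /// length are obtained is assumed, not defined in this module):
    ///
    /// ```ignore
    /// // `record` is an Arc<Record> already present in the in-memory index;
    /// // `old_len` is the size of the value being replaced (0 for a fresh insert).
    /// write_buffer.add_write(Operation::Update, record.clone(), old_len)?;
    /// ```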
    pub fn add_write(
        &self,
        op: Operation,
        record: Arc<Record>,
        old_value_len: usize,
    ) -> Result<()> {
        if self.shutdown.load(Ordering::Acquire) {
            return Err(FeoxError::ShuttingDown);
        }

        let entry = WriteEntry {
            op,
            record,
            old_value_len,
            work_status: AtomicU32::new(0),
            retry_count: AtomicU32::new(0),
            timestamp: Instant::now(),
        };

        let shard_id = self.get_shard_id();
        let buffer = &self.sharded_buffers[shard_id];

        buffer.add_entry(entry)?;
        self.stats.record_write_buffered();

        // If the shard is full, wake the worker that owns it. The send is
        // best-effort: a full channel means a flush is already queued.
        if buffer.is_full() && shard_id < self.worker_channels.len() {
            let req = FlushRequest { response: None };
            let _ = self.worker_channels[shard_id].try_send(req);
        }

        Ok(())
    }

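    /// Spawns the flush workers and the periodic flush thread. The worker
    /// count is capped at the number of shards, and worker `i` drains
    /// shard `i` via flush channel `i`.
    ///
    /// A hedged usage sketch (the choice of worker count is illustrative):
    ///
    /// ```ignore
    /// let mut wb = WriteBuffer::new(disk_io, free_space, stats, 1);
    /// wb.start_workers(num_cpus::get().min(4));
    /// ```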
    pub fn start_workers(&mut self, num_workers: usize) {
        // Never spawn more workers than there are shards to drain.
        let num_shards = self.sharded_buffers.len();
        let actual_workers = num_workers.min(num_shards);

        // One bounded flush channel per worker.
        let mut receivers = Vec::new();
        for _ in 0..actual_workers {
            let (tx, rx) = bounded(2);
            self.worker_channels.push(tx);
            receivers.push(rx);
        }

        // Hand receiver `i` to worker `i`, so channel index, shard index,
        // and the worker that drains them always agree.
        for (worker_id, flush_rx) in receivers.into_iter().enumerate() {
            let ctx = WorkerContext {
                worker_id,
                disk_io: self.disk_io.clone(),
                free_space: self.free_space.clone(),
                sharded_buffers: self.sharded_buffers.clone(),
                shutdown: self.shutdown.clone(),
                stats: self.stats.clone(),
                format_version: self.format_version,
            };

            let handle = thread::spawn(move || {
                write_buffer_worker(ctx, flush_rx);
            });

            self.worker_handles.push(handle);
        }

        let worker_channels = self.worker_channels.clone();
        let shutdown = self.shutdown.clone();
        let sharded_buffers = self.sharded_buffers.clone();

        // Periodic flusher: wakes every flush interval and pokes the worker
        // of any shard that has buffered entries.
        let periodic_handle = thread::spawn(move || {
            let interval = WRITE_BUFFER_FLUSH_INTERVAL;

            while !shutdown.load(Ordering::Acquire) {
                thread::sleep(interval);

                for (shard_id, buffer) in sharded_buffers.iter().enumerate() {
                    let count = buffer.count.load(Ordering::Relaxed);
                    if count > 0 && shard_id < worker_channels.len() {
                        // Worker `shard_id` listens on channel `shard_id`.
                        let req = FlushRequest { response: None };
                        let _ = worker_channels[shard_id].try_send(req);
                    }
                }
            }
        });

        self.periodic_flush_handle = Some(periodic_handle);
    }

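    /// Synchronously flushes every shard: sends a flush request with a
    /// response channel to each worker and waits for all of them to report
    /// back, propagating the first error.
    ///
    /// Typical call site, e.g. before taking a consistent snapshot:
    ///
    /// ```ignore
    /// write_buffer.force_flush()?; // all buffered writes are on disk after this
    /// ```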
    pub fn force_flush(&self) -> Result<()> {
        let mut responses = Vec::new();

        for worker_tx in &self.worker_channels {
            let (tx, rx) = bounded(1);
            let req = FlushRequest { response: Some(tx) };

            worker_tx.send(req).map_err(|_| FeoxError::ChannelError)?;
            responses.push(rx);
        }

        for rx in responses {
            rx.recv().map_err(|_| FeoxError::ChannelError)??;
        }

        Ok(())
    }

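    /// First phase of shutdown: sets the shutdown flag so new writes are
    /// rejected and workers begin winding down. Call `complete_shutdown`
    /// afterwards to drain remaining entries and join the threads.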
    pub fn initiate_shutdown(&self) {
        self.shutdown.store(true, Ordering::Release);
    }

    /// Second phase of shutdown: joins the periodic flush thread (with a
    /// timeout) and then joins every worker; each worker drains its shard
    /// one last time before exiting.
    pub fn complete_shutdown(&mut self) {
        self.shutdown.store(true, Ordering::Release);

        if let Some(handle) = self.periodic_flush_handle.take() {
            // Join the periodic thread from a helper thread so the wait can
            // be bounded; if it does not exit within 5s it is left detached.
            let (tx, rx) = crossbeam_channel::bounded(1);
            thread::spawn(move || {
                let _ = handle.join();
                let _ = tx.send(());
            });

            let _ = rx.recv_timeout(Duration::from_secs(5));
        }

        for handle in self.worker_handles.drain(..) {
            let _ = handle.join();
        }
    }

    pub fn shutdown(&mut self) {
        self.complete_shutdown();
    }

    /// Maps the calling thread to a shard, caching the result in a
    /// thread-local so each thread keeps hitting the same shard.
    #[inline]
    fn get_shard_id(&self) -> usize {
        thread_local! {
            static THREAD_SHARD_ID: std::cell::Cell<Option<usize>> = const { std::cell::Cell::new(None) };
        }

        THREAD_SHARD_ID.with(|id| {
            if let Some(cpu_id) = id.get() {
                cpu_id
            } else {
                use std::collections::hash_map::RandomState;
                use std::hash::BuildHasher;
                let shard_id = RandomState::new().hash_one(std::thread::current().id()) as usize
                    % self.sharded_buffers.len();
                id.set(Some(shard_id));
                shard_id
            }
        })
    }
}

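/// Worker loop: waits up to 500 ms at a time for a flush request so it can
/// notice shutdown promptly. On a request it drains the shard matching its
/// `worker_id`, writes the batch, and replies on the response channel when
/// one is attached. After shutdown it drains the shard once more so nothing
/// buffered is lost.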
fn write_buffer_worker(ctx: WorkerContext, flush_rx: Receiver<FlushRequest>) {
    let worker_id = ctx.worker_id;
    let format = get_format(ctx.format_version);

    loop {
        if ctx.shutdown.load(Ordering::Acquire) {
            break;
        }

        let req = match flush_rx.recv_timeout(Duration::from_millis(500)) {
            Ok(req) => req,
            Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
                continue;
            }
            Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
                break;
            }
        };

        if worker_id < ctx.sharded_buffers.len() {
            let buffer = &ctx.sharded_buffers[worker_id];
            let entries = buffer.drain_entries();

            if !entries.is_empty() {
                let result = process_write_batch(
                    &ctx.disk_io,
                    &ctx.free_space,
                    entries,
                    &ctx.stats,
                    format.as_ref(),
                );

                ctx.stats.flush_count.fetch_add(1, Ordering::Relaxed);

                if let Some(tx) = req.response {
                    let _ = tx.send(result);
                }
            } else if let Some(tx) = req.response {
                let _ = tx.send(Ok(()));
            }
        }
    }

    if ctx.shutdown.load(Ordering::Acquire) && worker_id < ctx.sharded_buffers.len() {
        let buffer = &ctx.sharded_buffers[worker_id];
        let final_entries = buffer.drain_entries();

        if !final_entries.is_empty() {
            let _ = process_write_batch(
                &ctx.disk_io,
                &ctx.free_space,
                final_entries,
                &ctx.stats,
                format.as_ref(),
            );
        }
    }
}

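/// Flushes one drained batch: allocates sectors and writes inserts/updates
/// in a single batch write, overwrites deleted records with a deletion
/// marker and releases their sectors, and retries failed batch writes with
/// exponential backoff, giving the allocated sectors back if all retries fail.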
fn process_write_batch(
    disk_io: &Arc<RwLock<DiskIO>>,
    free_space: &Arc<RwLock<FreeSpaceManager>>,
    entries: Vec<WriteEntry>,
    stats: &Arc<Statistics>,
    format: &dyn RecordFormat,
) -> Result<()> {
    let mut batch_writes = Vec::new();
    let mut delete_operations = Vec::new();
    let mut records_to_clear = Vec::new();

    for entry in entries {
        match entry.op {
            Operation::Insert | Operation::Update => {
                // Only write records that are still referenced and have not
                // been assigned an on-disk sector yet.
                if entry.record.refcount.load(Ordering::Acquire) > 0
                    && entry.record.sector.load(Ordering::Acquire) == 0
                {
                    let data = prepare_record_data(&entry.record, format)?;
                    let sectors_needed = data.len().div_ceil(FEOX_BLOCK_SIZE);
                    let sector = free_space.write().allocate_sectors(sectors_needed as u64)?;

                    stats
                        .disk_usage
                        .fetch_add((sectors_needed * FEOX_BLOCK_SIZE) as u64, Ordering::Relaxed);

                    batch_writes.push((sector, data));
                    records_to_clear.push((sector, entry.record.clone()));
                }
            }
            Operation::Delete => {
                let sector = entry.record.sector.load(Ordering::Acquire);
                if sector != 0 {
                    delete_operations.push((sector, entry.record.key.len(), entry.old_value_len));
                }
            }
            _ => {}
        }
    }

    for (sector, key_len, value_len) in delete_operations {
        // Overwrite the first sector with a deletion marker, then release
        // the sectors the record occupied.
        let mut deletion_marker = vec![0u8; FEOX_BLOCK_SIZE];
        deletion_marker[..8].copy_from_slice(b"\0DELETED");

        let _ = disk_io.write().write_sectors_sync(sector, &deletion_marker);

        let total_size = SECTOR_HEADER_SIZE + 2 + key_len + 8 + 8 + value_len;
        let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE);

        let _ = free_space
            .write()
            .release_sectors(sector, sectors_needed as u64);

        stats
            .disk_usage
            .fetch_sub((sectors_needed * FEOX_BLOCK_SIZE) as u64, Ordering::Relaxed);
    }

    if !batch_writes.is_empty() {
        // Retry transient batch-write failures with exponential backoff and
        // +/-10% jitter, giving up after three attempts.
        let mut retries = 3;
        let mut delay_us = 100;

        while retries > 0 {
            let result = disk_io.write().batch_write(batch_writes.clone());

            match result {
                Ok(()) => {
                    for (sector, record) in &records_to_clear {
                        // Publish the on-disk sector before dropping the
                        // cached value.
                        record.sector.store(*sector, Ordering::Release);
                        std::sync::atomic::fence(Ordering::Release);
                        record.clear_value();
                    }
                    stats.record_write_flushed(records_to_clear.len() as u64);
                    break;
                }
                Err(e) => {
                    retries -= 1;
                    if retries > 0 {
                        let jitter = {
                            use rand::Rng;
                            let mut rng = rand::rng();
                            (delay_us * rng.random_range(-10..=10)) / 100
                        };
                        let actual_delay = (delay_us + jitter).max(1);
                        thread::sleep(Duration::from_micros(actual_delay as u64));
                        delay_us *= 2;
                    } else {
                        // Give back every sector allocated for this batch.
                        stats.record_write_failed();
                        for (sector, data) in &batch_writes {
                            let sectors = data.len().div_ceil(FEOX_BLOCK_SIZE) as u64;
                            free_space.write().release_sectors(*sector, sectors)?;
                        }
                        return Err(e);
                    }
                }
            }
        }
    }

    Ok(())
}

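/// Serializes a record into a sector-aligned buffer: the sector marker, a
/// zeroed u16, the serialized record, then zero padding up to the next
/// FEOX_BLOCK_SIZE boundary.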
fn prepare_record_data(record: &Record, format: &dyn RecordFormat) -> Result<Vec<u8>> {
    let total_size = format.total_size(record.key.len(), record.value_len);

    // Round the buffer up to a whole number of blocks.
    let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE);
    let padded_size = sectors_needed * FEOX_BLOCK_SIZE;

    let mut data = Vec::with_capacity(padded_size);

    // Sector header: marker followed by a zeroed u16.
    data.extend_from_slice(&SECTOR_MARKER.to_le_bytes());
    data.extend_from_slice(&0u16.to_le_bytes());

    let record_data = format.serialize_record(record, true);
    data.extend_from_slice(&record_data);

    // Zero-pad to the block boundary.
    data.resize(padded_size, 0);

    Ok(data)
}

impl Drop for WriteBuffer {
    fn drop(&mut self) {
        if !self.shutdown.load(Ordering::Acquire) {
            self.complete_shutdown();
        }
    }
}