// feoxdb/storage/write_buffer.rs

use crossbeam_channel::{bounded, Receiver, Sender};
use crossbeam_utils::CachePadded;
use parking_lot::{Mutex, RwLock};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

use crate::constants::*;
use crate::core::record::Record;
use crate::error::{FeoxError, Result};
use crate::stats::Statistics;
use crate::storage::free_space::FreeSpaceManager;
use crate::storage::io::DiskIO;

/// Sharded write buffer for reducing contention
/// Each thread consistently uses the same shard to improve cache locality
#[repr(align(64))] // Cache line alignment
pub struct ShardedWriteBuffer {
    /// Buffered writes pending flush
    buffer: Mutex<VecDeque<WriteEntry>>,

    /// Number of entries in buffer
    count: AtomicUsize,

    /// Total size of buffered data
    size: AtomicUsize,
}

/// Write entry for buffered operations
pub struct WriteEntry {
    pub op: Operation,
    pub record: Arc<Record>,
    pub old_value_len: usize,
    pub work_status: AtomicU32,
    pub retry_count: AtomicU32,
    pub timestamp: Instant,
}

/// Main write buffer coordinator
pub struct WriteBuffer {
    /// Sharded buffers to reduce contention between threads
    sharded_buffers: Arc<Vec<CachePadded<ShardedWriteBuffer>>>,

    /// Shared disk I/O handle
    disk_io: Arc<RwLock<DiskIO>>,

    /// Free space manager for sector allocation
    free_space: Arc<RwLock<FreeSpaceManager>>,

    /// Per-worker channels for targeted flush requests
    worker_channels: Vec<Sender<FlushRequest>>,

    /// Background worker handles
    worker_handles: Vec<JoinHandle<()>>,

    /// Periodic flush thread handle
    periodic_flush_handle: Option<JoinHandle<()>>,

    /// Shutdown flag
    shutdown: Arc<AtomicBool>,

    /// Shared statistics
    stats: Arc<Statistics>,
}

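/// Flush request sent to a worker thread. `response` is `Some` when the caller
/// (force_flush) waits for completion, and `None` for fire-and-forget triggers.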
#[derive(Debug)]
struct FlushRequest {
    response: Option<Sender<Result<()>>>,
}

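/// Per-worker state handed to each background flush thread.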
struct WorkerContext {
    worker_id: usize,
    disk_io: Arc<RwLock<DiskIO>>,
    free_space: Arc<RwLock<FreeSpaceManager>>,
    sharded_buffers: Arc<Vec<CachePadded<ShardedWriteBuffer>>>,
    shutdown: Arc<AtomicBool>,
    stats: Arc<Statistics>,
}

impl ShardedWriteBuffer {
    fn new(_shard_id: usize) -> Self {
        Self {
            buffer: Mutex::new(VecDeque::new()),
            count: AtomicUsize::new(0),
            size: AtomicUsize::new(0),
        }
    }

    fn add_entry(&self, entry: WriteEntry) -> Result<()> {
        let entry_size = entry.record.calculate_size();

        let mut buffer = self.buffer.lock();
        buffer.push_back(entry);

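        // The counters are updated while the shard mutex is still held, so they
        // stay consistent with the contents of the queue.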
        self.count.fetch_add(1, Ordering::AcqRel);
        self.size.fetch_add(entry_size, Ordering::AcqRel);

        Ok(())
    }

    fn drain_entries(&self) -> Vec<WriteEntry> {
        let mut buffer = self.buffer.lock();
        let entries: Vec<_> = buffer.drain(..).collect();

        self.count.store(0, Ordering::Release);
        self.size.store(0, Ordering::Release);

        entries
    }

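    // A shard is considered full when either the entry-count or the byte-size
    // threshold is reached, whichever comes first.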
    fn is_full(&self) -> bool {
        self.count.load(Ordering::Acquire) >= WRITE_BUFFER_SIZE
            || self.size.load(Ordering::Acquire) >= FEOX_WRITE_BUFFER_SIZE
    }
}

impl WriteBuffer {
    pub fn new(
        disk_io: Arc<RwLock<DiskIO>>,
        free_space: Arc<RwLock<FreeSpaceManager>>,
        stats: Arc<Statistics>,
    ) -> Self {
        // Use half CPU count for both shards and workers
        let num_shards = (num_cpus::get() / 2).max(1);

        let sharded_buffers = Arc::new(
            (0..num_shards)
                .map(|shard_id| CachePadded::new(ShardedWriteBuffer::new(shard_id)))
                .collect(),
        );

        Self {
            sharded_buffers,
            disk_io,
            free_space,
            worker_channels: Vec::new(),
            worker_handles: Vec::new(),
            periodic_flush_handle: None,
            shutdown: Arc::new(AtomicBool::new(false)),
            stats,
        }
    }

    /// Add a write operation to the buffer (low-contention fast path: each thread targets its own shard)
    pub fn add_write(
        &self,
        op: Operation,
        record: Arc<Record>,
        old_value_len: usize,
    ) -> Result<()> {
        if self.shutdown.load(Ordering::Acquire) {
            return Err(FeoxError::ShuttingDown);
        }

        let entry = WriteEntry {
            op,
            record,
            old_value_len,
            work_status: AtomicU32::new(0),
            retry_count: AtomicU32::new(0),
            timestamp: Instant::now(),
        };

        // Get thread's consistent shard
        let shard_id = self.get_shard_id();
        let buffer = &self.sharded_buffers[shard_id];

        buffer.add_entry(entry)?;
        self.stats.record_write_buffered();

        // Check if we need to trigger flush for this specific shard
        if buffer.is_full() && shard_id < self.worker_channels.len() {
            let req = FlushRequest { response: None };
            let _ = self.worker_channels[shard_id].try_send(req);
        }

        Ok(())
    }


    /// Start background worker threads
    pub fn start_workers(&mut self, num_workers: usize) {
        // Ensure we have the right number of workers for shards
        let num_shards = self.sharded_buffers.len();
        let actual_workers = num_workers.min(num_shards);

        // Create per-worker channels
        let mut receivers = Vec::new();
        for _ in 0..actual_workers {
            let (tx, rx) = bounded(2);
            self.worker_channels.push(tx);
            receivers.push(rx);
        }

        // Start workers, each owning the shard whose index matches its flush channel
        for (worker_id, flush_rx) in receivers.into_iter().enumerate() {
            let ctx = WorkerContext {
                worker_id,
                disk_io: self.disk_io.clone(),
                free_space: self.free_space.clone(),
                sharded_buffers: self.sharded_buffers.clone(),
                shutdown: self.shutdown.clone(),
                stats: self.stats.clone(),
            };

            let handle = thread::spawn(move || {
                write_buffer_worker(ctx, flush_rx);
            });

            self.worker_handles.push(handle);
        }

        // Start periodic flush coordinator
        let worker_channels = self.worker_channels.clone();
        let shutdown = self.shutdown.clone();
        let sharded_buffers = self.sharded_buffers.clone();

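        // The coordinator only inspects the per-shard counters and forwards flush
        // requests; the actual draining and disk writes happen in the workers.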
        let periodic_handle = thread::spawn(move || {
            let interval = WRITE_BUFFER_FLUSH_INTERVAL;

            while !shutdown.load(Ordering::Acquire) {
                thread::sleep(interval);

                // Check each shard and trigger its worker if needed
                for (shard_id, buffer) in sharded_buffers.iter().enumerate() {
                    if buffer.count.load(Ordering::Relaxed) > 0 && shard_id < worker_channels.len()
                    {
                        let req = FlushRequest { response: None };
                        let _ = worker_channels[shard_id].try_send(req);
                    }
                }
            }
        });

        self.periodic_flush_handle = Some(periodic_handle);
    }


    /// Force flush and wait for completion
    pub fn force_flush(&self) -> Result<()> {
        let mut responses = Vec::new();

        // Send flush request to each worker and collect response channels
        for worker_tx in &self.worker_channels {
            let (tx, rx) = bounded(1);
            let req = FlushRequest { response: Some(tx) };

            worker_tx.send(req).map_err(|_| FeoxError::ChannelError)?;
            responses.push(rx);
        }

        // Wait for all workers to complete
        for rx in responses {
            rx.recv().map_err(|_| FeoxError::ChannelError)??;
        }

        Ok(())
    }

    /// Initiate shutdown by setting the shutdown flag
    pub fn initiate_shutdown(&self) {
        self.shutdown.store(true, Ordering::Release);

        // Don't call force_flush here as it can block
        // Workers will see the shutdown flag and exit gracefully
    }

    /// Complete shutdown - must be called after initiate_shutdown
    pub fn complete_shutdown(&mut self) {
        // Ensure the shutdown flag is set
        self.shutdown.store(true, Ordering::Release);

        // Wait for the periodic flush thread to finish, with a timeout
        if let Some(handle) = self.periodic_flush_handle.take() {
            // Spawn a thread to wait with a timeout since JoinHandle has no join_timeout
            let (tx, rx) = crossbeam_channel::bounded(1);
            thread::spawn(move || {
                let _ = handle.join();
                let _ = tx.send(());
            });

            if rx.recv_timeout(Duration::from_secs(5)).is_err() {
                // Timed out waiting for the periodic flush thread; proceed anyway
            }
        }

        // Signal workers to stop and wait for them
        for handle in self.worker_handles.drain(..) {
            let _ = handle.join();
        }

        // Note: disk_io shutdown is handled by the Store's Drop implementation
        // to ensure proper ordering
    }

    /// Legacy shutdown kept for compatibility
    pub fn shutdown(&mut self) {
        self.complete_shutdown();
    }

    #[inline]
    fn get_shard_id(&self) -> usize {
        thread_local! {
            static THREAD_SHARD_ID: std::cell::Cell<Option<usize>> = const { std::cell::Cell::new(None) };
        }

        THREAD_SHARD_ID.with(|id| {
            if let Some(shard_id) = id.get() {
                shard_id
            } else {
                // Assign a shard based on a hash of the thread ID for consistency
                use std::collections::hash_map::RandomState;
                use std::hash::BuildHasher;
                let shard_id = RandomState::new().hash_one(std::thread::current().id()) as usize
                    % self.sharded_buffers.len();
                id.set(Some(shard_id));
                shard_id
            }
        })
    }
}

/// Background worker for processing write buffer flushes
fn write_buffer_worker(ctx: WorkerContext, flush_rx: Receiver<FlushRequest>) {
    let worker_id = ctx.worker_id;

    loop {
        if ctx.shutdown.load(Ordering::Acquire) {
            break;
        }

        // Wait for flush request with timeout to check shutdown periodically
        let req = match flush_rx.recv_timeout(Duration::from_millis(500)) {
            Ok(req) => req,
            Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
                continue;
            }
            Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
                break;
            }
        };

        // Drain only this worker's shard
        if worker_id < ctx.sharded_buffers.len() {
            let buffer = &ctx.sharded_buffers[worker_id];
            let entries = buffer.drain_entries();

            if !entries.is_empty() {
                // Process writes for this shard
                let result =
                    process_write_batch(&ctx.disk_io, &ctx.free_space, entries, &ctx.stats);

                ctx.stats.flush_count.fetch_add(1, Ordering::Relaxed);

                // Send response if requested
                if let Some(tx) = req.response {
                    let _ = tx.send(result);
                }
            } else if let Some(tx) = req.response {
                // No data to flush, but still respond if needed
                let _ = tx.send(Ok(()));
            }
        }
    }

    // Before exiting, flush any remaining data from this worker's shard
    if ctx.shutdown.load(Ordering::Acquire) && worker_id < ctx.sharded_buffers.len() {
        let buffer = &ctx.sharded_buffers[worker_id];
        let final_entries = buffer.drain_entries();

        if !final_entries.is_empty() {
            let _ = process_write_batch(&ctx.disk_io, &ctx.free_space, final_entries, &ctx.stats);
        }
    }
}

/// Process a batch of write entries
fn process_write_batch(
    disk_io: &Arc<RwLock<DiskIO>>,
    free_space: &Arc<RwLock<FreeSpaceManager>>,
    entries: Vec<WriteEntry>,
    stats: &Arc<Statistics>,
) -> Result<()> {
    let mut batch_writes = Vec::new();
    let mut delete_operations = Vec::new();
    let mut records_to_clear = Vec::new();
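    // batch_writes holds (sector, serialized bytes) pairs for the batched disk write,
    // delete_operations holds (sector, key_len, old_value_len) for deleted records,
    // and records_to_clear holds (sector, sector count, record) so in-memory values
    // can be dropped once the batch write succeeds.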

    // Prepare all operations
    for entry in entries {
        match entry.op {
            Operation::Insert | Operation::Update => {
                // Only persist records that are still live (refcount > 0)
                // and have no on-disk sector assigned yet (sector == 0)
                if entry.record.refcount.load(Ordering::Acquire) > 0
                    && entry.record.sector.load(Ordering::Acquire) == 0
                {
                    let data = prepare_record_data(&entry.record)?;
                    let sectors_needed = data.len().div_ceil(FEOX_BLOCK_SIZE);
                    let sector = free_space.write().allocate_sectors(sectors_needed as u64)?;

                    // Track disk usage
                    stats
                        .disk_usage
                        .fetch_add((sectors_needed * FEOX_BLOCK_SIZE) as u64, Ordering::Relaxed);

                    batch_writes.push((sector, data));
                    records_to_clear.push((sector, sectors_needed as u64, entry.record.clone()));
                }
            }
            Operation::Delete => {
                let sector = entry.record.sector.load(Ordering::Acquire);
                if sector != 0 {
                    delete_operations.push((sector, entry.record.key.len(), entry.old_value_len));
                }
            }
            _ => {}
        }
    }

    // Process deletes first
    for (sector, key_len, value_len) in delete_operations {
        // Write deletion marker
        let mut deletion_marker = vec![0u8; FEOX_BLOCK_SIZE];
        deletion_marker[..8].copy_from_slice(b"\0DELETED");

        let _ = disk_io.write().write_sectors_sync(sector, &deletion_marker);

        // Calculate sectors used and release them
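        // On-disk record layout: sector header, 2-byte key length, key,
        // 8-byte value length, 8-byte timestamp, value (see prepare_record_data)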
        let total_size = SECTOR_HEADER_SIZE + 2 + key_len + 8 + 8 + value_len;
        let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE);

        let _ = free_space
            .write()
            .release_sectors(sector, sectors_needed as u64);

        // Update disk usage
        stats
            .disk_usage
            .fetch_sub((sectors_needed * FEOX_BLOCK_SIZE) as u64, Ordering::Relaxed);
    }

    // Process batch writes with io_uring
    if !batch_writes.is_empty() {
        // Use io_uring with retry
        let mut retries = 3;
        let mut delay_us = 100;
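        // delay_us is in microseconds; it doubles after every failed attempt and
        // gets ±10% jitter applied below before sleeping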

        while retries > 0 {
            // Execute batch write
            let result = disk_io.write().batch_write(batch_writes.clone());

            match result {
                Ok(()) => {
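                    // Publish each record's on-disk sector before clearing its
                    // in-memory value; the fence keeps the store ordered ahead of clear_value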
                    for (sector, _, record) in &records_to_clear {
                        record.sector.store(*sector, Ordering::Release);
                        std::sync::atomic::fence(Ordering::Release);
                        record.clear_value();
                    }
                    stats.record_write_flushed(records_to_clear.len() as u64);
                    break;
                }
                Err(e) => {
                    retries -= 1;
                    if retries > 0 {
                        // Exponential backoff with jitter ±10%
                        let jitter = {
                            use rand::Rng;
                            let mut rng = rand::rng();
                            (delay_us * rng.random_range(-10..=10)) / 100
                        };
                        let actual_delay = (delay_us + jitter).max(1);
                        thread::sleep(Duration::from_micros(actual_delay as u64));
                        delay_us *= 2;
                    } else {
                        stats.record_write_failed();
                        // Nothing reached disk, so return every sector allocated for this batch
                        for (sector, sectors, _) in &records_to_clear {
                            free_space.write().release_sectors(*sector, *sectors)?;
                            stats.disk_usage.fetch_sub(
                                (*sectors as usize * FEOX_BLOCK_SIZE) as u64,
                                Ordering::Relaxed,
                            );
                        }
                        return Err(e);
                    }
                }
            }
        }
    }

    Ok(())
}

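/// Serialize a record into a sector-aligned buffer: sector header, 2-byte key
/// length, key, 8-byte value length, 8-byte timestamp, value, zero padding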
fn prepare_record_data(record: &Record) -> Result<Vec<u8>> {
    let total_size = SECTOR_HEADER_SIZE + 2 + record.key.len() + 8 + 8 + record.value_len;

    // Calculate padded size to sector boundary
    let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE);
    let padded_size = sectors_needed * FEOX_BLOCK_SIZE;

    let mut data = Vec::with_capacity(padded_size);

    // Sector header
    data.extend_from_slice(&SECTOR_MARKER.to_le_bytes());
    data.extend_from_slice(&0u16.to_le_bytes()); // seq_number

    // Record data
    data.extend_from_slice(&(record.key.len() as u16).to_le_bytes());
    data.extend_from_slice(&record.key);
    data.extend_from_slice(&(record.value_len as u64).to_le_bytes());
    data.extend_from_slice(&record.timestamp.to_le_bytes());

    if let Some(value) = record.get_value() {
        data.extend_from_slice(value.as_ref());
    }

    // Pad to sector boundary
    data.resize(padded_size, 0);

    Ok(data)
}

impl Drop for WriteBuffer {
    fn drop(&mut self) {
        if !self.shutdown.load(Ordering::Acquire) {
            self.complete_shutdown();
        }
    }
}