feoxdb/storage/write_buffer.rs

use crossbeam_channel::{bounded, Receiver, Sender};
use crossbeam_utils::CachePadded;
use parking_lot::{Mutex, RwLock};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

use crate::constants::*;
use crate::core::record::Record;
use crate::error::{FeoxError, Result};
use crate::stats::Statistics;
use crate::storage::format::{get_format, RecordFormat};
use crate::storage::free_space::FreeSpaceManager;
use crate::storage::io::DiskIO;

/// Sharded write buffer for reducing contention
/// Each thread consistently uses the same shard to improve cache locality
#[repr(align(64))] // Cache line alignment
pub struct ShardedWriteBuffer {
    /// Buffered writes pending flush
    buffer: Mutex<VecDeque<WriteEntry>>,

    /// Number of entries in buffer
    count: AtomicUsize,

    /// Total size of buffered data
    size: AtomicUsize,
}

/// Write entry for buffered operations
pub struct WriteEntry {
    pub op: Operation,
    pub record: Arc<Record>,
    pub old_value_len: usize,
    pub work_status: AtomicU32,
    pub retry_count: AtomicU32,
    pub timestamp: Instant,
}

/// Main write buffer coordinator
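///
/// # Example
///
/// A minimal lifecycle sketch (marked `ignore`: constructing `DiskIO`,
/// `FreeSpaceManager`, `Statistics` and the `record`/`format_version` bindings
/// happens outside this module, so they are assumed to exist here):
///
/// ```ignore
/// let mut wb = WriteBuffer::new(disk_io, free_space, stats, format_version);
/// wb.start_workers((num_cpus::get() / 2).max(1));
///
/// wb.add_write(Operation::Insert, record, 0)?; // buffered; flushed in the background
/// wb.force_flush()?;                           // block until every shard hits disk
///
/// wb.initiate_shutdown();
/// wb.complete_shutdown();
/// ```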
pub struct WriteBuffer {
    /// Sharded buffers to reduce contention between threads
    sharded_buffers: Arc<Vec<CachePadded<ShardedWriteBuffer>>>,

    /// Shared disk I/O handle
    disk_io: Arc<RwLock<DiskIO>>,

    /// Free space manager for sector allocation
    free_space: Arc<RwLock<FreeSpaceManager>>,

    /// Per-worker channels for targeted flush requests
    worker_channels: Vec<Sender<FlushRequest>>,

    /// Background worker handles
    worker_handles: Vec<JoinHandle<()>>,

    /// Periodic flush thread handle
    periodic_flush_handle: Option<JoinHandle<()>>,

    /// Shutdown flag
    shutdown: Arc<AtomicBool>,

    /// Shared statistics
    stats: Arc<Statistics>,

    /// Format version for record serialization
    format_version: u32,
}
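
// Ownership model: after `start_workers`, worker `i` drains `sharded_buffers[i]`
// exclusively and `worker_channels[i]` is that worker's flush-request channel;
// start_workers caps the worker count at the shard count so the indices line up.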

#[derive(Debug)]
struct FlushRequest {
    response: Option<Sender<Result<()>>>,
}

struct WorkerContext {
    worker_id: usize,
    disk_io: Arc<RwLock<DiskIO>>,
    free_space: Arc<RwLock<FreeSpaceManager>>,
    sharded_buffers: Arc<Vec<CachePadded<ShardedWriteBuffer>>>,
    shutdown: Arc<AtomicBool>,
    stats: Arc<Statistics>,
    format_version: u32,
}

impl ShardedWriteBuffer {
    fn new(_shard_id: usize) -> Self {
        Self {
            buffer: Mutex::new(VecDeque::new()),
            count: AtomicUsize::new(0),
            size: AtomicUsize::new(0),
        }
    }

    fn add_entry(&self, entry: WriteEntry) -> Result<()> {
        let entry_size = entry.record.calculate_size();

        let mut buffer = self.buffer.lock();
        buffer.push_back(entry);

        self.count.fetch_add(1, Ordering::AcqRel);
        self.size.fetch_add(entry_size, Ordering::AcqRel);

        Ok(())
    }

    fn drain_entries(&self) -> Vec<WriteEntry> {
        let mut buffer = self.buffer.lock();
        let entries: Vec<_> = buffer.drain(..).collect();

        self.count.store(0, Ordering::Release);
        self.size.store(0, Ordering::Release);

        entries
    }

    fn is_full(&self) -> bool {
        self.count.load(Ordering::Acquire) >= WRITE_BUFFER_SIZE
            || self.size.load(Ordering::Acquire) >= FEOX_WRITE_BUFFER_SIZE
    }
}

impl WriteBuffer {
    pub fn new(
        disk_io: Arc<RwLock<DiskIO>>,
        free_space: Arc<RwLock<FreeSpaceManager>>,
        stats: Arc<Statistics>,
        format_version: u32,
    ) -> Self {
        // Use half the CPU count (at least one) for the number of shards;
        // start_workers is expected to spin up a matching number of workers
        let num_shards = (num_cpus::get() / 2).max(1);

        let sharded_buffers = Arc::new(
            (0..num_shards)
                .map(|shard_id| CachePadded::new(ShardedWriteBuffer::new(shard_id)))
                .collect(),
        );

        Self {
            sharded_buffers,
            disk_io,
            free_space,
            worker_channels: Vec::new(),
            worker_handles: Vec::new(),
            periodic_flush_handle: None,
            shutdown: Arc::new(AtomicBool::new(false)),
            stats,
            format_version,
        }
    }

    /// Add a write operation to the calling thread's shard.
    /// Fast path: one short per-shard lock, no disk I/O.
    pub fn add_write(
        &self,
        op: Operation,
        record: Arc<Record>,
        old_value_len: usize,
    ) -> Result<()> {
        if self.shutdown.load(Ordering::Acquire) {
            return Err(FeoxError::ShuttingDown);
        }

        let entry = WriteEntry {
            op,
            record,
            old_value_len,
            work_status: AtomicU32::new(0),
            retry_count: AtomicU32::new(0),
            timestamp: Instant::now(),
        };

        // Get the thread's consistent shard
        let shard_id = self.get_shard_id();
        let buffer = &self.sharded_buffers[shard_id];

        buffer.add_entry(entry)?;
        self.stats.record_write_buffered();

        // If this shard crossed a flush threshold, nudge its worker
        if buffer.is_full() && shard_id < self.worker_channels.len() {
            let req = FlushRequest { response: None };
            let _ = self.worker_channels[shard_id].try_send(req);
        }

        Ok(())
    }
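
    // Flush triggers, for reference: a shard filling up (handled above), the
    // periodic sweeper spawned in `start_workers`, and explicit `force_flush`
    // calls all funnel into the same per-worker `FlushRequest` channel.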

    /// Start background worker threads
    pub fn start_workers(&mut self, num_workers: usize) {
        // Never start more workers than there are shards to drain
        let num_shards = self.sharded_buffers.len();
        let actual_workers = num_workers.min(num_shards);

        // Create per-worker channels
        let mut receivers = Vec::new();
        for _ in 0..actual_workers {
            let (tx, rx) = bounded(2);
            self.worker_channels.push(tx);
            receivers.push(rx);
        }

        // Start workers, each owning one shard; worker `i` takes receiver `i`,
        // so `worker_channels[i]` always targets the worker that drains shard `i`
        for (worker_id, flush_rx) in receivers.into_iter().enumerate() {
            let ctx = WorkerContext {
                worker_id,
                disk_io: self.disk_io.clone(),
                free_space: self.free_space.clone(),
                sharded_buffers: self.sharded_buffers.clone(),
                shutdown: self.shutdown.clone(),
                stats: self.stats.clone(),
                format_version: self.format_version,
            };

            let handle = thread::spawn(move || {
                write_buffer_worker(ctx, flush_rx);
            });

            self.worker_handles.push(handle);
        }

        // Start periodic flush coordinator
        let worker_channels = self.worker_channels.clone();
        let shutdown = self.shutdown.clone();
        let sharded_buffers = self.sharded_buffers.clone();

        let periodic_handle = thread::spawn(move || {
            let interval = WRITE_BUFFER_FLUSH_INTERVAL;

            while !shutdown.load(Ordering::Acquire) {
                thread::sleep(interval);

                // Check each shard and nudge its worker if it has pending entries
                for (shard_id, buffer) in sharded_buffers.iter().enumerate() {
                    let count = buffer.count.load(Ordering::Relaxed);
                    if count > 0 && shard_id < worker_channels.len() {
                        let req = FlushRequest { response: None };
                        let _ = worker_channels[shard_id].try_send(req);
                    }
                }
            }
        });

        self.periodic_flush_handle = Some(periodic_handle);
    }

    /// Force flush and wait for completion
    pub fn force_flush(&self) -> Result<()> {
        let mut responses = Vec::new();

        // Send a flush request to each worker and collect the response channels
        for worker_tx in &self.worker_channels {
            let (tx, rx) = bounded(1);
            let req = FlushRequest { response: Some(tx) };

            worker_tx.send(req).map_err(|_| FeoxError::ChannelError)?;
            responses.push(rx);
        }

        // Wait for all workers to complete
        for rx in responses {
            rx.recv().map_err(|_| FeoxError::ChannelError)??;
        }

        Ok(())
    }

    /// Shutdown write buffer
    pub fn initiate_shutdown(&self) {
        self.shutdown.store(true, Ordering::Release);

        // Don't call force_flush here as it can block;
        // workers will see the shutdown flag and exit gracefully
    }

    /// Complete shutdown - must be called after initiate_shutdown
    pub fn complete_shutdown(&mut self) {
        // Ensure the shutdown flag is set even if initiate_shutdown was skipped
        self.shutdown.store(true, Ordering::Release);

        // Wait for the periodic flush thread to finish, with a timeout
        if let Some(handle) = self.periodic_flush_handle.take() {
            // Spawn a thread to wait with a timeout, since JoinHandle has no join_timeout
            let (tx, rx) = bounded(1);
            thread::spawn(move || {
                let _ = handle.join();
                let _ = tx.send(());
            });

            // Give the periodic flush thread up to 5 seconds to exit, then move on
            let _ = rx.recv_timeout(Duration::from_secs(5));
        }

        // Join the workers; they observe the shutdown flag, drain their shards and exit
        for handle in self.worker_handles.drain(..) {
            let _ = handle.join();
        }

        // Note: disk_io shutdown is handled by the Store's Drop implementation
        // to ensure proper ordering
    }
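
    // Shutdown ordering: `initiate_shutdown` only flips the flag (non-blocking);
    // `complete_shutdown` then joins the periodic thread (bounded by the 5 s
    // watchdog above) and the workers, which drain their shards one last time
    // before exiting (see the tail of `write_buffer_worker`).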

    /// Legacy shutdown for compatibility
    pub fn shutdown(&mut self) {
        self.complete_shutdown();
    }

    #[inline]
    fn get_shard_id(&self) -> usize {
        thread_local! {
            static THREAD_SHARD_ID: std::cell::Cell<Option<usize>> = const { std::cell::Cell::new(None) };
        }

        THREAD_SHARD_ID.with(|id| {
            if let Some(shard_id) = id.get() {
                shard_id
            } else {
                // Assign a shard by hashing the thread id and cache it in the
                // thread-local, so a given thread always maps to the same shard.
                // The assignment itself is effectively random (RandomState::new()
                // is differently keyed each time), so balancing is probabilistic.
                use std::collections::hash_map::RandomState;
                use std::hash::BuildHasher;
                let shard_id = RandomState::new().hash_one(std::thread::current().id()) as usize
                    % self.sharded_buffers.len();
                id.set(Some(shard_id));
                shard_id
            }
        })
    }
}

/// Background worker for processing write buffer flushes
fn write_buffer_worker(ctx: WorkerContext, flush_rx: Receiver<FlushRequest>) {
    let worker_id = ctx.worker_id;
    let format = get_format(ctx.format_version);

    loop {
        if ctx.shutdown.load(Ordering::Acquire) {
            break;
        }

        // Wait for flush request with timeout to check shutdown periodically
        let req = match flush_rx.recv_timeout(Duration::from_millis(500)) {
            Ok(req) => req,
            Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
                continue;
            }
            Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
                break;
            }
        };

        // Drain only this worker's shard
        if worker_id < ctx.sharded_buffers.len() {
            let buffer = &ctx.sharded_buffers[worker_id];
            let entries = buffer.drain_entries();

            if !entries.is_empty() {
                // Process writes for this shard
                let result = process_write_batch(
                    &ctx.disk_io,
                    &ctx.free_space,
                    entries,
                    &ctx.stats,
                    format.as_ref(),
                );

                ctx.stats.flush_count.fetch_add(1, Ordering::Relaxed);

                // Send response if requested
                if let Some(tx) = req.response {
                    let _ = tx.send(result);
                }
            } else if let Some(tx) = req.response {
                // No data to flush, but still respond if needed
                let _ = tx.send(Ok(()));
            }
        }
    }

    // Before exiting, flush any remaining data from this worker's shard
    if ctx.shutdown.load(Ordering::Acquire) && worker_id < ctx.sharded_buffers.len() {
        let buffer = &ctx.sharded_buffers[worker_id];
        let final_entries = buffer.drain_entries();

        if !final_entries.is_empty() {
            let _ = process_write_batch(
                &ctx.disk_io,
                &ctx.free_space,
                final_entries,
                &ctx.stats,
                format.as_ref(),
            );
        }
    }
}

/// Process a batch of write entries
fn process_write_batch(
    disk_io: &Arc<RwLock<DiskIO>>,
    free_space: &Arc<RwLock<FreeSpaceManager>>,
    entries: Vec<WriteEntry>,
    stats: &Arc<Statistics>,
    format: &dyn RecordFormat,
) -> Result<()> {
    let mut batch_writes = Vec::new();
    let mut delete_operations = Vec::new();
    let mut records_to_clear = Vec::new();

    // Prepare all operations
    for entry in entries {
        match entry.op {
            Operation::Insert | Operation::Update => {
                // Only persist records that are still live (refcount > 0) and
                // not yet on disk (sector == 0)
                if entry.record.refcount.load(Ordering::Acquire) > 0
                    && entry.record.sector.load(Ordering::Acquire) == 0
                {
                    let data = prepare_record_data(&entry.record, format)?;
                    let sectors_needed = data.len().div_ceil(FEOX_BLOCK_SIZE);
                    let sector = free_space.write().allocate_sectors(sectors_needed as u64)?;

                    // Track disk usage
                    stats
                        .disk_usage
                        .fetch_add((sectors_needed * FEOX_BLOCK_SIZE) as u64, Ordering::Relaxed);

                    batch_writes.push((sector, data));
                    records_to_clear.push((sector, entry.record.clone()));
                }
            }
            Operation::Delete => {
                let sector = entry.record.sector.load(Ordering::Acquire);
                if sector != 0 {
                    delete_operations.push((sector, entry.record.key.len(), entry.old_value_len));
                }
            }
            _ => {}
        }
    }

    // Process deletes first
    for (sector, key_len, value_len) in delete_operations {
        // Write deletion marker
        let mut deletion_marker = vec![0u8; FEOX_BLOCK_SIZE];
        deletion_marker[..8].copy_from_slice(b"\0DELETED");

        let _ = disk_io.write().write_sectors_sync(sector, &deletion_marker);

        // Calculate sectors used and release them
        let total_size = SECTOR_HEADER_SIZE + 2 + key_len + 8 + 8 + value_len;
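        // (The 2 + 8 + 8 above are assumed to mirror the on-disk record header:
        // a 2-byte key-length field plus two 8-byte fields alongside the key and
        // value; the authoritative layout lives in the RecordFormat implementation.)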
        let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE);

        let _ = free_space
            .write()
            .release_sectors(sector, sectors_needed as u64);

        // Update disk usage
        stats
            .disk_usage
            .fetch_sub((sectors_needed * FEOX_BLOCK_SIZE) as u64, Ordering::Relaxed);
    }

    // Process batch writes with io_uring
    if !batch_writes.is_empty() {
        // Use io_uring with retry
        let mut retries = 3;
        let mut delay_us = 100;
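        // Retry schedule: up to 3 attempts, sleeping ~100 µs and then ~200 µs
        // (each jittered by ±10%) between attempts before giving up.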

        while retries > 0 {
            // Execute batch write
            let result = disk_io.write().batch_write(batch_writes.clone());

            match result {
                Ok(()) => {
                    for (sector, record) in &records_to_clear {
                        record.sector.store(*sector, Ordering::Release);
                        std::sync::atomic::fence(Ordering::Release);
                        record.clear_value();
                    }
                    stats.record_write_flushed(records_to_clear.len() as u64);
                    break;
                }
                Err(e) => {
                    retries -= 1;
                    if retries > 0 {
                        // Exponential backoff with ±10% jitter
                        let jitter = {
                            use rand::Rng;
                            let mut rng = rand::rng();
                            (delay_us * rng.random_range(-10..=10)) / 100
                        };
                        let actual_delay = (delay_us + jitter).max(1);
                        thread::sleep(Duration::from_micros(actual_delay as u64));
                        delay_us *= 2;
                    } else {
                        stats.record_write_failed();
                        // Give back every sector allocated for this batch and undo the
                        // disk-usage accounting (each data buffer is already padded to
                        // whole sectors, so its length equals the allocation size)
                        for (sector, data) in &batch_writes {
                            let sectors = data.len().div_ceil(FEOX_BLOCK_SIZE) as u64;
                            free_space.write().release_sectors(*sector, sectors)?;
                            stats.disk_usage.fetch_sub(data.len() as u64, Ordering::Relaxed);
                        }
                        return Err(e);
                    }
                }
            }
        }
    }

    Ok(())
}

fn prepare_record_data(record: &Record, format: &dyn RecordFormat) -> Result<Vec<u8>> {
    // Get the total size using the format trait
    let total_size = format.total_size(record.key.len(), record.value_len);

    // Calculate padded size to sector boundary
    let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE);
    let padded_size = sectors_needed * FEOX_BLOCK_SIZE;
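    // Worked example with illustrative numbers (not the real constants): if
    // FEOX_BLOCK_SIZE were 512 and total_size 700, then sectors_needed =
    // ceil(700 / 512) = 2 and padded_size = 1024, so every record ends exactly
    // on a sector boundary.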

    let mut data = Vec::with_capacity(padded_size);

    // Sector header
    data.extend_from_slice(&SECTOR_MARKER.to_le_bytes());
    data.extend_from_slice(&0u16.to_le_bytes()); // seq_number

    // Use format trait to serialize the record
    let record_data = format.serialize_record(record, true);
    data.extend_from_slice(&record_data);

    // Pad to sector boundary
    data.resize(padded_size, 0);

    Ok(data)
}

impl Drop for WriteBuffer {
    fn drop(&mut self) {
        if !self.shutdown.load(Ordering::Acquire) {
            self.complete_shutdown();
        }
    }
}