Coverage Report

Created: 2025-09-19 09:35

/home/gh-runner/action-runner3/_work/feoxdb/feoxdb/src/storage/write_buffer.rs
Line
Count
Source
1
use crossbeam_channel::{bounded, Receiver, Sender};
2
use crossbeam_utils::CachePadded;
3
use parking_lot::{Mutex, RwLock};
4
use std::collections::VecDeque;
5
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
6
use std::sync::Arc;
7
use std::thread::{self, JoinHandle};
8
use std::time::{Duration, Instant};
9
10
use crate::constants::*;
11
use crate::core::record::Record;
12
use crate::error::{FeoxError, Result};
13
use crate::stats::Statistics;
14
use crate::storage::format::{get_format, RecordFormat};
15
use crate::storage::free_space::FreeSpaceManager;
16
use crate::storage::io::DiskIO;
17
18
/// Sharded write buffer for reducing contention
19
/// Each thread consistently uses the same shard to improve cache locality
20
#[repr(align(64))] // Cache line alignment
21
pub struct ShardedWriteBuffer {
22
    /// Buffered writes pending flush
23
    buffer: Mutex<VecDeque<WriteEntry>>,
24
25
    /// Number of entries in buffer
26
    count: AtomicUsize,
27
28
    /// Total size of buffered data
29
    size: AtomicUsize,
30
}
31
32
/// Write entry for buffered operations
33
pub struct WriteEntry {
34
    pub op: Operation,
35
    pub record: Arc<Record>,
36
    pub old_value_len: usize,
37
    pub work_status: AtomicU32,
38
    pub retry_count: AtomicU32,
39
    pub timestamp: Instant,
40
}
41
42
/// Main write buffer coordinator
43
pub struct WriteBuffer {
44
    /// Sharded buffers to reduce contention between threads
45
    sharded_buffers: Arc<Vec<CachePadded<ShardedWriteBuffer>>>,
46
47
    /// Shared disk I/O handle
48
    disk_io: Arc<RwLock<DiskIO>>,
49
50
    /// Free space manager for sector allocation
51
    free_space: Arc<RwLock<FreeSpaceManager>>,
52
53
    /// Per-worker channels for targeted flush requests
54
    worker_channels: Vec<Sender<FlushRequest>>,
55
56
    /// Background worker handles
57
    worker_handles: Vec<JoinHandle<()>>,
58
59
    /// Periodic flush thread handle
60
    periodic_flush_handle: Option<JoinHandle<()>>,
61
62
    /// Shutdown flag
63
    shutdown: Arc<AtomicBool>,
64
65
    /// Shared statistics
66
    stats: Arc<Statistics>,
67
68
    /// Format version for record serialization
69
    format_version: u32,
70
}
71
72
#[derive(Debug)]
73
struct FlushRequest {
74
    response: Option<Sender<Result<()>>>,
75
}
76
77
struct WorkerContext {
78
    worker_id: usize,
79
    disk_io: Arc<RwLock<DiskIO>>,
80
    free_space: Arc<RwLock<FreeSpaceManager>>,
81
    sharded_buffers: Arc<Vec<CachePadded<ShardedWriteBuffer>>>,
82
    shutdown: Arc<AtomicBool>,
83
    stats: Arc<Statistics>,
84
    format_version: u32,
85
}
86
87
impl ShardedWriteBuffer {
88
416
    fn new(_shard_id: usize) -> Self {
89
416
        Self {
90
416
            buffer: Mutex::new(VecDeque::new()),
91
416
            count: AtomicUsize::new(0),
92
416
            size: AtomicUsize::new(0),
93
416
        }
94
416
    }
95
96
21.5k
    fn add_entry(&self, entry: WriteEntry) -> Result<()> {
97
21.5k
        let entry_size = entry.record.calculate_size();
98
99
21.5k
        let mut buffer = self.buffer.lock();
100
21.5k
        buffer.push_back(entry);
101
102
21.5k
        self.count.fetch_add(1, Ordering::AcqRel);
103
21.5k
        self.size.fetch_add(entry_size, Ordering::AcqRel);
104
105
21.5k
        Ok(())
106
21.5k
    }
107
108
18.2k
    fn drain_entries(&self) -> Vec<WriteEntry> {
109
18.2k
        let mut buffer = self.buffer.lock();
110
18.2k
        let entries: Vec<_> = buffer.drain(..).collect();
111
112
18.2k
        self.count.store(0, Ordering::Release);
113
18.2k
        self.size.store(0, Ordering::Release);
114
115
18.2k
        entries
116
18.2k
    }
117
118
21.5k
    fn is_full(&self) -> bool {
119
21.5k
        self.count.load(Ordering::Acquire) >= WRITE_BUFFER_SIZE
120
3.57k
            || self.size.load(Ordering::Acquire) >= FEOX_WRITE_BUFFER_SIZE
121
21.5k
    }
122
}
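
A minimal sketch of the dual flush threshold enforced by is_full() above. WRITE_BUFFER_SIZE and FEOX_WRITE_BUFFER_SIZE come from crate::constants and are not shown in this report, so the values below are illustrative stand-ins only:

// Stand-in values for illustration; the real constants live in crate::constants.
const WRITE_BUFFER_SIZE: usize = 1024;         // max buffered entries per shard
const FEOX_WRITE_BUFFER_SIZE: usize = 4 << 20; // max buffered bytes per shard

// A shard counts as full when EITHER threshold trips, so a flood of tiny
// writes (entry count) and a handful of huge writes (byte size) both
// trigger a flush promptly.
fn is_full(count: usize, size: usize) -> bool {
    count >= WRITE_BUFFER_SIZE || size >= FEOX_WRITE_BUFFER_SIZE
}

fn main() {
    assert!(is_full(1024, 0));    // entry-count threshold crossed
    assert!(is_full(3, 4 << 20)); // byte-size threshold crossed
    assert!(!is_full(3, 4096));   // neither threshold crossed
}
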
123
124
impl WriteBuffer {
125
26
    pub fn new(
126
26
        disk_io: Arc<RwLock<DiskIO>>,
127
26
        free_space: Arc<RwLock<FreeSpaceManager>>,
128
26
        stats: Arc<Statistics>,
129
26
        format_version: u32,
130
26
    ) -> Self {
131
        // Use half the CPU count for both shards and workers
132
26
        let num_shards = (num_cpus::get() / 2).max(1);
133
134
26
        let sharded_buffers = Arc::new(
135
26
            (0..num_shards)
136
416
                .map(|shard_id| CachePadded::new(ShardedWriteBuffer::new(shard_id)))
137
26
                .collect(),
138
        );
139
140
26
        Self {
141
26
            sharded_buffers,
142
26
            disk_io,
143
26
            free_space,
144
26
            worker_channels: Vec::new(),
145
26
            worker_handles: Vec::new(),
146
26
            periodic_flush_handle: None,
147
26
            shutdown: Arc::new(AtomicBool::new(false)),
148
26
            stats,
149
26
            format_version,
150
26
        }
151
26
    }
152
153
    /// Add a write operation to the buffer (low-contention fast path; only the caller's shard lock is taken)
154
21.5k
    pub fn add_write(
155
21.5k
        &self,
156
21.5k
        op: Operation,
157
21.5k
        record: Arc<Record>,
158
21.5k
        old_value_len: usize,
159
21.5k
    ) -> Result<()> {
160
21.5k
        if self.shutdown.load(Ordering::Acquire) {
161
1
            return Err(FeoxError::ShuttingDown);
162
21.5k
        }
163
164
21.5k
        let entry = WriteEntry {
165
21.5k
            op,
166
21.5k
            record,
167
21.5k
            old_value_len,
168
21.5k
            work_status: AtomicU32::new(0),
169
21.5k
            retry_count: AtomicU32::new(0),
170
21.5k
            timestamp: Instant::now(),
171
21.5k
        };
172
173
        // Get thread's consistent shard
174
21.5k
        let shard_id = self.get_shard_id();
175
21.5k
        let buffer = &self.sharded_buffers[shard_id];
176
177
21.5k
        buffer.add_entry(entry)?;
178
21.5k
        self.stats.record_write_buffered();
179
180
        // Check if we need to trigger flush for this specific shard
181
21.5k
        if buffer.is_full() && shard_id < self.worker_channels.len() {
182
17.9k
            let req = FlushRequest { response: None };
183
17.9k
            let _ = self.worker_channels[shard_id].try_send(req);
184
17.9k
        }
185
186
21.5k
        Ok(())
187
21.5k
    }
188
189
    /// Start background worker threads
190
22
    pub fn start_workers(&mut self, num_workers: usize) {
191
        // Ensure we have the right number of workers for shards
192
22
        let num_shards = self.sharded_buffers.len();
193
22
        let actual_workers = num_workers.min(num_shards);
194
195
        // Create per-worker channels
196
22
        let mut receivers = Vec::new();
197
340
        for _ in 0..actual_workers {
198
340
            let (tx, rx) = bounded(2);
199
340
            self.worker_channels.push(tx);
200
340
            receivers.push(rx);
201
340
        }
202
203
        // Start workers, each owning one shard
204
340
        for worker_id in 0..actual_workers {
205
340
            let ctx = WorkerContext {
206
340
                worker_id,
207
340
                disk_io: self.disk_io.clone(),
208
340
                free_space: self.free_space.clone(),
209
340
                sharded_buffers: self.sharded_buffers.clone(),
210
340
                shutdown: self.shutdown.clone(),
211
340
                stats: self.stats.clone(),
212
340
                format_version: self.format_version,
213
340
            };
214
215
            // Note: pop() hands receivers out in reverse order, so worker 0 gets the last receiver and the last worker gets receiver 0
216
340
            let flush_rx = receivers.pop().unwrap();
217
218
340
            let handle = thread::spawn(move || {
219
340
                write_buffer_worker(ctx, flush_rx);
220
340
            });
221
222
340
            self.worker_handles.push(handle);
223
        }
224
225
        // Start periodic flush coordinator
226
22
        let worker_channels = self.worker_channels.clone();
227
22
        let shutdown = self.shutdown.clone();
228
22
        let sharded_buffers = self.sharded_buffers.clone();
229
230
22
        let periodic_handle = thread::spawn(move || {
231
22
            let interval = WRITE_BUFFER_FLUSH_INTERVAL;
232
233
42
            while !shutdown.load(Ordering::Acquire) {
234
20
                thread::sleep(interval);
235
236
                // Check each shard and trigger its worker if needed
237
320
                for (shard_id, buffer) in sharded_buffers.iter().enumerate() {
238
320
                    let count = buffer.count.load(Ordering::Relaxed);
239
320
                    if count > 0 && shard_id < worker_channels.len() {
240
8
                        let req = FlushRequest { response: None };
241
8
                        // Workers were assigned channels in reverse order due to pop()
242
8
                        let channel_idx = worker_channels.len() - 1 - shard_id;
243
8
                        let _ = worker_channels[channel_idx].try_send(req);
244
312
                    }
245
                }
246
            }
247
22
        });
248
249
22
        self.periodic_flush_handle = Some(periodic_handle);
250
22
    }
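
Because receivers.pop() hands channels out back-to-front, worker i ends up listening on channel (n - 1 - i); the periodic flusher above compensates with channel_idx = worker_channels.len() - 1 - shard_id. A self-contained sketch of that index mapping:

// Sketch of the channel assignment in start_workers(): receivers are created
// in order 0..n but handed out with pop(), i.e. back-to-front.
fn main() {
    let n: usize = 4;
    let mut receivers: Vec<usize> = (0..n).collect(); // receiver ids 0..n
    for worker_id in 0..n {
        let receiver_id = receivers.pop().unwrap();
        // Worker i holds receiver (n - 1 - i) ...
        assert_eq!(receiver_id, n - 1 - worker_id);
        // ... so reaching the worker that owns shard s means sending on
        // channel (n - 1 - s), exactly the channel_idx computed above.
    }
}
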
251
252
    /// Force flush and wait for completion
253
11
    pub fn force_flush(&self) -> Result<()> {
254
11
        let mut responses = Vec::new();
255
256
        // Send flush request to each worker and collect response channels
257
187
        for worker_tx in &self.worker_channels {
258
176
            let (tx, rx) = bounded(1);
259
176
            let req = FlushRequest { response: Some(tx) };
260
261
176
            worker_tx.send(req).map_err(|_| FeoxError::ChannelError)?;
262
176
            responses.push(rx);
263
        }
264
265
        // Wait for all workers to complete
266
187
        for rx in responses {
267
176
            rx.recv().map_err(|_| FeoxError::ChannelError)??;
268
        }
269
270
11
        Ok(())
271
11
    }
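
force_flush() is a completion barrier: one bounded(1) response channel per worker, then a blocking recv on each receiver. A condensed, runnable sketch of the same pattern with the shard-flushing work elided:

use crossbeam_channel::bounded;
use std::thread;

fn main() {
    // One bounded(1) response channel per worker acts as a completion barrier.
    let mut responses = Vec::new();
    for worker in 0..3u32 {
        let (tx, rx) = bounded(1);
        thread::spawn(move || {
            // ... flush this worker's shard here, then acknowledge ...
            let _ = tx.send(worker);
        });
        responses.push(rx);
    }
    // The caller blocks until every worker has responded.
    for rx in responses {
        let done = rx.recv().unwrap();
        println!("worker {done} flushed");
    }
}
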
272
273
    /// Shutdown write buffer
274
19
    pub fn initiate_shutdown(&self) {
275
19
        self.shutdown.store(true, Ordering::Release);
276
277
        // Don't call force_flush here as it can block
278
        // Workers will see the shutdown flag and exit gracefully
279
19
    }
280
281
    /// Complete shutdown - must be called after initiate_shutdown
282
26
    pub fn complete_shutdown(&mut self) {
283
        use std::time::Duration;
284
285
        // Ensure shutdown flag is set
286
26
        self.shutdown.store(true, Ordering::Release);
287
288
        // Wait for periodic flush thread to finish with timeout
289
26
        if let Some(handle) = self.periodic_flush_handle.take() {
290
            // Spawn a thread to wait with timeout since JoinHandle doesn't have join_timeout
291
22
            let (tx, rx) = crossbeam_channel::bounded(1);
292
22
            thread::spawn(move || {
293
22
                let _ = handle.join();
294
22
                let _ = tx.send(());
295
22
            });
296
297
22
            if rx.recv_timeout(Duration::from_secs(5)).is_err() {
298
0
                // Timeout waiting for periodic flush thread
299
22
            }
300
4
        }
301
302
        // Signal workers to stop and wait
303
340
        for handle in self.worker_handles.drain(..) {
304
340
            let _ = handle.join();
305
340
        }
306
307
        // Note: disk_io shutdown is handled by the Store's Drop implementation
308
        // to ensure proper ordering
309
26
    }
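
std::thread::JoinHandle has no join_timeout, so complete_shutdown() emulates one: a helper thread performs the blocking join and signals a channel, and the caller waits on that channel with recv_timeout. A standalone sketch of the trick:

use crossbeam_channel::bounded;
use std::thread;
use std::time::Duration;

fn main() {
    let target = thread::spawn(|| thread::sleep(Duration::from_millis(10)));

    // Emulate join_timeout: join in a helper thread, signal over a channel.
    let (tx, rx) = bounded(1);
    thread::spawn(move || {
        let _ = target.join();
        let _ = tx.send(());
    });

    match rx.recv_timeout(Duration::from_secs(5)) {
        Ok(()) => println!("thread exited in time"),
        Err(_) => println!("gave up waiting; the helper thread is leaked"),
    }
}
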
310
311
    /// Legacy shutdown for compatibility
312
0
    pub fn shutdown(&mut self) {
313
0
        self.complete_shutdown();
314
0
    }
315
316
    #[inline]
317
21.5k
    fn get_shard_id(&self) -> usize {
318
        thread_local! {
319
            static THREAD_SHARD_ID: std::cell::Cell<Option<usize>> = const { std::cell::Cell::new(None) };
320
        }
321
322
21.5k
        THREAD_SHARD_ID.with(|id| {
323
21.5k
            if let Some(cpu_id) = id.get() {
324
21.5k
                cpu_id
325
            } else {
326
                // Assign a shard based on thread hash for consistency
327
                use std::collections::hash_map::RandomState;
328
                use std::hash::BuildHasher;
329
30
                let shard_id = RandomState::new().hash_one(std::thread::current().id()) as usize
330
30
                    % self.sharded_buffers.len();
331
30
                id.set(Some(shard_id));
332
30
                shard_id
333
            }
334
21.5k
        })
335
21.5k
    }
336
}
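
get_shard_id() pins each thread to one shard by hashing its ThreadId once and caching the result in a thread_local. A standalone sketch of the same pattern, assuming an illustrative 4 shards (the buffer actually derives the count from num_cpus):

use std::cell::Cell;
use std::collections::hash_map::RandomState;
use std::hash::BuildHasher;

const NUM_SHARDS: usize = 4; // illustrative stand-in

// Each thread hashes its ThreadId once and reuses the cached shard id, so
// all writes from one thread land on the same shard: better cache locality
// and no cross-shard lock ping-pong.
fn shard_for_current_thread() -> usize {
    thread_local! {
        static SHARD: Cell<Option<usize>> = const { Cell::new(None) };
    }
    SHARD.with(|s| {
        s.get().unwrap_or_else(|| {
            let id = RandomState::new().hash_one(std::thread::current().id()) as usize
                % NUM_SHARDS;
            s.set(Some(id));
            id
        })
    })
}

fn main() {
    let a = shard_for_current_thread();
    let b = shard_for_current_thread();
    assert_eq!(a, b); // stable for the lifetime of the thread
}
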
337
338
/// Background worker for processing write buffer flushes
339
340
fn write_buffer_worker(ctx: WorkerContext, flush_rx: Receiver<FlushRequest>) {
340
340
    let worker_id = ctx.worker_id;
341
340
    let format = get_format(ctx.format_version);
342
343
    loop {
344
18.5k
        if ctx.shutdown.load(Ordering::Acquire) {
345
340
            break;
346
18.2k
        }
347
348
        // Wait for a flush request, using a timeout so the shutdown flag is checked periodically
349
18.2k
        let req = match flush_rx.recv_timeout(Duration::from_millis(500)) {
350
17.9k
            Ok(req) => req,
351
            Err(crossbeam_channel::RecvTimeoutError::Timeout) => {
352
284
                continue;
353
            }
354
            Err(crossbeam_channel::RecvTimeoutError::Disconnected) => {
355
0
                break;
356
            }
357
        };
358
359
        // Drain only this worker's shard
360
17.9k
        if worker_id < ctx.sharded_buffers.len() {
361
17.9k
            let buffer = &ctx.sharded_buffers[worker_id];
362
17.9k
            let entries = buffer.drain_entries();
363
364
17.9k
            if !entries.is_empty() {
365
                // Process writes for this shard
366
18
                let result = process_write_batch(
367
18
                    &ctx.disk_io,
368
18
                    &ctx.free_space,
369
18
                    entries,
370
18
                    &ctx.stats,
371
18
                    format.as_ref(),
372
                );
373
374
18
                ctx.stats.flush_count.fetch_add(1, Ordering::Relaxed);
375
376
                // Send response if requested
377
18
                if let Some(tx) = req.response {
378
14
                    let _ = tx.send(result);
379
14
                }
380
17.9k
            } else if let Some(tx) = req.response {
381
162
                // No data to flush, but still respond if needed
382
162
                let _ = tx.send(Ok(()));
383
17.7k
            }
384
0
        }
385
    }
386
387
    // Before exiting, flush any remaining data from this worker's shard
388
340
    if ctx.shutdown.load(Ordering::Acquire) && worker_id < ctx.sharded_buffers.len() {
389
340
        let buffer = &ctx.sharded_buffers[worker_id];
390
340
        let final_entries = buffer.drain_entries();
391
392
340
        if !final_entries.is_empty() {
393
2
            let _ = process_write_batch(
394
2
                &ctx.disk_io,
395
2
                &ctx.free_space,
396
2
                final_entries,
397
2
                &ctx.stats,
398
2
                format.as_ref(),
399
2
            );
400
338
        }
401
0
    }
402
340
}
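
The worker loop above uses a common shutdown-aware receive pattern: block on recv_timeout so the thread wakes periodically to re-check an AtomicBool even when no flush requests arrive. A condensed, runnable skeleton:

use crossbeam_channel::{bounded, RecvTimeoutError};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = bounded::<()>(2);
    let shutdown = Arc::new(AtomicBool::new(false));
    let flag = shutdown.clone();

    let worker = thread::spawn(move || loop {
        if flag.load(Ordering::Acquire) {
            break; // a final drain-and-flush would happen here before exit
        }
        match rx.recv_timeout(Duration::from_millis(50)) {
            Ok(()) => { /* flush this worker's shard */ }
            Err(RecvTimeoutError::Timeout) => continue, // re-check shutdown
            Err(RecvTimeoutError::Disconnected) => break,
        }
    });

    tx.send(()).unwrap(); // trigger one flush
    shutdown.store(true, Ordering::Release);
    worker.join().unwrap();
}
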
403
404
/// Process a batch of write entries
405
20
fn process_write_batch(
406
20
    disk_io: &Arc<RwLock<DiskIO>>,
407
20
    free_space: &Arc<RwLock<FreeSpaceManager>>,
408
20
    entries: Vec<WriteEntry>,
409
20
    stats: &Arc<Statistics>,
410
20
    format: &dyn RecordFormat,
411
20
) -> Result<()> {
412
20
    let mut batch_writes = Vec::new();
413
20
    let mut delete_operations = Vec::new();
414
20
    let mut records_to_clear = Vec::new();
415
416
    // Prepare all operations
417
20.9k
    for entry in entries {
418
20.9k
        match entry.op {
419
            Operation::Insert | Operation::Update => {
420
                // Check if record is still valid (not deleted)
421
20.8k
                if entry.record.refcount.load(Ordering::Acquire) > 0
422
20.8k
                    && entry.record.sector.load(Ordering::Acquire) == 0
423
                {
424
20.8k
                    let data = prepare_record_data(&entry.record, format)?;
425
20.8k
                    let sectors_needed = data.len().div_ceil(FEOX_BLOCK_SIZE);
426
20.8k
                    let sector = free_space.write().allocate_sectors(sectors_needed as u64)?;
427
428
                    // Track disk usage
429
20.8k
                    stats
430
20.8k
                        .disk_usage
431
20.8k
                        .fetch_add((sectors_needed * FEOX_BLOCK_SIZE) as u64, Ordering::Relaxed);
432
433
20.8k
                    batch_writes.push((sector, data));
434
20.8k
                    records_to_clear.push((sector, entry.record.clone()));
435
2
                }
436
            }
437
            Operation::Delete => {
438
106
                let sector = entry.record.sector.load(Ordering::Acquire);
439
106
                if sector != 0 {
440
1
                    delete_operations.push((sector, entry.record.key.len(), entry.old_value_len));
441
105
                }
442
            }
443
0
            _ => {}
444
        }
445
    }
446
447
    // Process deletes first
448
21
    for (sector, key_len, value_len) in delete_operations {
449
1
        // Write deletion marker
450
1
        let mut deletion_marker = vec![0u8; FEOX_BLOCK_SIZE];
451
1
        deletion_marker[..8].copy_from_slice(b"\0DELETED");
452
1
453
1
        let _ = disk_io.write().write_sectors_sync(sector, &deletion_marker);
454
1
455
1
        // Calculate sectors used and release them
456
1
        let total_size = SECTOR_HEADER_SIZE + 2 + key_len + 8 + 8 + value_len;
457
1
        let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE);
458
1
459
1
        let _ = free_space
460
1
            .write()
461
1
            .release_sectors(sector, sectors_needed as u64);
462
1
463
1
        // Update disk usage
464
1
        stats
465
1
            .disk_usage
466
1
            .fetch_sub((sectors_needed * FEOX_BLOCK_SIZE) as u64, Ordering::Relaxed);
467
1
    }
468
469
    // Process batch writes with io_uring
470
20
    if !batch_writes.is_empty() {
471
        // Use io_uring with retry
472
20
        let mut retries = 3;
473
20
        let mut delay_us = 100;
474
475
20
        while retries > 0 {
476
            // Execute batch write
477
20
            let result = disk_io.write().batch_write(batch_writes.clone());
478
479
20
            match result {
480
                Ok(()) => {
481
20.8k
                    for (sector, record) in &records_to_clear {
482
20.8k
                        record.sector.store(*sector, Ordering::Release);
483
20.8k
                        std::sync::atomic::fence(Ordering::Release);
484
20.8k
                        record.clear_value();
485
20.8k
                    }
486
20
                    stats.record_write_flushed(records_to_clear.len() as u64);
487
20
                    break;
488
                }
489
0
                Err(e) => {
490
0
                    retries -= 1;
491
0
                    if retries > 0 {
492
                        // Exponential backoff with jitter ±10%
493
0
                        let jitter = {
494
                            use rand::Rng;
495
0
                            let mut rng = rand::rng();
496
0
                            (delay_us * rng.random_range(-10..=10)) / 100
497
                        };
498
0
                        let actual_delay = (delay_us + jitter).max(1);
499
0
                        thread::sleep(Duration::from_micros(actual_delay as u64));
500
0
                        delay_us *= 2;
501
                    } else {
502
0
                        stats.record_write_failed();
503
0
                        for (sector, _) in &records_to_clear {
504
0
                            free_space.write().release_sectors(*sector, 1)?;
505
                        }
506
0
                        return Err(e);
507
                    }
508
                }
509
            }
510
        }
511
0
    }
512
513
20
    Ok(())
514
20
}
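
The retry loop in process_write_batch doubles delay_us after each failed batch_write and perturbs it by ±10% so concurrent retriers do not back off in lock-step. A standalone sketch of the same schedule, using the rand 0.9 API seen above:

use rand::Rng;

fn main() {
    let mut delay_us: i64 = 100;
    let mut rng = rand::rng();
    for attempt in 1..=3 {
        // Jitter in [-10%, +10%] of the current delay.
        let jitter = (delay_us * rng.random_range(-10..=10)) / 100;
        let actual = (delay_us + jitter).max(1);
        println!("attempt {attempt}: sleep ~{actual}us (base {delay_us}us)");
        delay_us *= 2; // 100us -> 200us -> 400us
    }
}
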
515
516
20.8k
fn prepare_record_data(record: &Record, format: &dyn RecordFormat) -> Result<Vec<u8>> {
517
    // Get the total size using the format trait
518
20.8k
    let total_size = format.total_size(record.key.len(), record.value_len);
519
520
    // Calculate padded size to sector boundary
521
20.8k
    let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE);
522
20.8k
    let padded_size = sectors_needed * FEOX_BLOCK_SIZE;
523
524
20.8k
    let mut data = Vec::with_capacity(padded_size);
525
526
    // Sector header
527
20.8k
    data.extend_from_slice(&SECTOR_MARKER.to_le_bytes());
528
20.8k
    data.extend_from_slice(&0u16.to_le_bytes()); // seq_number
529
530
    // Use format trait to serialize the record
531
20.8k
    let record_data = format.serialize_record(record, true);
532
20.8k
    data.extend_from_slice(&record_data);
533
534
    // Pad to sector boundary
535
20.8k
    data.resize(padded_size, 0);
536
537
20.8k
    Ok(data)
538
20.8k
}
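
prepare_record_data pads every serialized record out to a sector boundary via div_ceil. A worked example of that arithmetic, assuming an illustrative 512-byte FEOX_BLOCK_SIZE (the real value is defined in crate::constants):

const FEOX_BLOCK_SIZE: usize = 512; // illustrative stand-in

fn padded_size(total_size: usize) -> usize {
    // div_ceil rounds up, so a record that spills even one byte into the
    // next sector still claims the whole sector.
    total_size.div_ceil(FEOX_BLOCK_SIZE) * FEOX_BLOCK_SIZE
}

fn main() {
    assert_eq!(padded_size(512), 512);   // exact fit: 1 sector
    assert_eq!(padded_size(513), 1024);  // one byte over: 2 sectors
    assert_eq!(padded_size(5000), 5120); // 10 sectors
}
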
539
540
impl Drop for WriteBuffer {
541
26
    fn drop(&mut self) {
542
26
        if !self.shutdown.load(Ordering::Acquire) {
543
5
            self.complete_shutdown();
544
21
        }
545
26
    }
546
}