/home/gh-runner/action-runner3/_work/feoxdb/feoxdb/src/storage/write_buffer.rs
Line | Count | Source |
1 | | use crossbeam_channel::{bounded, Receiver, Sender}; |
2 | | use crossbeam_utils::CachePadded; |
3 | | use parking_lot::{Mutex, RwLock}; |
4 | | use std::collections::VecDeque; |
5 | | use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}; |
6 | | use std::sync::Arc; |
7 | | use std::thread::{self, JoinHandle}; |
8 | | use std::time::{Duration, Instant}; |
9 | | |
10 | | use crate::constants::*; |
11 | | use crate::core::record::Record; |
12 | | use crate::error::{FeoxError, Result}; |
13 | | use crate::stats::Statistics; |
14 | | use crate::storage::format::{get_format, RecordFormat}; |
15 | | use crate::storage::free_space::FreeSpaceManager; |
16 | | use crate::storage::io::DiskIO; |
17 | | |
18 | | /// Sharded write buffer for reducing contention |
19 | | /// Each thread consistently uses the same shard to improve cache locality |
20 | | #[repr(align(64))] // Cache line alignment |
21 | | pub struct ShardedWriteBuffer { |
22 | | /// Buffered writes pending flush |
23 | | buffer: Mutex<VecDeque<WriteEntry>>, |
24 | | |
25 | | /// Number of entries in buffer |
26 | | count: AtomicUsize, |
27 | | |
28 | | /// Total size of buffered data |
29 | | size: AtomicUsize, |
30 | | } |
31 | | |
32 | | /// Write entry for buffered operations |
33 | | pub struct WriteEntry { |
34 | | pub op: Operation, |
35 | | pub record: Arc<Record>, |
36 | | pub old_value_len: usize, |
37 | | pub work_status: AtomicU32, |
38 | | pub retry_count: AtomicU32, |
39 | | pub timestamp: Instant, |
40 | | } |
41 | | |
42 | | /// Main write buffer coordinator |
43 | | pub struct WriteBuffer { |
44 | | /// Sharded buffers to reduce contention between threads |
45 | | sharded_buffers: Arc<Vec<CachePadded<ShardedWriteBuffer>>>, |
46 | | |
47 | | /// Shared disk I/O handle |
48 | | disk_io: Arc<RwLock<DiskIO>>, |
49 | | |
50 | | /// Free space manager for sector allocation |
51 | | free_space: Arc<RwLock<FreeSpaceManager>>, |
52 | | |
53 | | /// Per-worker channels for targeted flush requests |
54 | | worker_channels: Vec<Sender<FlushRequest>>, |
55 | | |
56 | | /// Background worker handles |
57 | | worker_handles: Vec<JoinHandle<()>>, |
58 | | |
59 | | /// Periodic flush thread handle |
60 | | periodic_flush_handle: Option<JoinHandle<()>>, |
61 | | |
62 | | /// Shutdown flag |
63 | | shutdown: Arc<AtomicBool>, |
64 | | |
65 | | /// Shared statistics |
66 | | stats: Arc<Statistics>, |
67 | | |
68 | | /// Format version for record serialization |
69 | | format_version: u32, |
70 | | } |
71 | | |
72 | | #[derive(Debug)] |
73 | | struct FlushRequest { |
74 | | response: Option<Sender<Result<()>>>, |
75 | | } |
76 | | |
77 | | struct WorkerContext { |
78 | | worker_id: usize, |
79 | | disk_io: Arc<RwLock<DiskIO>>, |
80 | | free_space: Arc<RwLock<FreeSpaceManager>>, |
81 | | sharded_buffers: Arc<Vec<CachePadded<ShardedWriteBuffer>>>, |
82 | | shutdown: Arc<AtomicBool>, |
83 | | stats: Arc<Statistics>, |
84 | | format_version: u32, |
85 | | } |
86 | | |
87 | | impl ShardedWriteBuffer { |
88 | 416 | fn new(_shard_id: usize) -> Self { |
89 | 416 | Self { |
90 | 416 | buffer: Mutex::new(VecDeque::new()), |
91 | 416 | count: AtomicUsize::new(0), |
92 | 416 | size: AtomicUsize::new(0), |
93 | 416 | } |
94 | 416 | } |
95 | | |
96 | 21.5k | fn add_entry(&self, entry: WriteEntry) -> Result<()> { |
97 | 21.5k | let entry_size = entry.record.calculate_size(); |
98 | | |
99 | 21.5k | let mut buffer = self.buffer.lock(); |
100 | 21.5k | buffer.push_back(entry); |
101 | | |
102 | 21.5k | self.count.fetch_add(1, Ordering::AcqRel); |
103 | 21.5k | self.size.fetch_add(entry_size, Ordering::AcqRel); |
104 | | |
105 | 21.5k | Ok(()) |
106 | 21.5k | } |
107 | | |
108 | 18.2k | fn drain_entries(&self) -> Vec<WriteEntry> { |
109 | 18.2k | let mut buffer = self.buffer.lock(); |
110 | 18.2k | let entries: Vec<_> = buffer.drain(..).collect(); |
111 | | |
112 | 18.2k | self.count.store(0, Ordering::Release); |
113 | 18.2k | self.size.store(0, Ordering::Release); |
114 | | |
115 | 18.2k | entries |
116 | 18.2k | } |
117 | | |
118 | 21.5k | fn is_full(&self) -> bool { |
119 | 21.5k | self.count.load(Ordering::Acquire) >= WRITE_BUFFER_SIZE |
120 | 3.57k | || self.size.load(Ordering::Acquire) >= FEOX_WRITE_BUFFER_SIZE |
121 | 21.5k | } |
122 | | } |
123 | | |
124 | | impl WriteBuffer { |
125 | 26 | pub fn new( |
126 | 26 | disk_io: Arc<RwLock<DiskIO>>, |
127 | 26 | free_space: Arc<RwLock<FreeSpaceManager>>, |
128 | 26 | stats: Arc<Statistics>, |
129 | 26 | format_version: u32, |
130 | 26 | ) -> Self { |
131 | | // Use half CPU count for both shards and workers |
132 | 26 | let num_shards = (num_cpus::get() / 2).max(1); |
133 | | |
134 | 26 | let sharded_buffers = Arc::new( |
135 | 26 | (0..num_shards) |
136 | 416 | .map26 (|shard_id| CachePadded::new(ShardedWriteBuffer::new(shard_id))) |
137 | 26 | .collect(), |
138 | | ); |
139 | | |
140 | 26 | Self { |
141 | 26 | sharded_buffers, |
142 | 26 | disk_io, |
143 | 26 | free_space, |
144 | 26 | worker_channels: Vec::new(), |
145 | 26 | worker_handles: Vec::new(), |
146 | 26 | periodic_flush_handle: None, |
147 | 26 | shutdown: Arc::new(AtomicBool::new(false)), |
148 | 26 | stats, |
149 | 26 | format_version, |
150 | 26 | } |
151 | 26 | } |
152 | | |
153 | | /// Add write operation to buffer (lock-free fast path) |
154 | 21.5k | pub fn add_write( |
155 | 21.5k | &self, |
156 | 21.5k | op: Operation, |
157 | 21.5k | record: Arc<Record>, |
158 | 21.5k | old_value_len: usize, |
159 | 21.5k | ) -> Result<()> { |
160 | 21.5k | if self.shutdown.load(Ordering::Acquire) { |
161 | 1 | return Err(FeoxError::ShuttingDown); |
162 | 21.5k | } |
163 | | |
164 | 21.5k | let entry = WriteEntry { |
165 | 21.5k | op, |
166 | 21.5k | record, |
167 | 21.5k | old_value_len, |
168 | 21.5k | work_status: AtomicU32::new(0), |
169 | 21.5k | retry_count: AtomicU32::new(0), |
170 | 21.5k | timestamp: Instant::now(), |
171 | 21.5k | }; |
172 | | |
173 | | // Get thread's consistent shard |
174 | 21.5k | let shard_id = self.get_shard_id(); |
175 | 21.5k | let buffer = &self.sharded_buffers[shard_id]; |
176 | | |
177 | 21.5k | buffer.add_entry(entry)?0 ; |
178 | 21.5k | self.stats.record_write_buffered(); |
179 | | |
180 | | // Check if we need to trigger flush for this specific shard |
181 | 21.5k | if buffer.is_full() && shard_id17.9k < self.worker_channels.len() { |
182 | 17.9k | let req = FlushRequest { response: None }; |
183 | 17.9k | let _ = self.worker_channels[shard_id].try_send(req); |
184 | 17.9k | }3.57k |
185 | | |
186 | 21.5k | Ok(()) |
187 | 21.5k | } |
188 | | |
189 | | /// Start background worker threads |
190 | 22 | pub fn start_workers(&mut self, num_workers: usize) { |
191 | | // Ensure we have the right number of workers for shards |
192 | 22 | let num_shards = self.sharded_buffers.len(); |
193 | 22 | let actual_workers = num_workers.min(num_shards); |
194 | | |
195 | | // Create per-worker channels |
196 | 22 | let mut receivers = Vec::new(); |
197 | 340 | for _ in 0..actual_workers22 { |
198 | 340 | let (tx, rx) = bounded(2); |
199 | 340 | self.worker_channels.push(tx); |
200 | 340 | receivers.push(rx); |
201 | 340 | } |
202 | | |
203 | | // Start workers, each owning one shard |
204 | 340 | for worker_id in 0..actual_workers22 { |
205 | 340 | let ctx = WorkerContext { |
206 | 340 | worker_id, |
207 | 340 | disk_io: self.disk_io.clone(), |
208 | 340 | free_space: self.free_space.clone(), |
209 | 340 | sharded_buffers: self.sharded_buffers.clone(), |
210 | 340 | shutdown: self.shutdown.clone(), |
211 | 340 | stats: self.stats.clone(), |
212 | 340 | format_version: self.format_version, |
213 | 340 | }; |
214 | | |
215 | | // Note: pop() gives us receivers in reverse order, so worker N gets receiver 0 |
216 | 340 | let flush_rx = receivers.pop().unwrap(); |
217 | | |
218 | 340 | let handle = thread::spawn(move || { |
219 | 340 | write_buffer_worker(ctx, flush_rx); |
220 | 340 | }); |
221 | | |
222 | 340 | self.worker_handles.push(handle); |
223 | | } |
224 | | |
225 | | // Start periodic flush coordinator |
226 | 22 | let worker_channels = self.worker_channels.clone(); |
227 | 22 | let shutdown = self.shutdown.clone(); |
228 | 22 | let sharded_buffers = self.sharded_buffers.clone(); |
229 | | |
230 | 22 | let periodic_handle = thread::spawn(move || { |
231 | 22 | let interval = WRITE_BUFFER_FLUSH_INTERVAL; |
232 | | |
233 | 42 | while !shutdown.load(Ordering::Acquire) { |
234 | 20 | thread::sleep(interval); |
235 | | |
236 | | // Check each shard and trigger its worker if needed |
237 | 320 | for (shard_id, buffer) in sharded_buffers.iter()20 .enumerate20 () { |
238 | 320 | let count = buffer.count.load(Ordering::Relaxed); |
239 | 320 | if count > 0 && shard_id13 < worker_channels.len() { |
240 | 8 | let req = FlushRequest { response: None }; |
241 | 8 | // Workers were assigned channels in reverse order due to pop() |
242 | 8 | let channel_idx = worker_channels.len() - 1 - shard_id; |
243 | 8 | let _ = worker_channels[channel_idx].try_send(req); |
244 | 312 | } |
245 | | } |
246 | | } |
247 | 22 | }); |
248 | | |
249 | 22 | self.periodic_flush_handle = Some(periodic_handle); |
250 | 22 | } |
251 | | |
252 | | /// Force flush and wait for completion |
253 | 11 | pub fn force_flush(&self) -> Result<()> { |
254 | 11 | let mut responses = Vec::new(); |
255 | | |
256 | | // Send flush request to each worker and collect response channels |
257 | 187 | for worker_tx176 in &self.worker_channels { |
258 | 176 | let (tx, rx) = bounded(1); |
259 | 176 | let req = FlushRequest { response: Some(tx) }; |
260 | | |
261 | 176 | worker_tx.send(req).map_err(|_| FeoxError::ChannelError)?0 ; |
262 | 176 | responses.push(rx); |
263 | | } |
264 | | |
265 | | // Wait for all workers to complete |
266 | 187 | for rx176 in responses { |
267 | 176 | rx.recv().map_err(|_| FeoxError::ChannelError)?0 ?0 ; |
268 | | } |
269 | | |
270 | 11 | Ok(()) |
271 | 11 | } |
272 | | |
273 | | /// Shutdown write buffer |
274 | 19 | pub fn initiate_shutdown(&self) { |
275 | 19 | self.shutdown.store(true, Ordering::Release); |
276 | | |
277 | | // Don't call force_flush here as it can block |
278 | | // Workers will see the shutdown flag and exit gracefully |
279 | 19 | } |
280 | | |
281 | | /// Complete shutdown - must be called after initiate_shutdown |
282 | 26 | pub fn complete_shutdown(&mut self) { |
283 | | use std::time::Duration; |
284 | | |
285 | | // Ensure shutdown flag is set |
286 | 26 | self.shutdown.store(true, Ordering::Release); |
287 | | |
288 | | // Wait for periodic flush thread to finish with timeout |
289 | 26 | if let Some(handle22 ) = self.periodic_flush_handle.take() { |
290 | | // Spawn a thread to wait with timeout since JoinHandle doesn't have join_timeout |
291 | 22 | let (tx, rx) = crossbeam_channel::bounded(1); |
292 | 22 | thread::spawn(move || { |
293 | 22 | let _ = handle.join(); |
294 | 22 | let _ = tx.send(()); |
295 | 22 | }); |
296 | | |
297 | 22 | if rx.recv_timeout(Duration::from_secs(5)).is_err() { |
298 | 0 | // Timeout waiting for periodic flush thread |
299 | 22 | } |
300 | 4 | } |
301 | | |
302 | | // Signal workers to stop and wait |
303 | 340 | for handle in self.worker_handles26 .drain26 (..26 ) { |
304 | 340 | let _ = handle.join(); |
305 | 340 | } |
306 | | |
307 | | // Note: disk_io shutdown is handled by the Store's Drop implementation |
308 | | // to ensure proper ordering |
309 | 26 | } |
310 | | |
311 | | /// Legacy shutdown for compatibility |
312 | 0 | pub fn shutdown(&mut self) { |
313 | 0 | self.complete_shutdown(); |
314 | 0 | } |
315 | | |
316 | | #[inline] |
317 | 21.5k | fn get_shard_id(&self) -> usize { |
318 | | thread_local! { |
319 | | static THREAD_SHARD_ID: std::cell::Cell<Option<usize>> = const { std::cell::Cell::new(None) }; |
320 | | } |
321 | | |
322 | 21.5k | THREAD_SHARD_ID.with(|id| { |
323 | 21.5k | if let Some(cpu_id21.5k ) = id.get() { |
324 | 21.5k | cpu_id |
325 | | } else { |
326 | | // Assign a shard based on thread hash for consistency |
327 | | use std::collections::hash_map::RandomState; |
328 | | use std::hash::BuildHasher; |
329 | 30 | let shard_id = RandomState::new().hash_one(std::thread::current().id()) as usize |
330 | 30 | % self.sharded_buffers.len(); |
331 | 30 | id.set(Some(shard_id)); |
332 | 30 | shard_id |
333 | | } |
334 | 21.5k | }) |
335 | 21.5k | } |
336 | | } |
337 | | |
338 | | /// Background worker for processing write buffer flushes |
339 | 340 | fn write_buffer_worker(ctx: WorkerContext, flush_rx: Receiver<FlushRequest>) { |
340 | 340 | let worker_id = ctx.worker_id; |
341 | 340 | let format = get_format(ctx.format_version); |
342 | | |
343 | | loop { |
344 | 18.5k | if ctx.shutdown.load(Ordering::Acquire) { |
345 | 340 | break; |
346 | 18.2k | } |
347 | | |
348 | | // Wait for flush request with timeout to check shutdown periodically |
349 | 18.2k | let req17.9k = match flush_rx.recv_timeout(Duration::from_millis(500)) { |
350 | 17.9k | Ok(req) => req, |
351 | | Err(crossbeam_channel::RecvTimeoutError::Timeout) => { |
352 | 284 | continue; |
353 | | } |
354 | | Err(crossbeam_channel::RecvTimeoutError::Disconnected) => { |
355 | 0 | break; |
356 | | } |
357 | | }; |
358 | | |
359 | | // Drain only this worker's shard |
360 | 17.9k | if worker_id < ctx.sharded_buffers.len() { |
361 | 17.9k | let buffer = &ctx.sharded_buffers[worker_id]; |
362 | 17.9k | let entries = buffer.drain_entries(); |
363 | | |
364 | 17.9k | if !entries.is_empty() { |
365 | | // Process writes for this shard |
366 | 18 | let result = process_write_batch( |
367 | 18 | &ctx.disk_io, |
368 | 18 | &ctx.free_space, |
369 | 18 | entries, |
370 | 18 | &ctx.stats, |
371 | 18 | format.as_ref(), |
372 | | ); |
373 | | |
374 | 18 | ctx.stats.flush_count.fetch_add(1, Ordering::Relaxed); |
375 | | |
376 | | // Send response if requested |
377 | 18 | if let Some(tx14 ) = req.response { |
378 | 14 | let _ = tx.send(result); |
379 | 14 | }4 |
380 | 17.9k | } else if let Some(tx162 ) = req.response { |
381 | 162 | // No data to flush, but still respond if needed |
382 | 162 | let _ = tx.send(Ok(())); |
383 | 17.7k | } |
384 | 0 | } |
385 | | } |
386 | | |
387 | | // Before exiting, flush any remaining data from this worker's shard |
388 | 340 | if ctx.shutdown.load(Ordering::Acquire) && worker_id < ctx.sharded_buffers.len() { |
389 | 340 | let buffer = &ctx.sharded_buffers[worker_id]; |
390 | 340 | let final_entries = buffer.drain_entries(); |
391 | | |
392 | 340 | if !final_entries.is_empty() { |
393 | 2 | let _ = process_write_batch( |
394 | 2 | &ctx.disk_io, |
395 | 2 | &ctx.free_space, |
396 | 2 | final_entries, |
397 | 2 | &ctx.stats, |
398 | 2 | format.as_ref(), |
399 | 2 | ); |
400 | 338 | } |
401 | 0 | } |
402 | 340 | } |
403 | | |
404 | | /// Process a batch of write entries |
405 | 20 | fn process_write_batch( |
406 | 20 | disk_io: &Arc<RwLock<DiskIO>>, |
407 | 20 | free_space: &Arc<RwLock<FreeSpaceManager>>, |
408 | 20 | entries: Vec<WriteEntry>, |
409 | 20 | stats: &Arc<Statistics>, |
410 | 20 | format: &dyn RecordFormat, |
411 | 20 | ) -> Result<()> { |
412 | 20 | let mut batch_writes = Vec::new(); |
413 | 20 | let mut delete_operations = Vec::new(); |
414 | 20 | let mut records_to_clear = Vec::new(); |
415 | | |
416 | | // Prepare all operations |
417 | 20.9k | for entry20.9k in entries { |
418 | 20.9k | match entry.op { |
419 | | Operation::Insert | Operation::Update => { |
420 | | // Check if record is still valid (not deleted) |
421 | 20.8k | if entry.record.refcount.load(Ordering::Acquire) > 0 |
422 | 20.8k | && entry.record.sector.load(Ordering::Acquire) == 0 |
423 | | { |
424 | 20.8k | let data = prepare_record_data(&entry.record, format)?0 ; |
425 | 20.8k | let sectors_needed = data.len().div_ceil(FEOX_BLOCK_SIZE); |
426 | 20.8k | let sector = free_space.write().allocate_sectors(sectors_needed as u64)?0 ; |
427 | | |
428 | | // Track disk usage |
429 | 20.8k | stats |
430 | 20.8k | .disk_usage |
431 | 20.8k | .fetch_add((sectors_needed * FEOX_BLOCK_SIZE) as u64, Ordering::Relaxed); |
432 | | |
433 | 20.8k | batch_writes.push((sector, data)); |
434 | 20.8k | records_to_clear.push((sector, entry.record.clone())); |
435 | 2 | } |
436 | | } |
437 | | Operation::Delete => { |
438 | 106 | let sector = entry.record.sector.load(Ordering::Acquire); |
439 | 106 | if sector != 0 { |
440 | 1 | delete_operations.push((sector, entry.record.key.len(), entry.old_value_len)); |
441 | 105 | } |
442 | | } |
443 | 0 | _ => {} |
444 | | } |
445 | | } |
446 | | |
447 | | // Process deletes first |
448 | 21 | for (sector1 , key_len1 , value_len1 ) in delete_operations { |
449 | 1 | // Write deletion marker |
450 | 1 | let mut deletion_marker = vec![0u8; FEOX_BLOCK_SIZE]; |
451 | 1 | deletion_marker[..8].copy_from_slice(b"\0DELETED"); |
452 | 1 | |
453 | 1 | let _ = disk_io.write().write_sectors_sync(sector, &deletion_marker); |
454 | 1 | |
455 | 1 | // Calculate sectors used and release them |
456 | 1 | let total_size = SECTOR_HEADER_SIZE + 2 + key_len + 8 + 8 + value_len; |
457 | 1 | let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE); |
458 | 1 | |
459 | 1 | let _ = free_space |
460 | 1 | .write() |
461 | 1 | .release_sectors(sector, sectors_needed as u64); |
462 | 1 | |
463 | 1 | // Update disk usage |
464 | 1 | stats |
465 | 1 | .disk_usage |
466 | 1 | .fetch_sub((sectors_needed * FEOX_BLOCK_SIZE) as u64, Ordering::Relaxed); |
467 | 1 | } |
468 | | |
469 | | // Process batch writes with io_uring |
470 | 20 | if !batch_writes.is_empty() { |
471 | | // Use io_uring with retry |
472 | 20 | let mut retries = 3; |
473 | 20 | let mut delay_us = 100; |
474 | | |
475 | 20 | while retries > 0 { |
476 | | // Execute batch write |
477 | 20 | let result = disk_io.write().batch_write(batch_writes.clone()); |
478 | | |
479 | 20 | match result { |
480 | | Ok(()) => { |
481 | 20.8k | for (sector20.8k , record20.8k ) in &records_to_clear { |
482 | 20.8k | record.sector.store(*sector, Ordering::Release); |
483 | 20.8k | std::sync::atomic::fence(Ordering::Release); |
484 | 20.8k | record.clear_value(); |
485 | 20.8k | } |
486 | 20 | stats.record_write_flushed(records_to_clear.len() as u64); |
487 | 20 | break; |
488 | | } |
489 | 0 | Err(e) => { |
490 | 0 | retries -= 1; |
491 | 0 | if retries > 0 { |
492 | | // Exponential backoff with jitter ±10% |
493 | 0 | let jitter = { |
494 | | use rand::Rng; |
495 | 0 | let mut rng = rand::rng(); |
496 | 0 | (delay_us * rng.random_range(-10..=10)) / 100 |
497 | | }; |
498 | 0 | let actual_delay = (delay_us + jitter).max(1); |
499 | 0 | thread::sleep(Duration::from_micros(actual_delay as u64)); |
500 | 0 | delay_us *= 2; |
501 | | } else { |
502 | 0 | stats.record_write_failed(); |
503 | 0 | for (sector, _) in &records_to_clear { |
504 | 0 | free_space.write().release_sectors(*sector, 1)?; |
505 | | } |
506 | 0 | return Err(e); |
507 | | } |
508 | | } |
509 | | } |
510 | | } |
511 | 0 | } |
512 | | |
513 | 20 | Ok(()) |
514 | 20 | } |
515 | | |
516 | 20.8k | fn prepare_record_data(record: &Record, format: &dyn RecordFormat) -> Result<Vec<u8>> { |
517 | | // Get the total size using the format trait |
518 | 20.8k | let total_size = format.total_size(record.key.len(), record.value_len); |
519 | | |
520 | | // Calculate padded size to sector boundary |
521 | 20.8k | let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE); |
522 | 20.8k | let padded_size = sectors_needed * FEOX_BLOCK_SIZE; |
523 | | |
524 | 20.8k | let mut data = Vec::with_capacity(padded_size); |
525 | | |
526 | | // Sector header |
527 | 20.8k | data.extend_from_slice(&SECTOR_MARKER.to_le_bytes()); |
528 | 20.8k | data.extend_from_slice(&0u16.to_le_bytes()); // seq_number |
529 | | |
530 | | // Use format trait to serialize the record |
531 | 20.8k | let record_data = format.serialize_record(record, true); |
532 | 20.8k | data.extend_from_slice(&record_data); |
533 | | |
534 | | // Pad to sector boundary |
535 | 20.8k | data.resize(padded_size, 0); |
536 | | |
537 | 20.8k | Ok(data) |
538 | 20.8k | } |
539 | | |
540 | | impl Drop for WriteBuffer { |
541 | 26 | fn drop(&mut self) { |
542 | 26 | if !self.shutdown.load(Ordering::Acquire) { |
543 | 5 | self.complete_shutdown(); |
544 | 21 | } |
545 | 26 | } |
546 | | } |