Coverage Report

Created: 2025-09-19 09:35

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/home/gh-runner/action-runner3/_work/feoxdb/feoxdb/src/storage/io.rs
Line
Count
Source
1
#[cfg(target_os = "linux")]
2
use io_uring::{opcode, types, IoUring, Probe};
3
use std::fs::File;
4
#[cfg(unix)]
5
use std::io;
6
#[cfg(unix)]
7
use std::os::unix::io::RawFd;
8
use std::sync::Arc;
9
10
use crate::constants::*;
11
use crate::error::{FeoxError, Result};
12
#[cfg(unix)]
13
use crate::utils::allocator::AlignedBuffer;
14
15
/// Low-level block-device style I/O over a regular file: positional
/// reads/writes via the raw fd on Unix, plus an optional io_uring
/// instance on Linux for batched writes.
pub struct DiskIO {
    /// io_uring instance (Linux only). `None` when io_uring could not be
    /// created or the required opcodes are unsupported.
    #[cfg(target_os = "linux")]
    ring: Option<IoUring>,
    /// Keeps the underlying `File` alive so `fd` below stays valid.
    _file: Arc<File>,
    /// Raw descriptor used for pread/pwrite/fsync (Unix only).
    #[cfg(unix)]
    fd: RawFd,
    /// When true, reads/writes go through aligned buffers (O_DIRECT path).
    _use_direct_io: bool,
}
23
24
impl DiskIO {
25
    /// Creates a `DiskIO` over `file` (Unix).
    ///
    /// On Linux this tries to set up an io_uring with SQPOLL and probes
    /// for `Read`/`Write` opcode support; if the probe succeeds, the
    /// caller's `use_direct_io` preference is honored. If io_uring is
    /// unavailable or lacks the opcodes, direct I/O is forced off.
    /// On non-Linux Unix there is no io_uring and direct I/O is always off.
    #[cfg(unix)]
    pub fn new(file: Arc<File>, use_direct_io: bool) -> Result<Self> {
        use std::os::unix::io::AsRawFd;
        let fd = file.as_raw_fd();
        #[cfg(target_os = "linux")]
        {
            // Create io_uring instance; `.ok()` deliberately swallows setup
            // failure so we can fall back to plain pread/pwrite.
            let ring: Option<IoUring> = IoUring::builder()
                .setup_sqpoll(IOURING_SQPOLL_IDLE_MS)
                .build(IOURING_QUEUE_SIZE)
                .ok();

            if let Some(ref r) = ring {
                // Probe the kernel for the opcodes batch_write relies on.
                let mut probe = Probe::new();
                if r.submitter().register_probe(&mut probe).is_ok()
                    && probe.is_supported(opcode::Read::CODE)
                    && probe.is_supported(opcode::Write::CODE)
                {
                    return Ok(Self {
                        ring,
                        _file: file.clone(),
                        fd,
                        _use_direct_io: use_direct_io,
                    });
                }
            }

            // Fallback: keep whatever ring we built (possibly None) but
            // disable direct I/O.
            Ok(Self {
                ring,
                _file: file,
                fd,
                _use_direct_io: false, // io_uring not available, can't use O_DIRECT efficiently
            })
        }

        #[cfg(not(target_os = "linux"))]
        {
            let _ = use_direct_io; // Suppress unused warning
            Ok(Self {
                _file: file,
                fd,
                _use_direct_io: false, // O_DIRECT not supported on this platform
            })
        }
    }
70
71
    /// Non-Unix constructor: wraps `file` with no raw fd and no io_uring.
    /// Always succeeds; returns `Result` only for API symmetry with `new`.
    #[cfg(not(unix))]
    pub fn new_from_file(file: File) -> Result<Self> {
        Ok(Self {
            _file: Arc::new(file),
            _use_direct_io: false, // O_DIRECT is a Unix-only concept here
        })
    }
78
79
2.09M
    pub fn read_sectors_sync(&self, sector: u64, count: u64) -> Result<Vec<u8>> {
80
2.09M
        let size = (count * FEOX_BLOCK_SIZE as u64) as usize;
81
2.09M
        let offset = sector * FEOX_BLOCK_SIZE as u64;
82
83
        #[cfg(unix)]
84
        {
85
            // Only use aligned buffer for O_DIRECT
86
2.09M
            if self._use_direct_io {
87
2.09M
                let mut buffer = AlignedBuffer::new(size)
?0
;
88
2.09M
                buffer.set_len(size);
89
90
2.09M
                let read = unsafe {
91
2.09M
                    libc::pread(
92
2.09M
                        self.fd,
93
2.09M
                        buffer.as_mut_ptr() as *mut libc::c_void,
94
2.09M
                        size,
95
2.09M
                        offset as libc::off_t,
96
                    )
97
                };
98
99
2.09M
                if read < 0 {
100
0
                    let err = io::Error::last_os_error();
101
0
                    return Err(FeoxError::IoError(err));
102
2.09M
                }
103
104
2.09M
                if read as usize != size {
105
0
                    return Err(FeoxError::IoError(io::Error::new(
106
0
                        io::ErrorKind::UnexpectedEof,
107
0
                        format!("Read {} bytes, expected {}", read, size),
108
0
                    )));
109
2.09M
                }
110
111
                // Return the buffer's data directly (avoids extra copy)
112
2.09M
                Ok(buffer.as_slice().to_vec())
113
            } else {
114
                // Non-O_DIRECT path: use regular Vec
115
25
                let mut buffer = vec![0u8; size];
116
117
25
                let read = unsafe {
118
25
                    libc::pread(
119
25
                        self.fd,
120
25
                        buffer.as_mut_ptr() as *mut libc::c_void,
121
25
                        size,
122
25
                        offset as libc::off_t,
123
                    )
124
                };
125
126
25
                if read < 0 {
127
0
                    let err = io::Error::last_os_error();
128
0
                    return Err(FeoxError::IoError(err));
129
25
                }
130
131
25
                if read as usize != size {
132
0
                    return Err(FeoxError::IoError(io::Error::new(
133
0
                        io::ErrorKind::UnexpectedEof,
134
0
                        format!("Read {} bytes, expected {}", read, size),
135
0
                    )));
136
25
                }
137
138
25
                buffer.truncate(read as usize);
139
25
                Ok(buffer)
140
            }
141
        }
142
143
        #[cfg(not(unix))]
144
        {
145
            // For non-Unix, no O_DIRECT, use regular Vec
146
            let mut buffer = vec![0u8; size];
147
148
            // For non-Unix, we need platform-specific implementations
149
            #[cfg(target_os = "windows")]
150
            {
151
                use std::os::windows::fs::FileExt;
152
                self._file
153
                    .seek_read(&mut buffer, offset)
154
                    .map_err(FeoxError::IoError)?;
155
            }
156
157
            #[cfg(not(any(unix, target_os = "windows")))]
158
            {
159
                // Fallback for other platforms using standard file operations
160
                use std::io::{Read, Seek, SeekFrom};
161
162
                // Clone the Arc<File> to get a mutable handle for seeking
163
                let mut file = self
164
                    ._file
165
                    .as_ref()
166
                    .try_clone()
167
                    .map_err(FeoxError::IoError)?;
168
169
                file.seek(SeekFrom::Start(offset))
170
                    .map_err(FeoxError::IoError)?;
171
172
                file.read_exact(&mut buffer).map_err(FeoxError::IoError)?;
173
            }
174
175
            Ok(buffer)
176
        }
177
2.09M
    }
178
179
44
    /// Writes `data` starting at block index `sector`, erroring on OS
    /// failure or a short write. Does NOT fsync; callers needing durability
    /// must call `flush` (see `write_metadata`).
    ///
    /// NOTE(review): the O_DIRECT path assumes `data.len()` already meets
    /// the kernel's O_DIRECT size/alignment requirements — TODO confirm
    /// callers always pass whole blocks.
    pub fn write_sectors_sync(&self, sector: u64, data: &[u8]) -> Result<()> {
        let offset = sector * FEOX_BLOCK_SIZE as u64;

        #[cfg(unix)]
        {
            let written = if self._use_direct_io {
                // O_DIRECT path: need aligned buffer
                let mut aligned_buffer = AlignedBuffer::new(data.len())?;
                aligned_buffer.set_len(data.len());
                aligned_buffer.as_mut_slice().copy_from_slice(data);

                // SAFETY: buffer holds `len()` initialized bytes; `self.fd`
                // is a valid descriptor kept alive by `_file`.
                unsafe {
                    libc::pwrite(
                        self.fd,
                        aligned_buffer.as_ptr() as *const libc::c_void,
                        aligned_buffer.len(),
                        offset as libc::off_t,
                    )
                }
            } else {
                // Non-O_DIRECT path: write directly from input buffer
                // SAFETY: `data` is a valid slice for its full length.
                unsafe {
                    libc::pwrite(
                        self.fd,
                        data.as_ptr() as *const libc::c_void,
                        data.len(),
                        offset as libc::off_t,
                    )
                }
            };

            if written < 0 {
                return Err(FeoxError::IoError(io::Error::last_os_error()));
            }

            // A short pwrite is treated as a hard error rather than retried.
            if written as usize != data.len() {
                return Err(FeoxError::IoError(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "Partial write",
                )));
            }
        }

        #[cfg(not(unix))]
        {
            #[cfg(target_os = "windows")]
            {
                use std::os::windows::fs::FileExt;
                self._file
                    .seek_write(data, offset)
                    .map_err(FeoxError::IoError)?;
            }

            #[cfg(not(any(unix, target_os = "windows")))]
            {
                // Fallback for other platforms using standard file operations
                use std::io::{Seek, SeekFrom, Write};

                // Clone the Arc<File> to get a mutable handle for seeking
                let mut file = self
                    ._file
                    .as_ref()
                    .try_clone()
                    .map_err(FeoxError::IoError)?;

                file.seek(SeekFrom::Start(offset))
                    .map_err(FeoxError::IoError)?;

                file.write_all(data).map_err(FeoxError::IoError)?;

                // Ensure data is written to disk (only this fallback syncs;
                // the Unix/Windows paths above do not).
                file.sync_data().map_err(FeoxError::IoError)?;
            }
        }

        Ok(())
    }
256
257
78
    pub fn flush(&self) -> Result<()> {
258
        #[cfg(unix)]
259
        unsafe {
260
78
            if libc::fsync(self.fd) == -1 {
261
0
                return Err(FeoxError::IoError(io::Error::last_os_error()));
262
78
            }
263
        }
264
265
        #[cfg(not(unix))]
266
        {
267
            self._file.sync_all().map_err(FeoxError::IoError)?;
268
        }
269
270
78
        Ok(())
271
78
    }
272
273
    /// Shutdown io_uring to stop SQPOLL kernel thread
    pub fn shutdown(&mut self) {
        #[cfg(target_os = "linux")]
        {
            if let Some(ref mut ring) = self.ring {
                // Push any queued SQEs to the kernel (waiting for zero
                // completions), then drain the completion queue so nothing
                // is left unacknowledged before the ring is dropped.
                if ring.submit_and_wait(0).is_ok() {
                    while ring.completion().next().is_some() {
                        // Consume all completion events
                    }
                }
            }
            // Dropping the ring tears down the SQPOLL thread.
            self.ring = None;
        }
    }
290
291
    /// Batch write with io_uring for better throughput
    /// Operations complete synchronously before returning
    ///
    /// Each entry of `writes` is a `(sector, payload)` pair. Writes are
    /// submitted in chunks of at most `IOURING_MAX_BATCH`; every chunk is
    /// fully completed before the next is built, and an fsync runs at the
    /// end. Falls back to `write_sectors_sync` when no ring is available.
    #[cfg(target_os = "linux")]
    pub fn batch_write(&mut self, writes: Vec<(u64, Vec<u8>)>) -> Result<()> {
        if let Some(ref mut ring) = self.ring {
            // Process in chunks to avoid overwhelming the submission queue
            for chunk in writes.chunks(IOURING_MAX_BATCH) {
                let mut aligned_buffers = Vec::new();

                // Create aligned buffers for this chunk. These must outlive
                // the kernel's use of them; they do, because we wait for all
                // of this chunk's completions before the Vec goes out of scope.
                for (_sector, data) in chunk {
                    let mut aligned = AlignedBuffer::new(data.len())?;
                    aligned.set_len(data.len());
                    aligned.as_mut_slice().copy_from_slice(data);
                    aligned_buffers.push(aligned);
                }

                // Submit operations for this chunk
                // SAFETY: each SQE points at an aligned buffer that stays
                // alive until the chunk's completions are reaped below.
                unsafe {
                    let mut sq = ring.submission();

                    for (i, (sector, _)) in chunk.iter().enumerate() {
                        let offset = sector * FEOX_BLOCK_SIZE as u64;
                        let buffer = &aligned_buffers[i];

                        let write_e = opcode::Write::new(
                            types::Fd(self.fd),
                            buffer.as_ptr(),
                            buffer.len() as u32,
                        )
                        .offset(offset)
                        .build()
                        .user_data(i as u64);

                        // Queue is sized for IOURING_MAX_BATCH, so a full SQ
                        // here is unexpected and treated as an I/O error.
                        sq.push(&write_e)
                            .map_err(|_| FeoxError::IoError(io::Error::other("SQ full")))?;
                    }
                }

                // Submit and wait for this chunk to complete
                let submitted = ring
                    .submit_and_wait(chunk.len())
                    .map_err(FeoxError::IoError)?;

                // Process completions for this chunk; a negative CQE result
                // is a negated errno. NOTE(review): on error we return
                // without draining the remaining completions of this chunk.
                let mut completed = 0;
                for cqe in ring.completion() {
                    if cqe.result() < 0 {
                        return Err(FeoxError::IoError(io::Error::from_raw_os_error(
                            -cqe.result(),
                        )));
                    }
                    completed += 1;
                    if completed >= submitted {
                        break;
                    }
                }
            }

            // Sync to ensure durability
            self.flush()?;

            Ok(())
        } else {
            // Fallback: do sync writes
            for (sector, data) in writes {
                self.write_sectors_sync(sector, &data)?;
            }
            Ok(())
        }
    }
362
363
27
    /// Reads the single on-disk metadata block (`FEOX_METADATA_BLOCK`).
    pub fn read_metadata(&self) -> Result<Vec<u8>> {
        self.read_sectors_sync(FEOX_METADATA_BLOCK, 1)
    }
366
367
28
    pub fn write_metadata(&self, metadata: &[u8]) -> Result<()> {
368
28
        if metadata.len() > FEOX_BLOCK_SIZE {
369
0
            return Err(FeoxError::InvalidValueSize);
370
28
        }
371
372
        // Prepare a full block (metadata may be smaller)
373
28
        let mut block_data = vec![0u8; FEOX_BLOCK_SIZE];
374
28
        block_data[..metadata.len()].copy_from_slice(metadata);
375
376
        // write_sectors_sync will handle alignment if needed
377
28
        self.write_sectors_sync(FEOX_METADATA_BLOCK, &block_data)
?0
;
378
28
        self.flush()
379
28
    }
380
381
    /// Non-Linux fallback implementation
    #[cfg(not(target_os = "linux"))]
    pub fn batch_write(&mut self, writes: Vec<(u64, Vec<u8>)>) -> Result<()> {
        // No io_uring here: issue one synchronous write per entry,
        // stopping at the first failure.
        writes
            .into_iter()
            .try_for_each(|(sector, data)| self.write_sectors_sync(sector, &data))
    }
390
}