feoxdb/storage/io.rs

#[cfg(target_os = "linux")]
use io_uring::{opcode, types, IoUring, Probe};
use std::fs::File;
#[cfg(unix)]
use std::io;
#[cfg(unix)]
use std::os::unix::io::RawFd;
use std::sync::Arc;

use crate::constants::*;
use crate::error::{FeoxError, Result};
#[cfg(unix)]
use crate::utils::allocator::AlignedBuffer;

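/// Synchronous block-style I/O over a single backing file.
///
/// On Linux the constructor opportunistically sets up an io_uring instance
/// (used by `batch_write`); on other platforms all operations fall back to
/// plain positional reads and writes.
///
/// A minimal construction sketch (the file path is an assumption for
/// illustration, not part of this API):
///
/// ```ignore
/// let file = Arc::new(File::open("/tmp/feox.db")?); // hypothetical path
/// let io = DiskIO::new(file, /* use_direct_io */ false)?;
/// ```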
pub struct DiskIO {
    #[cfg(target_os = "linux")]
    ring: Option<IoUring>,
    _file: Arc<File>,
    #[cfg(unix)]
    fd: RawFd,
    _use_direct_io: bool,
}

impl DiskIO {
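    /// Open a `DiskIO` over an already-opened file. On Linux this probes the
    /// kernel for io_uring `Read`/`Write` support; `use_direct_io` is only
    /// honored when that probe succeeds.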
    #[cfg(unix)]
    pub fn new(file: Arc<File>, use_direct_io: bool) -> Result<Self> {
        use std::os::unix::io::AsRawFd;
        let fd = file.as_raw_fd();
        #[cfg(target_os = "linux")]
        {
            // Create an io_uring instance with a kernel-side submission poller
            let ring: Option<IoUring> = IoUring::builder()
                .setup_sqpoll(IOURING_SQPOLL_IDLE_MS)
                .build(IOURING_QUEUE_SIZE)
                .ok();

            if let Some(ref r) = ring {
                let mut probe = Probe::new();
                if r.submitter().register_probe(&mut probe).is_ok()
                    && probe.is_supported(opcode::Read::CODE)
                    && probe.is_supported(opcode::Write::CODE)
                {
                    return Ok(Self {
                        ring,
                        _file: file,
                        fd,
                        _use_direct_io: use_direct_io,
                    });
                }
            }

            // Either ring creation failed or the kernel lacks the opcodes we
            // need; drop the ring so batch_write falls back to sync writes.
            Ok(Self {
                ring: None,
                _file: file,
                fd,
                _use_direct_io: false, // without io_uring we can't use O_DIRECT efficiently
            })
        }

        #[cfg(not(target_os = "linux"))]
        {
            let _ = use_direct_io; // suppress unused-variable warning
            Ok(Self {
                _file: file,
                fd,
                _use_direct_io: false, // O_DIRECT not supported on this platform
            })
        }
    }

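    /// Non-Unix constructor: wraps the file with no io_uring and no O_DIRECT.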
    #[cfg(not(unix))]
    pub fn new_from_file(file: File) -> Result<Self> {
        Ok(Self {
            _file: Arc::new(file),
            _use_direct_io: false,
        })
    }

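    /// Read `count` blocks of `FEOX_BLOCK_SIZE` bytes starting at `sector`,
    /// returning exactly `count * FEOX_BLOCK_SIZE` bytes or an error.
    ///
    /// A minimal usage sketch (the variable names are illustrative):
    ///
    /// ```ignore
    /// let block = io.read_sectors_sync(0, 1)?; // read the first block
    /// assert_eq!(block.len(), FEOX_BLOCK_SIZE);
    /// ```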
    pub fn read_sectors_sync(&self, sector: u64, count: u64) -> Result<Vec<u8>> {
        let size = (count * FEOX_BLOCK_SIZE as u64) as usize;
        let offset = sector * FEOX_BLOCK_SIZE as u64;

        #[cfg(unix)]
        {
            // Only use an aligned buffer for O_DIRECT
            if self._use_direct_io {
                let mut buffer = AlignedBuffer::new(size)?;
                buffer.set_len(size);

                let read = unsafe {
                    libc::pread(
                        self.fd,
                        buffer.as_mut_ptr() as *mut libc::c_void,
                        size,
                        offset as libc::off_t,
                    )
                };

                if read < 0 {
                    let err = io::Error::last_os_error();
                    return Err(FeoxError::IoError(err));
                }

                if read as usize != size {
                    return Err(FeoxError::IoError(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        format!("Read {} bytes, expected {}", read, size),
                    )));
                }

                // Copy out of the aligned buffer into a plain Vec for the caller
                Ok(buffer.as_slice().to_vec())
            } else {
                // Non-O_DIRECT path: a regular Vec is fine
                let mut buffer = vec![0u8; size];

                let read = unsafe {
                    libc::pread(
                        self.fd,
                        buffer.as_mut_ptr() as *mut libc::c_void,
                        size,
                        offset as libc::off_t,
                    )
                };

                if read < 0 {
                    let err = io::Error::last_os_error();
                    return Err(FeoxError::IoError(err));
                }

                if read as usize != size {
                    return Err(FeoxError::IoError(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        format!("Read {} bytes, expected {}", read, size),
                    )));
                }

                Ok(buffer)
            }
        }

        #[cfg(not(unix))]
        {
            // No O_DIRECT outside Unix; a regular Vec is fine
            let mut buffer = vec![0u8; size];

            #[cfg(target_os = "windows")]
            {
                use std::os::windows::fs::FileExt;
                // seek_read may return a short count, so verify it
                let read = self
                    ._file
                    .seek_read(&mut buffer, offset)
                    .map_err(FeoxError::IoError)?;
                if read != size {
                    return Err(FeoxError::IoError(std::io::Error::new(
                        std::io::ErrorKind::UnexpectedEof,
                        format!("Read {} bytes, expected {}", read, size),
                    )));
                }
            }

            #[cfg(not(any(unix, target_os = "windows")))]
            {
                // Fallback for other platforms using standard file operations
                use std::io::{Read, Seek, SeekFrom};

                // Clone the Arc<File> to get a mutable handle for seeking
                let mut file = self
                    ._file
                    .as_ref()
                    .try_clone()
                    .map_err(FeoxError::IoError)?;

                file.seek(SeekFrom::Start(offset))
                    .map_err(FeoxError::IoError)?;

                file.read_exact(&mut buffer).map_err(FeoxError::IoError)?;
            }

            Ok(buffer)
        }
    }

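    /// Write `data` at byte offset `sector * FEOX_BLOCK_SIZE`. On the
    /// O_DIRECT path the data is staged through an aligned buffer first.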
    pub fn write_sectors_sync(&self, sector: u64, data: &[u8]) -> Result<()> {
        let offset = sector * FEOX_BLOCK_SIZE as u64;

        #[cfg(unix)]
        {
            let written = if self._use_direct_io {
                // O_DIRECT path: stage through an aligned buffer
                let mut aligned_buffer = AlignedBuffer::new(data.len())?;
                aligned_buffer.set_len(data.len());
                aligned_buffer.as_mut_slice().copy_from_slice(data);

                unsafe {
                    libc::pwrite(
                        self.fd,
                        aligned_buffer.as_ptr() as *const libc::c_void,
                        aligned_buffer.len(),
                        offset as libc::off_t,
                    )
                }
            } else {
                // Non-O_DIRECT path: write directly from the input buffer
                unsafe {
                    libc::pwrite(
                        self.fd,
                        data.as_ptr() as *const libc::c_void,
                        data.len(),
                        offset as libc::off_t,
                    )
                }
            };

            if written < 0 {
                return Err(FeoxError::IoError(io::Error::last_os_error()));
            }

            if written as usize != data.len() {
                return Err(FeoxError::IoError(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "Partial write",
                )));
            }
        }

        #[cfg(not(unix))]
        {
            #[cfg(target_os = "windows")]
            {
                use std::os::windows::fs::FileExt;
                // seek_write may return a short count, so verify it
                let written = self
                    ._file
                    .seek_write(data, offset)
                    .map_err(FeoxError::IoError)?;
                if written != data.len() {
                    return Err(FeoxError::IoError(std::io::Error::new(
                        std::io::ErrorKind::UnexpectedEof,
                        "Partial write",
                    )));
                }
            }

            #[cfg(not(any(unix, target_os = "windows")))]
            {
                // Fallback for other platforms using standard file operations
                use std::io::{Seek, SeekFrom, Write};

                // Clone the Arc<File> to get a mutable handle for seeking
                let mut file = self
                    ._file
                    .as_ref()
                    .try_clone()
                    .map_err(FeoxError::IoError)?;

                file.seek(SeekFrom::Start(offset))
                    .map_err(FeoxError::IoError)?;

                file.write_all(data).map_err(FeoxError::IoError)?;

                // Ensure data reaches the disk
                file.sync_data().map_err(FeoxError::IoError)?;
            }
        }

        Ok(())
    }

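    /// Force all completed writes to stable storage (`fsync` on Unix,
    /// `File::sync_all` elsewhere).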
    pub fn flush(&self) -> Result<()> {
        #[cfg(unix)]
        unsafe {
            if libc::fsync(self.fd) == -1 {
                return Err(FeoxError::IoError(io::Error::last_os_error()));
            }
        }

        #[cfg(not(unix))]
        {
            self._file.sync_all().map_err(FeoxError::IoError)?;
        }

        Ok(())
    }

    /// Shut down io_uring so the SQPOLL kernel thread stops
    pub fn shutdown(&mut self) {
        #[cfg(target_os = "linux")]
        {
            if let Some(ref mut ring) = self.ring {
                // Submit anything still queued; note that submit_and_wait(0)
                // does not block waiting for completions
                if ring.submit_and_wait(0).is_ok() {
                    // Drain whatever completions are already available
                    while ring.completion().next().is_some() {
                        // Consume completion events
                    }
                }
            }
            // Dropping the ring tears down the SQPOLL thread
            self.ring = None;
        }
    }

    /// Batch write with io_uring for better throughput.
    /// Operations complete synchronously before returning.
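    ///
    /// A minimal usage sketch (the data below is illustrative only):
    ///
    /// ```ignore
    /// // Write two consecutive blocks in one submission batch.
    /// let writes = vec![
    ///     (0u64, vec![0u8; FEOX_BLOCK_SIZE]),
    ///     (1u64, vec![0u8; FEOX_BLOCK_SIZE]),
    /// ];
    /// disk_io.batch_write(writes)?;
    /// ```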
    #[cfg(target_os = "linux")]
    pub fn batch_write(&mut self, writes: Vec<(u64, Vec<u8>)>) -> Result<()> {
        if let Some(ref mut ring) = self.ring {
            // Process in chunks to avoid overwhelming the submission queue
            for chunk in writes.chunks(IOURING_MAX_BATCH) {
                let mut aligned_buffers = Vec::new();

                // Create aligned buffers for this chunk; they must stay alive
                // until this chunk's completions have been reaped below
                for (_sector, data) in chunk {
                    let mut aligned = AlignedBuffer::new(data.len())?;
                    aligned.set_len(data.len());
                    aligned.as_mut_slice().copy_from_slice(data);
                    aligned_buffers.push(aligned);
                }

                // Queue the write operations for this chunk
                unsafe {
                    let mut sq = ring.submission();

                    for (i, (sector, _)) in chunk.iter().enumerate() {
                        let offset = sector * FEOX_BLOCK_SIZE as u64;
                        let buffer = &aligned_buffers[i];

                        let write_e = opcode::Write::new(
                            types::Fd(self.fd),
                            buffer.as_ptr(),
                            buffer.len() as u32,
                        )
                        .offset(offset)
                        .build()
                        .user_data(i as u64);

                        // Chunking keeps us within queue capacity, so a full
                        // SQ indicates a configuration error
                        sq.push(&write_e)
                            .map_err(|_| FeoxError::IoError(io::Error::other("SQ full")))?;
                    }
                }

                // Submit and wait for this chunk to complete
                let submitted = ring
                    .submit_and_wait(chunk.len())
                    .map_err(FeoxError::IoError)?;

                // Process completions for this chunk
                let mut completed = 0;
                for cqe in ring.completion() {
                    if cqe.result() < 0 {
                        return Err(FeoxError::IoError(io::Error::from_raw_os_error(
                            -cqe.result(),
                        )));
                    }
                    completed += 1;
                    if completed >= submitted {
                        break;
                    }
                }
            }

            // Sync to ensure durability
            self.flush()?;

            Ok(())
        } else {
            // No usable ring: fall back to synchronous writes
            for (sector, data) in writes {
                self.write_sectors_sync(sector, &data)?;
            }
            Ok(())
        }
    }

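    /// Read the single metadata block at `FEOX_METADATA_BLOCK`.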
    pub fn read_metadata(&self) -> Result<Vec<u8>> {
        self.read_sectors_sync(FEOX_METADATA_BLOCK, 1)
    }

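    /// Write up to one block of metadata, zero-padded to a full block,
    /// then flush so it is durable.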
    pub fn write_metadata(&self, metadata: &[u8]) -> Result<()> {
        if metadata.len() > FEOX_BLOCK_SIZE {
            return Err(FeoxError::InvalidValueSize);
        }

        // Prepare a full block (metadata may be smaller)
        let mut block_data = vec![0u8; FEOX_BLOCK_SIZE];
        block_data[..metadata.len()].copy_from_slice(metadata);

        // write_sectors_sync handles alignment if needed
        self.write_sectors_sync(FEOX_METADATA_BLOCK, &block_data)?;
        self.flush()
    }

    /// Non-Linux fallback: perform the writes synchronously, one at a time
    #[cfg(not(target_os = "linux"))]
    pub fn batch_write(&mut self, writes: Vec<(u64, Vec<u8>)>) -> Result<()> {
        for (sector, data) in writes {
            self.write_sectors_sync(sector, &data)?;
        }
        Ok(())
    }
}