feoxdb/storage/io.rs

#[cfg(target_os = "linux")]
use io_uring::{opcode, types, IoUring, Probe};
use std::fs::File;
#[cfg(unix)]
use std::io;
#[cfg(unix)]
use std::os::unix::io::RawFd;
use std::sync::Arc;

use crate::constants::*;
use crate::error::{FeoxError, Result};
use crate::utils::allocator::AlignedBuffer;

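/// Synchronous block I/O over a single file.
///
/// On Linux, an io_uring instance (with SQPOLL where available) backs
/// `batch_write`; on other platforms, and when io_uring is unavailable,
/// everything degrades to plain positional reads and writes. Offsets are
/// expressed in `FEOX_BLOCK_SIZE` sectors throughout.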
pub struct DiskIO {
    #[cfg(target_os = "linux")]
    ring: Option<IoUring>,
    _file: Arc<File>,
    #[cfg(unix)]
    fd: RawFd,
    _use_direct_io: bool,
}

impl DiskIO {
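    /// Creates the I/O backend for `file`, probing for io_uring support.
    ///
    /// Building the ring fails gracefully on kernels without io_uring, and
    /// the opcode probe rejects kernels that lack `Read`/`Write` support;
    /// in both cases we fall back to pread/pwrite with direct I/O disabled.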
    #[cfg(unix)]
    pub fn new(file: Arc<File>, use_direct_io: bool) -> Result<Self> {
        use std::os::unix::io::AsRawFd;
        let fd = file.as_raw_fd();
        #[cfg(target_os = "linux")]
        {
            // Create io_uring instance
            let ring: Option<IoUring> = IoUring::builder()
                .setup_sqpoll(IOURING_SQPOLL_IDLE_MS)
                .build(IOURING_QUEUE_SIZE)
                .ok();

            if let Some(ref r) = ring {
                let mut probe = Probe::new();
                if r.submitter().register_probe(&mut probe).is_ok()
                    && probe.is_supported(opcode::Read::CODE)
                    && probe.is_supported(opcode::Write::CODE)
                {
                    return Ok(Self {
                        ring,
                        _file: file.clone(),
                        fd,
                        _use_direct_io: use_direct_io,
                    });
                }
            }

            // io_uring is unavailable or lacks Read/Write opcode support:
            // drop the ring so batch_write takes the pwrite fallback, and
            // skip O_DIRECT, which is only used efficiently with io_uring
            Ok(Self {
                ring: None,
                _file: file,
                fd,
                _use_direct_io: false,
            })
        }

        #[cfg(not(target_os = "linux"))]
        {
            let _ = use_direct_io; // Suppress unused warning
            Ok(Self {
                _file: file,
                fd,
                _use_direct_io: false, // O_DIRECT not supported on this platform
            })
        }
    }

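    /// Non-Unix constructor: there is no raw fd, so all I/O goes through
    /// the `File` handle itself.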
    #[cfg(not(unix))]
    pub fn new_from_file(file: File) -> Result<Self> {
        Ok(Self {
            _file: Arc::new(file),
            _use_direct_io: false,
        })
    }

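    /// Reads `count` contiguous sectors starting at `sector`, returning
    /// exactly `count * FEOX_BLOCK_SIZE` bytes on success.
    ///
    /// A minimal usage sketch (the sector number is illustrative):
    /// ```ignore
    /// let block = disk_io.read_sectors_sync(42, 1)?;
    /// assert_eq!(block.len(), FEOX_BLOCK_SIZE);
    /// ```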
    pub fn read_sectors_sync(&self, sector: u64, count: u64) -> Result<Vec<u8>> {
        let size = (count * FEOX_BLOCK_SIZE as u64) as usize;
        let offset = sector * FEOX_BLOCK_SIZE as u64;

        #[cfg(unix)]
        {
            // Only use aligned buffer for O_DIRECT
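            // (O_DIRECT generally requires the buffer address, file offset,
            // and length to be aligned to the device's logical block size;
            // AlignedBuffer is assumed to provide the address alignment.)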
            if self._use_direct_io {
                let mut buffer = AlignedBuffer::new(size)?;
                buffer.set_len(size);

                let read = unsafe {
                    libc::pread(
                        self.fd,
                        buffer.as_mut_ptr() as *mut libc::c_void,
                        size,
                        offset as libc::off_t,
                    )
                };

                if read < 0 {
                    let err = io::Error::last_os_error();
                    eprintln!("pread failed at offset {}: {}", offset, err);
                    return Err(FeoxError::IoError(err));
                }

                if read as usize != size {
                    return Err(FeoxError::IoError(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        format!("Read {} bytes, expected {}", read, size),
                    )));
                }

                // Copy the aligned buffer's contents into a plain Vec for the caller
                Ok(buffer.as_slice().to_vec())
            } else {
                // Non-O_DIRECT path: use regular Vec
                let mut buffer = vec![0u8; size];

                let read = unsafe {
                    libc::pread(
                        self.fd,
                        buffer.as_mut_ptr() as *mut libc::c_void,
                        size,
                        offset as libc::off_t,
                    )
                };

                if read < 0 {
                    let err = io::Error::last_os_error();
                    eprintln!("pread failed at offset {}: {}", offset, err);
                    return Err(FeoxError::IoError(err));
                }

                if read as usize != size {
                    return Err(FeoxError::IoError(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        format!("Read {} bytes, expected {}", read, size),
                    )));
                }

                Ok(buffer)
            }
        }

        #[cfg(not(unix))]
        {
            // No O_DIRECT off Unix; use a regular Vec and platform-specific
            // positional reads where available
            let mut buffer = vec![0u8; size];

            #[cfg(target_os = "windows")]
            {
                use std::os::windows::fs::FileExt;
                let read = self
                    ._file
                    .seek_read(&mut buffer, offset)
                    .map_err(FeoxError::IoError)?;
                if read != size {
                    return Err(FeoxError::IoError(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        format!("Read {} bytes, expected {}", read, size),
                    )));
                }
            }

            #[cfg(not(any(unix, target_os = "windows")))]
            {
                // Fallback for other platforms using standard file operations
                use std::io::{Read, Seek, SeekFrom};

                // try_clone the File to get a mutable handle for seeking
                let mut file = self
                    ._file
                    .as_ref()
                    .try_clone()
                    .map_err(FeoxError::IoError)?;

                file.seek(SeekFrom::Start(offset))
                    .map_err(FeoxError::IoError)?;

                file.read_exact(&mut buffer).map_err(FeoxError::IoError)?;
            }

            Ok(buffer)
        }
    }

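    /// Writes `data` at the byte offset corresponding to `sector`.
    ///
    /// On the O_DIRECT path the payload is staged through an aligned
    /// buffer first; O_DIRECT also generally expects the length to be a
    /// multiple of the block size, which callers are assumed to uphold.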
    pub fn write_sectors_sync(&self, sector: u64, data: &[u8]) -> Result<()> {
        let offset = sector * FEOX_BLOCK_SIZE as u64;

        #[cfg(unix)]
        {
            let written = if self._use_direct_io {
                // O_DIRECT path: need aligned buffer
                let mut aligned_buffer = AlignedBuffer::new(data.len())?;
                aligned_buffer.set_len(data.len());
                aligned_buffer.as_mut_slice().copy_from_slice(data);

                unsafe {
                    libc::pwrite(
                        self.fd,
                        aligned_buffer.as_ptr() as *const libc::c_void,
                        aligned_buffer.len(),
                        offset as libc::off_t,
                    )
                }
            } else {
                // Non-O_DIRECT path: write directly from input buffer
                unsafe {
                    libc::pwrite(
                        self.fd,
                        data.as_ptr() as *const libc::c_void,
                        data.len(),
                        offset as libc::off_t,
                    )
                }
            };

            if written < 0 {
                return Err(FeoxError::IoError(io::Error::last_os_error()));
            }

            if written as usize != data.len() {
                return Err(FeoxError::IoError(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "Partial write",
                )));
            }
        }

        #[cfg(not(unix))]
        {
            #[cfg(target_os = "windows")]
            {
                use std::os::windows::fs::FileExt;
                let written = self
                    ._file
                    .seek_write(data, offset)
                    .map_err(FeoxError::IoError)?;
                if written != data.len() {
                    return Err(FeoxError::IoError(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        "Partial write",
                    )));
                }
            }

            #[cfg(not(any(unix, target_os = "windows")))]
            {
                // Fallback for other platforms using standard file operations
                use std::io::{Seek, SeekFrom, Write};

                // try_clone the File to get a mutable handle for seeking
                let mut file = self
                    ._file
                    .as_ref()
                    .try_clone()
                    .map_err(FeoxError::IoError)?;

                file.seek(SeekFrom::Start(offset))
                    .map_err(FeoxError::IoError)?;

                file.write_all(data).map_err(FeoxError::IoError)?;

                // Ensure data is written to disk
                file.sync_data().map_err(FeoxError::IoError)?;
            }
        }

        Ok(())
    }

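    /// Flushes all completed writes to stable storage (`fsync` on Unix,
    /// `File::sync_all` elsewhere).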
    pub fn flush(&self) -> Result<()> {
        #[cfg(unix)]
        unsafe {
            if libc::fsync(self.fd) == -1 {
                return Err(FeoxError::IoError(io::Error::last_os_error()));
            }
        }

        #[cfg(not(unix))]
        {
            self._file.sync_all().map_err(FeoxError::IoError)?;
        }

        Ok(())
    }

    /// Shuts down io_uring so the SQPOLL kernel thread can stop.
    pub fn shutdown(&mut self) {
        #[cfg(target_os = "linux")]
        {
            if let Some(ref mut ring) = self.ring {
                // Flush any still-queued submissions; batch_write has already
                // waited on its own completions, so nothing new is in flight
                if ring.submit_and_wait(0).is_ok() {
                    // Drain remaining completions to acknowledge them
                    while ring.completion().next().is_some() {
                        // Consume all completion events
                    }
                }
            }
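            // Dropping the ring closes it, letting the SQPOLL thread exit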
            self.ring = None;
        }
    }

    /// Batch write with io_uring for better throughput.
    /// Operations complete synchronously before this returns.
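    ///
    /// A minimal usage sketch (sector numbers and payloads are illustrative):
    /// ```ignore
    /// let writes = vec![
    ///     (10u64, vec![0u8; FEOX_BLOCK_SIZE]),
    ///     (11u64, vec![0u8; FEOX_BLOCK_SIZE]),
    /// ];
    /// disk_io.batch_write(writes)?;
    /// ```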
    #[cfg(target_os = "linux")]
    pub fn batch_write(&mut self, writes: Vec<(u64, Vec<u8>)>) -> Result<()> {
        if let Some(ref mut ring) = self.ring {
            // Process in chunks to avoid overwhelming the submission queue
            for chunk in writes.chunks(IOURING_MAX_BATCH) {
                let mut aligned_buffers = Vec::new();

                // Create aligned buffers for this chunk
                for (_sector, data) in chunk {
                    let mut aligned = AlignedBuffer::new(data.len())?;
                    aligned.set_len(data.len());
                    aligned.as_mut_slice().copy_from_slice(data);
                    aligned_buffers.push(aligned);
                }

                // Submit operations for this chunk
                unsafe {
                    let mut sq = ring.submission();

                    for (i, (sector, _)) in chunk.iter().enumerate() {
                        let offset = sector * FEOX_BLOCK_SIZE as u64;
                        let buffer = &aligned_buffers[i];

                        let write_e = opcode::Write::new(
                            types::Fd(self.fd),
                            buffer.as_ptr(),
                            buffer.len() as u32,
                        )
                        .offset(offset)
                        .build()
                        .user_data(i as u64);

                        sq.push(&write_e)
                            .map_err(|_| FeoxError::IoError(io::Error::other("SQ full")))?;
                    }
                }

                // Submit and wait for this chunk to complete
                let submitted = ring
                    .submit_and_wait(chunk.len())
                    .map_err(FeoxError::IoError)?;

                // Process completions for this chunk
                let mut completed = 0;
                for cqe in ring.completion() {
                    if cqe.result() < 0 {
                        return Err(FeoxError::IoError(io::Error::from_raw_os_error(
                            -cqe.result(),
                        )));
                    }
                    completed += 1;
                    if completed >= submitted {
                        break;
                    }
                }
            }

            // Sync to ensure durability
            self.flush()?;

            Ok(())
        } else {
            // Fallback: do sync writes
            for (sector, data) in writes {
                self.write_sectors_sync(sector, &data)?;
            }
            Ok(())
        }
    }
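    /// Reads the single metadata block at `FEOX_METADATA_BLOCK`.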
    pub fn read_metadata(&self) -> Result<Vec<u8>> {
        self.read_sectors_sync(FEOX_METADATA_BLOCK, 1)
    }

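    /// Writes `metadata` into the metadata block, zero-padding it to a full
    /// `FEOX_BLOCK_SIZE`, then flushes for durability.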
    pub fn write_metadata(&self, metadata: &[u8]) -> Result<()> {
        if metadata.len() > FEOX_BLOCK_SIZE {
            return Err(FeoxError::InvalidValueSize);
        }

        // Prepare a full block (metadata may be smaller)
        let mut block_data = vec![0u8; FEOX_BLOCK_SIZE];
        block_data[..metadata.len()].copy_from_slice(metadata);

        // write_sectors_sync will handle alignment if needed
        self.write_sectors_sync(FEOX_METADATA_BLOCK, &block_data)?;
        self.flush()
    }

    /// Non-Linux fallback implementation
    #[cfg(not(target_os = "linux"))]
    pub fn batch_write(&mut self, writes: Vec<(u64, Vec<u8>)>) -> Result<()> {
        // Fallback: do sync writes
        for (sector, data) in writes {
            self.write_sectors_sync(sector, &data)?;
        }
        Ok(())
    }
}