#[cfg(target_os = "linux")]
use io_uring::{opcode, types, IoUring, Probe};
use std::fs::File;
#[cfg(unix)]
use std::io;
#[cfg(unix)]
use std::os::unix::io::RawFd;
use std::sync::Arc;

use crate::constants::*;
use crate::error::{FeoxError, Result};
use crate::utils::allocator::AlignedBuffer;

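/// Low-level block I/O over a shared file handle.
///
/// On Linux this may hold an io_uring instance for batched writes; on other
/// targets it falls back to plain positional reads and writes.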
pub struct DiskIO {
    #[cfg(target_os = "linux")]
    ring: Option<IoUring>,
    _file: Arc<File>,
    #[cfg(unix)]
    fd: RawFd,
    _use_direct_io: bool,
}

impl DiskIO {
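    /// Builds a `DiskIO` around a shared file handle.
    ///
    /// On Linux this also tries to set up an io_uring instance; direct I/O is
    /// only honored when the kernel reports support for the `Read` and
    /// `Write` opcodes.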
    #[cfg(unix)]
    pub fn new(file: Arc<File>, use_direct_io: bool) -> Result<Self> {
        use std::os::unix::io::AsRawFd;
        let fd = file.as_raw_fd();
        #[cfg(target_os = "linux")]
        {
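            // Try to build an io_uring instance with SQPOLL; `.ok()` turns a
            // setup failure into `None` so we can fall back to pread/pwrite.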
            let ring: Option<IoUring> = IoUring::builder()
                .setup_sqpoll(IOURING_SQPOLL_IDLE_MS)
                .build(IOURING_QUEUE_SIZE)
                .ok();

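            // Probe for Read/Write opcode support; direct I/O is only enabled
            // when the probe succeeds.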
            if let Some(ref r) = ring {
                let mut probe = Probe::new();
                if r.submitter().register_probe(&mut probe).is_ok()
                    && probe.is_supported(opcode::Read::CODE)
                    && probe.is_supported(opcode::Write::CODE)
                {
                    return Ok(Self {
                        ring,
                        _file: file.clone(),
                        fd,
                        _use_direct_io: use_direct_io,
                    });
                }
            }

            Ok(Self {
                ring,
                _file: file,
                fd,
                _use_direct_io: false,
            })
        }

        #[cfg(not(target_os = "linux"))]
        {
            let _ = use_direct_io;
            Ok(Self {
                _file: file,
                fd,
                _use_direct_io: false,
            })
        }
    }

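    /// Constructor for non-Unix targets, where no raw file descriptor is kept.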
    #[cfg(not(unix))]
    pub fn new_from_file(file: File) -> Result<Self> {
        Ok(Self {
            _file: Arc::new(file),
            _use_direct_io: false,
        })
    }

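    /// Reads `count` blocks starting at `sector` and returns the raw bytes.
    ///
    /// Short reads are treated as errors, so a successful call always returns
    /// exactly `count * FEOX_BLOCK_SIZE` bytes.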
    pub fn read_sectors_sync(&self, sector: u64, count: u64) -> Result<Vec<u8>> {
        let size = (count * FEOX_BLOCK_SIZE as u64) as usize;
        let offset = sector * FEOX_BLOCK_SIZE as u64;

        #[cfg(unix)]
        {
            if self._use_direct_io {
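                // Direct I/O (O_DIRECT) needs an aligned buffer, so read into
                // an AlignedBuffer instead of a plain Vec.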
                let mut buffer = AlignedBuffer::new(size)?;
                buffer.set_len(size);

                let read = unsafe {
                    libc::pread(
                        self.fd,
                        buffer.as_mut_ptr() as *mut libc::c_void,
                        size,
                        offset as libc::off_t,
                    )
                };

                if read < 0 {
                    let err = io::Error::last_os_error();
                    eprintln!("pread failed at offset {}: {}", offset, err);
                    return Err(FeoxError::IoError(err));
                }

                if read as usize != size {
                    return Err(FeoxError::IoError(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        format!("Read {} bytes, expected {}", read, size),
                    )));
                }

                Ok(buffer.as_slice().to_vec())
            } else {
                let mut buffer = vec![0u8; size];

                let read = unsafe {
                    libc::pread(
                        self.fd,
                        buffer.as_mut_ptr() as *mut libc::c_void,
                        size,
                        offset as libc::off_t,
                    )
                };

                if read < 0 {
                    let err = io::Error::last_os_error();
                    eprintln!("pread failed at offset {}: {}", offset, err);
                    return Err(FeoxError::IoError(err));
                }

                if read as usize != size {
                    return Err(FeoxError::IoError(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        format!("Read {} bytes, expected {}", read, size),
                    )));
                }

                buffer.truncate(read as usize);
                Ok(buffer)
            }
        }

        #[cfg(not(unix))]
        {
            let mut buffer = vec![0u8; size];

            #[cfg(target_os = "windows")]
            {
                use std::os::windows::fs::FileExt;
                // seek_read can return a short count, so check it like the
                // Unix path does.
                let read = self
                    ._file
                    .seek_read(&mut buffer, offset)
                    .map_err(FeoxError::IoError)?;
                if read != size {
                    return Err(FeoxError::IoError(std::io::Error::new(
                        std::io::ErrorKind::UnexpectedEof,
                        format!("Read {} bytes, expected {}", read, size),
                    )));
                }
            }

            #[cfg(not(any(unix, target_os = "windows")))]
            {
                use std::io::{Read, Seek, SeekFrom};

                let mut file = self
                    ._file
                    .as_ref()
                    .try_clone()
                    .map_err(FeoxError::IoError)?;

                file.seek(SeekFrom::Start(offset))
                    .map_err(FeoxError::IoError)?;

                file.read_exact(&mut buffer).map_err(FeoxError::IoError)?;
            }

            Ok(buffer)
        }
    }

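    /// Writes `data` starting at `sector`, treating a partial write as an error.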
    pub fn write_sectors_sync(&self, sector: u64, data: &[u8]) -> Result<()> {
        let offset = sector * FEOX_BLOCK_SIZE as u64;

        #[cfg(unix)]
        {
            let written = if self._use_direct_io {
                let mut aligned_buffer = AlignedBuffer::new(data.len())?;
                aligned_buffer.set_len(data.len());
                aligned_buffer.as_mut_slice().copy_from_slice(data);

                unsafe {
                    libc::pwrite(
                        self.fd,
                        aligned_buffer.as_ptr() as *const libc::c_void,
                        aligned_buffer.len(),
                        offset as libc::off_t,
                    )
                }
            } else {
                unsafe {
                    libc::pwrite(
                        self.fd,
                        data.as_ptr() as *const libc::c_void,
                        data.len(),
                        offset as libc::off_t,
                    )
                }
            };

            if written < 0 {
                return Err(FeoxError::IoError(io::Error::last_os_error()));
            }

            if written as usize != data.len() {
                return Err(FeoxError::IoError(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "Partial write",
                )));
            }
        }

        #[cfg(not(unix))]
        {
            #[cfg(target_os = "windows")]
            {
                use std::os::windows::fs::FileExt;
                // seek_write can return a short count, so mirror the Unix
                // partial-write check.
                let written = self
                    ._file
                    .seek_write(data, offset)
                    .map_err(FeoxError::IoError)?;
                if written != data.len() {
                    return Err(FeoxError::IoError(std::io::Error::new(
                        std::io::ErrorKind::UnexpectedEof,
                        "Partial write",
                    )));
                }
            }

            #[cfg(not(any(unix, target_os = "windows")))]
            {
                use std::io::{Seek, SeekFrom, Write};

                let mut file = self
                    ._file
                    .as_ref()
                    .try_clone()
                    .map_err(FeoxError::IoError)?;

                file.seek(SeekFrom::Start(offset))
                    .map_err(FeoxError::IoError)?;

                file.write_all(data).map_err(FeoxError::IoError)?;

                file.sync_data().map_err(FeoxError::IoError)?;
            }
        }

        Ok(())
    }

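    /// Forces buffered writes to stable storage (`fsync` on Unix, `sync_all`
    /// elsewhere).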
    pub fn flush(&self) -> Result<()> {
        #[cfg(unix)]
        unsafe {
            if libc::fsync(self.fd) == -1 {
                return Err(FeoxError::IoError(io::Error::last_os_error()));
            }
        }

        #[cfg(not(unix))]
        {
            self._file.sync_all().map_err(FeoxError::IoError)?;
        }

        Ok(())
    }

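    /// Drains any outstanding io_uring completions and drops the ring.
    /// A no-op on non-Linux targets.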
    pub fn shutdown(&mut self) {
        #[cfg(target_os = "linux")]
        {
            if let Some(ref mut ring) = self.ring {
                if ring.submit_and_wait(0).is_ok() {
                    while ring.completion().next().is_some() {}
                }
            }
            self.ring = None;
        }
    }

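    /// Writes each `(sector, data)` pair, batching submissions through
    /// io_uring when a ring is available and falling back to synchronous
    /// writes otherwise. The io_uring path finishes with a `flush`.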
    #[cfg(target_os = "linux")]
    pub fn batch_write(&mut self, writes: Vec<(u64, Vec<u8>)>) -> Result<()> {
        if let Some(ref mut ring) = self.ring {
            for chunk in writes.chunks(IOURING_MAX_BATCH) {
                let mut aligned_buffers = Vec::new();

                for (_sector, data) in chunk {
                    let mut aligned = AlignedBuffer::new(data.len())?;
                    aligned.set_len(data.len());
                    aligned.as_mut_slice().copy_from_slice(data);
                    aligned_buffers.push(aligned);
                }

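                // Safety: the SQEs below point into `aligned_buffers`, which
                // stays alive until the completions are reaped later in this
                // iteration.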
                unsafe {
                    let mut sq = ring.submission();

                    for (i, (sector, _)) in chunk.iter().enumerate() {
                        let offset = sector * FEOX_BLOCK_SIZE as u64;
                        let buffer = &aligned_buffers[i];

                        let write_e = opcode::Write::new(
                            types::Fd(self.fd),
                            buffer.as_ptr(),
                            buffer.len() as u32,
                        )
                        .offset(offset)
                        .build()
                        .user_data(i as u64);

                        sq.push(&write_e)
                            .map_err(|_| FeoxError::IoError(io::Error::other("SQ full")))?;
                    }
                }

                let submitted = ring
                    .submit_and_wait(chunk.len())
                    .map_err(FeoxError::IoError)?;

                let mut completed = 0;
                for cqe in ring.completion() {
                    if cqe.result() < 0 {
                        return Err(FeoxError::IoError(io::Error::from_raw_os_error(
                            -cqe.result(),
                        )));
                    }
                    completed += 1;
                    if completed >= submitted {
                        break;
                    }
                }
            }

            self.flush()?;

            Ok(())
        } else {
            for (sector, data) in writes {
                self.write_sectors_sync(sector, &data)?;
            }
            Ok(())
        }
    }

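    /// Reads the single metadata block at `FEOX_METADATA_BLOCK`.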
    pub fn read_metadata(&self) -> Result<Vec<u8>> {
        self.read_sectors_sync(FEOX_METADATA_BLOCK, 1)
    }

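    /// Zero-pads `metadata` to a full block, writes it to
    /// `FEOX_METADATA_BLOCK`, and flushes.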
    pub fn write_metadata(&self, metadata: &[u8]) -> Result<()> {
        if metadata.len() > FEOX_BLOCK_SIZE {
            return Err(FeoxError::InvalidValueSize);
        }

        let mut block_data = vec![0u8; FEOX_BLOCK_SIZE];
        block_data[..metadata.len()].copy_from_slice(metadata);

        self.write_sectors_sync(FEOX_METADATA_BLOCK, &block_data)?;
        self.flush()
    }

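    /// Fallback batch write for non-Linux targets: each sector is written
    /// synchronously in order.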
    #[cfg(not(target_os = "linux"))]
    pub fn batch_write(&mut self, writes: Vec<(u64, Vec<u8>)>) -> Result<()> {
        for (sector, data) in writes {
            self.write_sectors_sync(sector, &data)?;
        }
        Ok(())
    }
}