#[cfg(target_os = "linux")]
use io_uring::{opcode, types, IoUring, Probe};
use std::fs::File;
#[cfg(unix)]
use std::io;
#[cfg(unix)]
use std::os::unix::io::RawFd;
use std::sync::Arc;

use crate::constants::*;
use crate::error::{FeoxError, Result};
#[cfg(unix)]
use crate::utils::allocator::AlignedBuffer;

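/// Synchronous block-level disk I/O backend.
///
/// On Unix, reads and writes go through `pread`/`pwrite` on the raw file
/// descriptor, optionally using aligned buffers for direct I/O. On Linux an
/// io_uring instance is additionally set up, when available, to batch
/// writes. Other targets fall back to the standard library file APIs.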
pub struct DiskIO {
    #[cfg(target_os = "linux")]
    ring: Option<IoUring>,
    _file: Arc<File>,
    #[cfg(unix)]
    fd: RawFd,
    _use_direct_io: bool,
}

impl DiskIO {
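    /// Opens the I/O backend over `file`. On Linux this also tries to build
    /// an io_uring instance and probes whether the kernel supports the
    /// `Read` and `Write` opcodes it needs.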
    #[cfg(unix)]
    pub fn new(file: Arc<File>, use_direct_io: bool) -> Result<Self> {
        use std::os::unix::io::AsRawFd;
        let fd = file.as_raw_fd();
        #[cfg(target_os = "linux")]
        {
            let ring: Option<IoUring> = IoUring::builder()
                .setup_sqpoll(IOURING_SQPOLL_IDLE_MS)
                .build(IOURING_QUEUE_SIZE)
                .ok();

            if let Some(ref r) = ring {
                let mut probe = Probe::new();
                if r.submitter().register_probe(&mut probe).is_ok()
                    && probe.is_supported(opcode::Read::CODE)
                    && probe.is_supported(opcode::Write::CODE)
                {
                    return Ok(Self {
                        ring,
                        _file: file.clone(),
                        fd,
                        _use_direct_io: use_direct_io,
                    });
                }
            }

            // Ring creation failed or the required opcodes are unsupported:
            // drop the ring and fall back to plain pread/pwrite.
            Ok(Self {
                ring: None,
                _file: file,
                fd,
                _use_direct_io: false,
            })
        }

        #[cfg(not(target_os = "linux"))]
        {
            let _ = use_direct_io;
            Ok(Self {
                _file: file,
                fd,
                _use_direct_io: false,
            })
        }
    }

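    /// Builds a `DiskIO` from a plain `File` on non-Unix targets, where no
    /// raw file descriptor is available.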
    #[cfg(not(unix))]
    pub fn new_from_file(file: File) -> Result<Self> {
        Ok(Self {
            _file: Arc::new(file),
            _use_direct_io: false,
        })
    }

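    /// Reads `count` sectors of `FEOX_BLOCK_SIZE` bytes starting at `sector`
    /// and returns the raw bytes; on Unix a short read is reported as an
    /// `UnexpectedEof` error.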
    pub fn read_sectors_sync(&self, sector: u64, count: u64) -> Result<Vec<u8>> {
        let size = (count * FEOX_BLOCK_SIZE as u64) as usize;
        let offset = sector * FEOX_BLOCK_SIZE as u64;

        #[cfg(unix)]
        {
            if self._use_direct_io {
                // Direct I/O path: read into an aligned buffer.
                let mut buffer = AlignedBuffer::new(size)?;
                buffer.set_len(size);

                let read = unsafe {
                    libc::pread(
                        self.fd,
                        buffer.as_mut_ptr() as *mut libc::c_void,
                        size,
                        offset as libc::off_t,
                    )
                };

                if read < 0 {
                    let err = io::Error::last_os_error();
                    return Err(FeoxError::IoError(err));
                }

                if read as usize != size {
                    return Err(FeoxError::IoError(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        format!("Read {} bytes, expected {}", read, size),
                    )));
                }

                Ok(buffer.as_slice().to_vec())
            } else {
                let mut buffer = vec![0u8; size];

                let read = unsafe {
                    libc::pread(
                        self.fd,
                        buffer.as_mut_ptr() as *mut libc::c_void,
                        size,
                        offset as libc::off_t,
                    )
                };

                if read < 0 {
                    let err = io::Error::last_os_error();
                    return Err(FeoxError::IoError(err));
                }

                if read as usize != size {
                    return Err(FeoxError::IoError(io::Error::new(
                        io::ErrorKind::UnexpectedEof,
                        format!("Read {} bytes, expected {}", read, size),
                    )));
                }

                buffer.truncate(read as usize);
                Ok(buffer)
            }
        }

        #[cfg(not(unix))]
        {
            let mut buffer = vec![0u8; size];

            #[cfg(target_os = "windows")]
            {
                use std::os::windows::fs::FileExt;
                self._file
                    .seek_read(&mut buffer, offset)
                    .map_err(FeoxError::IoError)?;
            }

            #[cfg(not(any(unix, target_os = "windows")))]
            {
                use std::io::{Read, Seek, SeekFrom};

                let mut file = self
                    ._file
                    .as_ref()
                    .try_clone()
                    .map_err(FeoxError::IoError)?;

                file.seek(SeekFrom::Start(offset))
                    .map_err(FeoxError::IoError)?;

                file.read_exact(&mut buffer).map_err(FeoxError::IoError)?;
            }

            Ok(buffer)
        }
    }

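    /// Writes `data` at the byte offset of `sector`; on Unix a short write
    /// is reported as an error.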
    pub fn write_sectors_sync(&self, sector: u64, data: &[u8]) -> Result<()> {
        let offset = sector * FEOX_BLOCK_SIZE as u64;

        #[cfg(unix)]
        {
            let written = if self._use_direct_io {
                // Direct I/O path: copy into an aligned buffer before pwrite.
                let mut aligned_buffer = AlignedBuffer::new(data.len())?;
                aligned_buffer.set_len(data.len());
                aligned_buffer.as_mut_slice().copy_from_slice(data);

                unsafe {
                    libc::pwrite(
                        self.fd,
                        aligned_buffer.as_ptr() as *const libc::c_void,
                        aligned_buffer.len(),
                        offset as libc::off_t,
                    )
                }
            } else {
                unsafe {
                    libc::pwrite(
                        self.fd,
                        data.as_ptr() as *const libc::c_void,
                        data.len(),
                        offset as libc::off_t,
                    )
                }
            };

            if written < 0 {
                return Err(FeoxError::IoError(io::Error::last_os_error()));
            }

            if written as usize != data.len() {
                return Err(FeoxError::IoError(io::Error::new(
                    io::ErrorKind::UnexpectedEof,
                    "Partial write",
                )));
            }
        }

        #[cfg(not(unix))]
        {
            #[cfg(target_os = "windows")]
            {
                use std::os::windows::fs::FileExt;
                self._file
                    .seek_write(data, offset)
                    .map_err(FeoxError::IoError)?;
            }

            #[cfg(not(any(unix, target_os = "windows")))]
            {
                use std::io::{Seek, SeekFrom, Write};

                let mut file = self
                    ._file
                    .as_ref()
                    .try_clone()
                    .map_err(FeoxError::IoError)?;

                file.seek(SeekFrom::Start(offset))
                    .map_err(FeoxError::IoError)?;

                file.write_all(data).map_err(FeoxError::IoError)?;

                file.sync_data().map_err(FeoxError::IoError)?;
            }
        }

        Ok(())
    }

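    /// Flushes file data to stable storage (`fsync` on Unix, `sync_all`
    /// elsewhere).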
    pub fn flush(&self) -> Result<()> {
        #[cfg(unix)]
        unsafe {
            if libc::fsync(self.fd) == -1 {
                return Err(FeoxError::IoError(io::Error::last_os_error()));
            }
        }

        #[cfg(not(unix))]
        {
            self._file.sync_all().map_err(FeoxError::IoError)?;
        }

        Ok(())
    }

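    /// Drains any outstanding io_uring completions and drops the ring.
    /// No-op on non-Linux targets.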
    pub fn shutdown(&mut self) {
        #[cfg(target_os = "linux")]
        {
            if let Some(ref mut ring) = self.ring {
                // Drain completions that are still pending before the ring
                // is dropped.
                if ring.submit_and_wait(0).is_ok() {
                    while ring.completion().next().is_some() {}
                }
            }
            self.ring = None;
        }
    }

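    /// Writes each `(sector, data)` pair to disk. With an io_uring ring the
    /// writes are submitted in chunks of up to `IOURING_MAX_BATCH` entries
    /// and followed by a flush; without one, each pair goes through
    /// `write_sectors_sync`.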
    #[cfg(target_os = "linux")]
    pub fn batch_write(&mut self, writes: Vec<(u64, Vec<u8>)>) -> Result<()> {
        if let Some(ref mut ring) = self.ring {
            for chunk in writes.chunks(IOURING_MAX_BATCH) {
                // The aligned buffers must outlive the submitted SQEs, so
                // collect them for the whole chunk before submitting.
                let mut aligned_buffers = Vec::new();

                for (_sector, data) in chunk {
                    let mut aligned = AlignedBuffer::new(data.len())?;
                    aligned.set_len(data.len());
                    aligned.as_mut_slice().copy_from_slice(data);
                    aligned_buffers.push(aligned);
                }

                unsafe {
                    let mut sq = ring.submission();

                    for (i, (sector, _)) in chunk.iter().enumerate() {
                        let offset = sector * FEOX_BLOCK_SIZE as u64;
                        let buffer = &aligned_buffers[i];

                        let write_e = opcode::Write::new(
                            types::Fd(self.fd),
                            buffer.as_ptr(),
                            buffer.len() as u32,
                        )
                        .offset(offset)
                        .build()
                        .user_data(i as u64);

                        sq.push(&write_e)
                            .map_err(|_| FeoxError::IoError(io::Error::other("SQ full")))?;
                    }
                }

                let submitted = ring
                    .submit_and_wait(chunk.len())
                    .map_err(FeoxError::IoError)?;

                // Reap completions and surface the first kernel error, if any.
                let mut completed = 0;
                for cqe in ring.completion() {
                    if cqe.result() < 0 {
                        return Err(FeoxError::IoError(io::Error::from_raw_os_error(
                            -cqe.result(),
                        )));
                    }
                    completed += 1;
                    if completed >= submitted {
                        break;
                    }
                }
            }

            self.flush()?;

            Ok(())
        } else {
            for (sector, data) in writes {
                self.write_sectors_sync(sector, &data)?;
            }
            Ok(())
        }
    }

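    /// Reads the single metadata block at `FEOX_METADATA_BLOCK`.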
    pub fn read_metadata(&self) -> Result<Vec<u8>> {
        self.read_sectors_sync(FEOX_METADATA_BLOCK, 1)
    }

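    /// Writes `metadata` (at most one block) to the metadata block,
    /// zero-padding the rest, then flushes.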
    pub fn write_metadata(&self, metadata: &[u8]) -> Result<()> {
        if metadata.len() > FEOX_BLOCK_SIZE {
            return Err(FeoxError::InvalidValueSize);
        }

        let mut block_data = vec![0u8; FEOX_BLOCK_SIZE];
        block_data[..metadata.len()].copy_from_slice(metadata);

        self.write_sectors_sync(FEOX_METADATA_BLOCK, &block_data)?;
        self.flush()
    }

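    /// Fallback `batch_write` for non-Linux targets: each pair is written
    /// synchronously via `write_sectors_sync`.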
    #[cfg(not(target_os = "linux"))]
    pub fn batch_write(&mut self, writes: Vec<(u64, Vec<u8>)>) -> Result<()> {
        for (sector, data) in writes {
            self.write_sectors_sync(sector, &data)?;
        }
        Ok(())
    }
}