feoxdb/core/store/
persistence.rs1use std::io::{self, Read, Seek};
2use std::sync::atomic::Ordering;
3use std::sync::Arc;
4
5use crate::constants::*;
6use crate::core::record::Record;
7use crate::error::{FeoxError, Result};
8use crate::storage::format::get_format;
9
10use super::FeoxStore;
11
12impl FeoxStore {
13 pub fn flush_all(&self) {
30 if !self.memory_only {
31 if let Some(ref wb) = self.write_buffer {
33 let _ = wb.force_flush();
34 }
35
36 if let Some(ref disk_io) = self.disk_io {
37 let mut metadata = self._metadata.write();
39 metadata.total_records = self.stats.record_count.load(Ordering::Relaxed) as u64;
40 metadata.total_size = self.stats.disk_usage.load(Ordering::Relaxed);
41 metadata.fragmentation = self.free_space.read().get_fragmentation();
42 metadata.update();
43
44 let _ = disk_io.write().write_metadata(metadata.as_bytes());
46 let _ = disk_io.write().flush();
47 }
48 }
49 }
50
51 pub(super) fn load_value_from_disk(&self, record: &Record) -> Result<Vec<u8>> {
52 let sector = record.sector.load(Ordering::Acquire);
53 if self.memory_only || sector == 0 {
54 return Err(FeoxError::InvalidRecord);
55 }
56
57 let metadata_version = self._metadata.read().version;
59 let format = get_format(metadata_version);
60
61 let total_size = format.total_size(record.key.len(), record.value_len);
63 let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE);
64
65 let disk_io = self
67 .disk_io
68 .as_ref()
69 .ok_or_else(|| {
70 FeoxError::IoError(io::Error::new(
71 io::ErrorKind::NotFound,
72 "No disk IO available",
73 ))
74 })?
75 .read();
76
77 let data = disk_io.read_sectors_sync(sector, sectors_needed as u64)?;
78
79 let offset = format.value_offset(record.key.len());
81 if offset + record.value_len > data.len() {
82 return Err(FeoxError::InvalidRecord);
83 }
84
85 Ok(data[offset..offset + record.value_len].to_vec())
86 }
87
88 pub(super) fn open_device(
89 &mut self,
90 device_path: &Option<String>,
91 file_size: Option<u64>,
92 ) -> Result<()> {
93 if let Some(path) = device_path {
94 use std::fs::OpenOptions;
96 #[cfg(target_os = "linux")]
97 use std::os::unix::fs::OpenOptionsExt;
98
99 #[cfg(unix)]
100 let (file, use_direct_io) = if std::path::Path::new("/.dockerenv").exists() {
101 let file = OpenOptions::new()
102 .read(true)
103 .write(true)
104 .create(true)
105 .truncate(false)
106 .open(path)
107 .map_err(FeoxError::IoError)?;
108 (file, false) } else {
110 #[cfg(target_os = "linux")]
112 {
113 match OpenOptions::new()
115 .read(true)
116 .write(true)
117 .create(true)
118 .truncate(false)
119 .custom_flags(libc::O_DIRECT)
120 .open(path)
121 {
122 Ok(file) => (file, true), Err(_) => {
124 let file = OpenOptions::new()
126 .read(true)
127 .write(true)
128 .create(true)
129 .truncate(false)
130 .open(path)
131 .map_err(FeoxError::IoError)?;
132 (file, false)
133 }
134 }
135 }
136 #[cfg(not(target_os = "linux"))]
137 {
138 let file = OpenOptions::new()
139 .read(true)
140 .write(true)
141 .create(true)
142 .truncate(false)
143 .open(path)
144 .map_err(FeoxError::IoError)?;
145 (file, false) }
147 };
148
149 #[cfg(not(unix))]
150 let file = OpenOptions::new()
151 .read(true)
152 .write(true)
153 .create(true)
154 .truncate(false)
155 .open(path)
156 .map_err(FeoxError::IoError)?;
157
158 let metadata = file.metadata().map_err(FeoxError::IoError)?;
160 self.device_size = metadata.len();
161
162 let was_newly_created = self.device_size == 0;
164
165 if was_newly_created {
166 let target_size = file_size.unwrap_or(DEFAULT_DEVICE_SIZE);
168 file.set_len(target_size).map_err(FeoxError::IoError)?;
169 self.device_size = target_size;
170
171 self.free_space.write().initialize(self.device_size)?;
173
174 let mut metadata = self._metadata.write();
175 metadata.device_size = self.device_size;
176 metadata.update();
177 } else {
178 let is_empty_file = {
181 let mut temp_file = file.try_clone().map_err(FeoxError::IoError)?;
182 temp_file
183 .metadata()
184 .map(|m| {
185 if m.len() > 0 {
187 let mut buffer = vec![0u8; std::cmp::min(4096, m.len() as usize)];
188 temp_file.seek(std::io::SeekFrom::Start(0)).ok();
189 temp_file.read_exact(&mut buffer).ok();
190 buffer.iter().all(|&b| b == 0)
191 } else {
192 false
193 }
194 })
195 .unwrap_or(false)
196 };
197
198 if is_empty_file {
199 self.free_space.write().initialize(self.device_size)?;
201 } else {
202 self.free_space.write().set_device_size(self.device_size);
204 }
205 }
206
207 #[cfg(unix)]
208 {
209 use std::os::unix::io::AsRawFd;
210 let file_arc = Arc::new(file);
211 let fd = file_arc.as_raw_fd();
212 self.device_fd = Some(fd);
213 self.device_file = Some(file_arc.as_ref().try_clone().map_err(FeoxError::IoError)?);
215 let disk_io = crate::storage::io::DiskIO::new(file_arc, use_direct_io)?;
216 self.disk_io = Some(Arc::new(parking_lot::RwLock::new(disk_io)));
217 }
218
219 #[cfg(not(unix))]
220 {
221 self.device_file = Some(file.try_clone().map_err(FeoxError::IoError)?);
223 let disk_io = crate::storage::io::DiskIO::new_from_file(file)?;
224 self.disk_io = Some(Arc::new(parking_lot::RwLock::new(disk_io)));
225 }
226
227 let disk_io = self.disk_io.as_ref().unwrap().read();
228
229 if !was_newly_created {
231 if let Ok(metadata_bytes) = disk_io.read_metadata() {
232 if let Some(loaded_metadata) =
233 crate::storage::metadata::Metadata::from_bytes(&metadata_bytes)
234 {
235 self.stats
237 .disk_usage
238 .store(loaded_metadata.total_size, Ordering::Relaxed);
239 *self._metadata.write() = loaded_metadata;
240 }
241 }
242 }
243 }
244 Ok(())
245 }
246}
247
248impl Drop for FeoxStore {
249 fn drop(&mut self) {
250 if let Some(mut sweeper) = self.ttl_sweeper.write().take() {
252 sweeper.stop();
253 }
254
255 if let Some(ref wb) = self.write_buffer {
257 wb.initiate_shutdown();
258 }
259
260 if !self.memory_only {
262 if let Some(ref disk_io) = self.disk_io {
263 let mut metadata = self._metadata.write();
265 metadata.total_records = self.stats.record_count.load(Ordering::Relaxed) as u64;
266 metadata.total_size = self.stats.disk_usage.load(Ordering::Relaxed);
267 metadata.fragmentation = self.free_space.read().get_fragmentation();
268 metadata.update();
269
270 let _ = disk_io.write().write_metadata(metadata.as_bytes());
272 let _ = disk_io.write().flush();
273 }
274 }
275
276 if let Some(wb_arc) = self.write_buffer.take() {
278 if let Ok(wb) = Arc::try_unwrap(wb_arc) {
280 let mut wb_mut = wb;
282 wb_mut.complete_shutdown();
283 }
284 }
286
287 if let Some(ref disk_io) = self.disk_io {
289 disk_io.write().shutdown();
290 }
291 }
292}