/home/gh-runner/action-runner3/_work/feoxdb/feoxdb/src/core/store/persistence.rs
Line | Count | Source |
1 | | use std::io::{self, Read, Seek}; |
2 | | use std::sync::atomic::Ordering; |
3 | | use std::sync::Arc; |
4 | | |
5 | | use crate::constants::*; |
6 | | use crate::core::record::Record; |
7 | | use crate::error::{FeoxError, Result}; |
8 | | use crate::storage::format::get_format; |
9 | | |
10 | | use super::FeoxStore; |
11 | | |
12 | | impl FeoxStore { |
13 | | /// Force flush all pending writes to disk. |
14 | | /// |
15 | | /// In persistent mode, ensures all buffered writes are flushed to disk. |
16 | | /// In memory-only mode, this is a no-op. |
17 | | /// |
18 | | /// # Example |
19 | | /// |
20 | | /// ```no_run |
21 | | /// # use feoxdb::FeoxStore; |
22 | | /// # fn main() -> feoxdb::Result<()> { |
23 | | /// let store = FeoxStore::new(Some("/path/to/data.feox".to_string()))?; |
24 | | /// store.insert(b"important", b"data")?; |
25 | | /// store.flush_all(); // Ensure data is persisted |
26 | | /// # Ok(()) |
27 | | /// # } |
28 | | /// ``` |
29 | 9 | pub fn flush_all(&self) { |
30 | 9 | if !self.memory_only { |
31 | | // First flush the write buffer to ensure all data is written |
32 | 9 | if let Some(ref wb) = self.write_buffer { |
33 | 9 | let _ = wb.force_flush(); |
34 | 9 | }0 |
35 | | |
36 | 9 | if let Some(ref disk_io) = self.disk_io { |
37 | 9 | // Update metadata with current stats |
38 | 9 | let mut metadata = self._metadata.write(); |
39 | 9 | metadata.total_records = self.stats.record_count.load(Ordering::Relaxed) as u64; |
40 | 9 | metadata.total_size = self.stats.disk_usage.load(Ordering::Relaxed); |
41 | 9 | metadata.fragmentation = self.free_space.read().get_fragmentation(); |
42 | 9 | metadata.update(); |
43 | 9 | |
44 | 9 | // Write metadata |
45 | 9 | let _ = disk_io.write().write_metadata(metadata.as_bytes()); |
46 | 9 | let _ = disk_io.write().flush(); |
47 | 9 | }0 |
48 | 0 | } |
49 | 9 | } |
50 | | |
51 | 16 | pub(super) fn load_value_from_disk(&self, record: &Record) -> Result<Vec<u8>> { |
52 | 16 | let sector = record.sector.load(Ordering::Acquire); |
53 | 16 | if self.memory_only || sector == 0 { |
54 | 0 | return Err(FeoxError::InvalidRecord); |
55 | 16 | } |
56 | | |
57 | | // Get the appropriate format handler |
58 | 16 | let metadata_version = self._metadata.read().version; |
59 | 16 | let format = get_format(metadata_version); |
60 | | |
61 | | // Calculate how many sectors we need to read |
62 | 16 | let total_size = format.total_size(record.key.len(), record.value_len); |
63 | 16 | let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE); |
64 | | |
65 | | // Read the sectors |
66 | 16 | let disk_io = self |
67 | 16 | .disk_io |
68 | 16 | .as_ref() |
69 | 16 | .ok_or_else(|| {0 |
70 | 0 | FeoxError::IoError(io::Error::new( |
71 | 0 | io::ErrorKind::NotFound, |
72 | 0 | "No disk IO available", |
73 | 0 | )) |
74 | 0 | })? |
75 | 16 | .read(); |
76 | | |
77 | 16 | let data = disk_io.read_sectors_sync(sector, sectors_needed as u64)?0 ; |
78 | | |
79 | | // Use format to get the value offset |
80 | 16 | let offset = format.value_offset(record.key.len()); |
81 | 16 | if offset + record.value_len > data.len() { |
82 | 0 | return Err(FeoxError::InvalidRecord); |
83 | 16 | } |
84 | | |
85 | 16 | Ok(data[offset..offset + record.value_len].to_vec()) |
86 | 16 | } |
87 | | |
88 | 18 | pub(super) fn open_device( |
89 | 18 | &mut self, |
90 | 18 | device_path: &Option<String>, |
91 | 18 | file_size: Option<u64>, |
92 | 18 | ) -> Result<()> { |
93 | 18 | if let Some(path) = device_path { |
94 | | // Open the device/file |
95 | | use std::fs::OpenOptions; |
96 | | #[cfg(target_os = "linux")] |
97 | | use std::os::unix::fs::OpenOptionsExt; |
98 | | |
99 | | #[cfg(unix)] |
100 | 18 | let (file, use_direct_io) = if std::path::Path::new("/.dockerenv").exists() { |
101 | 0 | let file = OpenOptions::new() |
102 | 0 | .read(true) |
103 | 0 | .write(true) |
104 | 0 | .create(true) |
105 | 0 | .truncate(false) |
106 | 0 | .open(path) |
107 | 0 | .map_err(FeoxError::IoError)?; |
108 | 0 | (file, false) // Don't use O_DIRECT in Docker |
109 | | } else { |
110 | | // Try with O_DIRECT on Linux, fall back without it on other Unix systems |
111 | | #[cfg(target_os = "linux")] |
112 | | { |
113 | | // Try to open with O_DIRECT first |
114 | 18 | match OpenOptions::new() |
115 | 18 | .read(true) |
116 | 18 | .write(true) |
117 | 18 | .create(true) |
118 | 18 | .truncate(false) |
119 | 18 | .custom_flags(libc::O_DIRECT) |
120 | 18 | .open(path) |
121 | | { |
122 | 18 | Ok(file) => (file, true), // Successfully opened with O_DIRECT |
123 | | Err(_) => { |
124 | | // Fallback to regular open |
125 | 0 | let file = OpenOptions::new() |
126 | 0 | .read(true) |
127 | 0 | .write(true) |
128 | 0 | .create(true) |
129 | 0 | .truncate(false) |
130 | 0 | .open(path) |
131 | 0 | .map_err(FeoxError::IoError)?; |
132 | 0 | (file, false) |
133 | | } |
134 | | } |
135 | | } |
136 | | #[cfg(not(target_os = "linux"))] |
137 | | { |
138 | | let file = OpenOptions::new() |
139 | | .read(true) |
140 | | .write(true) |
141 | | .create(true) |
142 | | .truncate(false) |
143 | | .open(path) |
144 | | .map_err(FeoxError::IoError)?; |
145 | | (file, false) // O_DIRECT not supported on this platform |
146 | | } |
147 | | }; |
148 | | |
149 | | #[cfg(not(unix))] |
150 | | let file = OpenOptions::new() |
151 | | .read(true) |
152 | | .write(true) |
153 | | .create(true) |
154 | | .truncate(false) |
155 | | .open(path) |
156 | | .map_err(FeoxError::IoError)?; |
157 | | |
158 | | // Get file size |
159 | 18 | let metadata = file.metadata().map_err(FeoxError::IoError)?0 ; |
160 | 18 | self.device_size = metadata.len(); |
161 | | |
162 | | // Track whether this is a newly created file |
163 | 18 | let was_newly_created = self.device_size == 0; |
164 | | |
165 | 18 | if was_newly_created { |
166 | | // New empty file - set configured size or default and initialize free space |
167 | 10 | let target_size = file_size.unwrap_or(DEFAULT_DEVICE_SIZE); |
168 | 10 | file.set_len(target_size).map_err(FeoxError::IoError)?0 ; |
169 | 10 | self.device_size = target_size; |
170 | | |
171 | | // Initialize free space manager with all space free |
172 | 10 | self.free_space.write().initialize(self.device_size)?0 ; |
173 | | |
174 | 10 | let mut metadata = self._metadata.write(); |
175 | 10 | metadata.device_size = self.device_size; |
176 | 10 | metadata.update(); |
177 | | } else { |
178 | | // Existing file - check if it's empty |
179 | | // If empty, initialize free space; otherwise it will be rebuilt during scan |
180 | 8 | let is_empty_file = { |
181 | 8 | let mut temp_file = file.try_clone().map_err(FeoxError::IoError)?0 ; |
182 | 8 | temp_file |
183 | 8 | .metadata() |
184 | 8 | .map(|m| { |
185 | | // Check if file is all zeros |
186 | 8 | if m.len() > 0 { |
187 | 8 | let mut buffer = vec![0u8; std::cmp::min(4096, m.len() as usize)]; |
188 | 8 | temp_file.seek(std::io::SeekFrom::Start(0)).ok(); |
189 | 8 | temp_file.read_exact(&mut buffer).ok(); |
190 | 8 | buffer.iter().all(|&b| b == 0) |
191 | | } else { |
192 | 0 | false |
193 | | } |
194 | 8 | }) |
195 | 8 | .unwrap_or(false) |
196 | | }; |
197 | | |
198 | 8 | if is_empty_file { |
199 | | // Empty pre-created file - initialize free space like a new file |
200 | 0 | self.free_space.write().initialize(self.device_size)?; |
201 | 8 | } else { |
202 | 8 | // Existing file with data - free space will be rebuilt during scan |
203 | 8 | self.free_space.write().set_device_size(self.device_size); |
204 | 8 | } |
205 | | } |
206 | | |
207 | | #[cfg(unix)] |
208 | | { |
209 | | use std::os::unix::io::AsRawFd; |
210 | 18 | let file_arc = Arc::new(file); |
211 | 18 | let fd = file_arc.as_raw_fd(); |
212 | 18 | self.device_fd = Some(fd); |
213 | | // Store a clone of the file to keep it alive |
214 | 18 | self.device_file = Some(file_arc.as_ref().try_clone().map_err(FeoxError::IoError)?0 ); |
215 | 18 | let disk_io = crate::storage::io::DiskIO::new(file_arc, use_direct_io)?0 ; |
216 | 18 | self.disk_io = Some(Arc::new(parking_lot::RwLock::new(disk_io))); |
217 | | } |
218 | | |
219 | | #[cfg(not(unix))] |
220 | | { |
221 | | // Store a clone of the file to keep it alive |
222 | | self.device_file = Some(file.try_clone().map_err(FeoxError::IoError)?); |
223 | | let disk_io = crate::storage::io::DiskIO::new_from_file(file)?; |
224 | | self.disk_io = Some(Arc::new(parking_lot::RwLock::new(disk_io))); |
225 | | } |
226 | | |
227 | 18 | let disk_io = self.disk_io.as_ref().unwrap().read(); |
228 | | |
229 | | // Read metadata from existing files (not newly created ones) |
230 | 18 | if !was_newly_created { |
231 | 8 | if let Ok(metadata_bytes) = disk_io.read_metadata() { |
232 | 8 | if let Some(loaded_metadata) = |
233 | 8 | crate::storage::metadata::Metadata::from_bytes(&metadata_bytes) |
234 | 8 | { |
235 | 8 | // Initialize stats from metadata |
236 | 8 | self.stats |
237 | 8 | .disk_usage |
238 | 8 | .store(loaded_metadata.total_size, Ordering::Relaxed); |
239 | 8 | *self._metadata.write() = loaded_metadata; |
240 | 8 | }0 |
241 | 0 | } |
242 | 10 | } |
243 | 0 | } |
244 | 18 | Ok(()) |
245 | 18 | } |
246 | | } |
247 | | |
248 | | impl Drop for FeoxStore { |
249 | 78 | fn drop(&mut self) { |
250 | | // Stop TTL sweeper if running |
251 | 78 | if let Some(mut sweeper0 ) = self.ttl_sweeper.write().take() { |
252 | 0 | sweeper.stop(); |
253 | 78 | } |
254 | | |
255 | | // Signal shutdown to write buffer workers |
256 | 78 | if let Some(ref wb18 ) = self.write_buffer { |
257 | 18 | wb.initiate_shutdown(); |
258 | 60 | } |
259 | | |
260 | | // Write metadata directly without using the write buffer |
261 | 78 | if !self.memory_only { |
262 | 18 | if let Some(ref disk_io) = self.disk_io { |
263 | 18 | // Update metadata with current stats |
264 | 18 | let mut metadata = self._metadata.write(); |
265 | 18 | metadata.total_records = self.stats.record_count.load(Ordering::Relaxed) as u64; |
266 | 18 | metadata.total_size = self.stats.disk_usage.load(Ordering::Relaxed); |
267 | 18 | metadata.fragmentation = self.free_space.read().get_fragmentation(); |
268 | 18 | metadata.update(); |
269 | 18 | |
270 | 18 | // Write metadata |
271 | 18 | let _ = disk_io.write().write_metadata(metadata.as_bytes()); |
272 | 18 | let _ = disk_io.write().flush(); |
273 | 18 | }0 |
274 | 60 | } |
275 | | |
276 | | // Take ownership of write_buffer to properly shut it down |
277 | 78 | if let Some(wb_arc18 ) = self.write_buffer.take() { |
278 | | // Try to get mutable access if we're the only owner |
279 | 18 | if let Ok(wb) = Arc::try_unwrap(wb_arc) { |
280 | 18 | // We own it exclusively, can call complete_shutdown |
281 | 18 | let mut wb_mut = wb; |
282 | 18 | wb_mut.complete_shutdown(); |
283 | 18 | }0 |
284 | | // If we can't get exclusive access, workers are already shutting down via initiate_shutdown |
285 | 60 | } |
286 | | |
287 | | // Now it's safe to shutdown disk I/O since workers have exited |
288 | 78 | if let Some(ref disk_io18 ) = self.disk_io { |
289 | 18 | disk_io.write().shutdown(); |
290 | 60 | } |
291 | 78 | } |
292 | | } |