feoxdb/core/store/
persistence.rs

1use std::io::{self, Read, Seek};
2use std::sync::atomic::Ordering;
3use std::sync::Arc;
4
5use crate::constants::*;
6use crate::core::record::Record;
7use crate::error::{FeoxError, Result};
8use crate::storage::format::get_format;
9
10use super::FeoxStore;
11
12impl FeoxStore {
13    /// Force flush all pending writes to disk.
14    ///
15    /// In persistent mode, ensures all buffered writes are flushed to disk.
16    /// In memory-only mode, this is a no-op.
17    ///
18    /// # Example
19    ///
20    /// ```no_run
21    /// # use feoxdb::FeoxStore;
22    /// # fn main() -> feoxdb::Result<()> {
23    /// let store = FeoxStore::new(Some("/path/to/data.feox".to_string()))?;
24    /// store.insert(b"important", b"data")?;
25    /// store.flush_all();  // Ensure data is persisted
26    /// # Ok(())
27    /// # }
28    /// ```
29    pub fn flush_all(&self) {
30        if !self.memory_only {
31            // First flush the write buffer to ensure all data is written
32            if let Some(ref wb) = self.write_buffer {
33                let _ = wb.force_flush();
34            }
35
36            if let Some(ref disk_io) = self.disk_io {
37                // Update metadata with current stats
38                let mut metadata = self._metadata.write();
39                metadata.total_records = self.stats.record_count.load(Ordering::Relaxed) as u64;
40                metadata.total_size = self.stats.disk_usage.load(Ordering::Relaxed);
41                metadata.fragmentation = self.free_space.read().get_fragmentation();
42                metadata.update();
43
44                // Write metadata
45                let _ = disk_io.write().write_metadata(metadata.as_bytes());
46                let _ = disk_io.write().flush();
47            }
48        }
49    }
50
51    pub(super) fn load_value_from_disk(&self, record: &Record) -> Result<Vec<u8>> {
52        let sector = record.sector.load(Ordering::Acquire);
53        if self.memory_only || sector == 0 {
54            return Err(FeoxError::InvalidRecord);
55        }
56
57        // Get the appropriate format handler
58        let metadata_version = self._metadata.read().version;
59        let format = get_format(metadata_version);
60
61        // Calculate how many sectors we need to read
62        let total_size = format.total_size(record.key.len(), record.value_len);
63        let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE);
64
65        // Read the sectors
66        let disk_io = self
67            .disk_io
68            .as_ref()
69            .ok_or_else(|| {
70                FeoxError::IoError(io::Error::new(
71                    io::ErrorKind::NotFound,
72                    "No disk IO available",
73                ))
74            })?
75            .read();
76
77        let data = disk_io.read_sectors_sync(sector, sectors_needed as u64)?;
78
79        // Use format to get the value offset
80        let offset = format.value_offset(record.key.len());
81        if offset + record.value_len > data.len() {
82            return Err(FeoxError::InvalidRecord);
83        }
84
85        Ok(data[offset..offset + record.value_len].to_vec())
86    }
87
88    pub(super) fn open_device(
89        &mut self,
90        device_path: &Option<String>,
91        file_size: Option<u64>,
92    ) -> Result<()> {
93        if let Some(path) = device_path {
94            // Open the device/file
95            use std::fs::OpenOptions;
96            #[cfg(target_os = "linux")]
97            use std::os::unix::fs::OpenOptionsExt;
98
99            #[cfg(unix)]
100            let (file, use_direct_io) = if std::path::Path::new("/.dockerenv").exists() {
101                let file = OpenOptions::new()
102                    .read(true)
103                    .write(true)
104                    .create(true)
105                    .truncate(false)
106                    .open(path)
107                    .map_err(FeoxError::IoError)?;
108                (file, false) // Don't use O_DIRECT in Docker
109            } else {
110                // Try with O_DIRECT on Linux, fall back without it on other Unix systems
111                #[cfg(target_os = "linux")]
112                {
113                    // Try to open with O_DIRECT first
114                    match OpenOptions::new()
115                        .read(true)
116                        .write(true)
117                        .create(true)
118                        .truncate(false)
119                        .custom_flags(libc::O_DIRECT)
120                        .open(path)
121                    {
122                        Ok(file) => (file, true), // Successfully opened with O_DIRECT
123                        Err(_) => {
124                            // Fallback to regular open
125                            let file = OpenOptions::new()
126                                .read(true)
127                                .write(true)
128                                .create(true)
129                                .truncate(false)
130                                .open(path)
131                                .map_err(FeoxError::IoError)?;
132                            (file, false)
133                        }
134                    }
135                }
136                #[cfg(not(target_os = "linux"))]
137                {
138                    let file = OpenOptions::new()
139                        .read(true)
140                        .write(true)
141                        .create(true)
142                        .truncate(false)
143                        .open(path)
144                        .map_err(FeoxError::IoError)?;
145                    (file, false) // O_DIRECT not supported on this platform
146                }
147            };
148
149            #[cfg(not(unix))]
150            let file = OpenOptions::new()
151                .read(true)
152                .write(true)
153                .create(true)
154                .truncate(false)
155                .open(path)
156                .map_err(FeoxError::IoError)?;
157
158            // Get file size
159            let metadata = file.metadata().map_err(FeoxError::IoError)?;
160            self.device_size = metadata.len();
161
162            // Track whether this is a newly created file
163            let was_newly_created = self.device_size == 0;
164
165            if was_newly_created {
166                // New empty file - set configured size or default and initialize free space
167                let target_size = file_size.unwrap_or(DEFAULT_DEVICE_SIZE);
168                file.set_len(target_size).map_err(FeoxError::IoError)?;
169                self.device_size = target_size;
170
171                // Initialize free space manager with all space free
172                self.free_space.write().initialize(self.device_size)?;
173
174                let mut metadata = self._metadata.write();
175                metadata.device_size = self.device_size;
176                metadata.update();
177            } else {
178                // Existing file - check if it's empty
179                // If empty, initialize free space; otherwise it will be rebuilt during scan
180                let is_empty_file = {
181                    let mut temp_file = file.try_clone().map_err(FeoxError::IoError)?;
182                    temp_file
183                        .metadata()
184                        .map(|m| {
185                            // Check if file is all zeros
186                            if m.len() > 0 {
187                                let mut buffer = vec![0u8; std::cmp::min(4096, m.len() as usize)];
188                                temp_file.seek(std::io::SeekFrom::Start(0)).ok();
189                                temp_file.read_exact(&mut buffer).ok();
190                                buffer.iter().all(|&b| b == 0)
191                            } else {
192                                false
193                            }
194                        })
195                        .unwrap_or(false)
196                };
197
198                if is_empty_file {
199                    // Empty pre-created file - initialize free space like a new file
200                    self.free_space.write().initialize(self.device_size)?;
201                } else {
202                    // Existing file with data - free space will be rebuilt during scan
203                    self.free_space.write().set_device_size(self.device_size);
204                }
205            }
206
207            #[cfg(unix)]
208            {
209                use std::os::unix::io::AsRawFd;
210                let file_arc = Arc::new(file);
211                let fd = file_arc.as_raw_fd();
212                self.device_fd = Some(fd);
213                // Store a clone of the file to keep it alive
214                self.device_file = Some(file_arc.as_ref().try_clone().map_err(FeoxError::IoError)?);
215                let disk_io = crate::storage::io::DiskIO::new(file_arc, use_direct_io)?;
216                self.disk_io = Some(Arc::new(parking_lot::RwLock::new(disk_io)));
217            }
218
219            #[cfg(not(unix))]
220            {
221                // Store a clone of the file to keep it alive
222                self.device_file = Some(file.try_clone().map_err(FeoxError::IoError)?);
223                let disk_io = crate::storage::io::DiskIO::new_from_file(file)?;
224                self.disk_io = Some(Arc::new(parking_lot::RwLock::new(disk_io)));
225            }
226
227            let disk_io = self.disk_io.as_ref().unwrap().read();
228
229            // Read metadata from existing files (not newly created ones)
230            if !was_newly_created {
231                if let Ok(metadata_bytes) = disk_io.read_metadata() {
232                    if let Some(loaded_metadata) =
233                        crate::storage::metadata::Metadata::from_bytes(&metadata_bytes)
234                    {
235                        // Initialize stats from metadata
236                        self.stats
237                            .disk_usage
238                            .store(loaded_metadata.total_size, Ordering::Relaxed);
239                        *self._metadata.write() = loaded_metadata;
240                    }
241                }
242            }
243        }
244        Ok(())
245    }
246}
247
248impl Drop for FeoxStore {
249    fn drop(&mut self) {
250        // Stop TTL sweeper if running
251        if let Some(mut sweeper) = self.ttl_sweeper.write().take() {
252            sweeper.stop();
253        }
254
255        // Signal shutdown to write buffer workers
256        if let Some(ref wb) = self.write_buffer {
257            wb.initiate_shutdown();
258        }
259
260        // Write metadata directly without using the write buffer
261        if !self.memory_only {
262            if let Some(ref disk_io) = self.disk_io {
263                // Update metadata with current stats
264                let mut metadata = self._metadata.write();
265                metadata.total_records = self.stats.record_count.load(Ordering::Relaxed) as u64;
266                metadata.total_size = self.stats.disk_usage.load(Ordering::Relaxed);
267                metadata.fragmentation = self.free_space.read().get_fragmentation();
268                metadata.update();
269
270                // Write metadata
271                let _ = disk_io.write().write_metadata(metadata.as_bytes());
272                let _ = disk_io.write().flush();
273            }
274        }
275
276        // Take ownership of write_buffer to properly shut it down
277        if let Some(wb_arc) = self.write_buffer.take() {
278            // Try to get mutable access if we're the only owner
279            if let Ok(wb) = Arc::try_unwrap(wb_arc) {
280                // We own it exclusively, can call complete_shutdown
281                let mut wb_mut = wb;
282                wb_mut.complete_shutdown();
283            }
284            // If we can't get exclusive access, workers are already shutting down via initiate_shutdown
285        }
286
287        // Now it's safe to shutdown disk I/O since workers have exited
288        if let Some(ref disk_io) = self.disk_io {
289            disk_io.write().shutdown();
290        }
291    }
292}