Coverage Report

Created: 2025-09-19 09:35

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/home/gh-runner/action-runner3/_work/feoxdb/feoxdb/src/core/store/persistence.rs
Line
Count
Source
1
use std::io::{self, Read, Seek};
2
use std::sync::atomic::Ordering;
3
use std::sync::Arc;
4
5
use crate::constants::*;
6
use crate::core::record::Record;
7
use crate::error::{FeoxError, Result};
8
use crate::storage::format::get_format;
9
10
use super::FeoxStore;
11
12
impl FeoxStore {
13
    /// Force flush all pending writes to disk.
14
    ///
15
    /// In persistent mode, ensures all buffered writes are flushed to disk.
16
    /// In memory-only mode, this is a no-op.
17
    ///
18
    /// # Example
19
    ///
20
    /// ```no_run
21
    /// # use feoxdb::FeoxStore;
22
    /// # fn main() -> feoxdb::Result<()> {
23
    /// let store = FeoxStore::new(Some("/path/to/data.feox".to_string()))?;
24
    /// store.insert(b"important", b"data")?;
25
    /// store.flush_all();  // Ensure data is persisted
26
    /// # Ok(())
27
    /// # }
28
    /// ```
29
9
    pub fn flush_all(&self) {
30
9
        if !self.memory_only {
31
            // First flush the write buffer to ensure all data is written
32
9
            if let Some(ref wb) = self.write_buffer {
33
9
                let _ = wb.force_flush();
34
9
            
}0
35
36
9
            if let Some(ref disk_io) = self.disk_io {
37
9
                // Update metadata with current stats
38
9
                let mut metadata = self._metadata.write();
39
9
                metadata.total_records = self.stats.record_count.load(Ordering::Relaxed) as u64;
40
9
                metadata.total_size = self.stats.disk_usage.load(Ordering::Relaxed);
41
9
                metadata.fragmentation = self.free_space.read().get_fragmentation();
42
9
                metadata.update();
43
9
44
9
                // Write metadata
45
9
                let _ = disk_io.write().write_metadata(metadata.as_bytes());
46
9
                let _ = disk_io.write().flush();
47
9
            
}0
48
0
        }
49
9
    }
50
51
16
    pub(super) fn load_value_from_disk(&self, record: &Record) -> Result<Vec<u8>> {
52
16
        let sector = record.sector.load(Ordering::Acquire);
53
16
        if self.memory_only || sector == 0 {
54
0
            return Err(FeoxError::InvalidRecord);
55
16
        }
56
57
        // Get the appropriate format handler
58
16
        let metadata_version = self._metadata.read().version;
59
16
        let format = get_format(metadata_version);
60
61
        // Calculate how many sectors we need to read
62
16
        let total_size = format.total_size(record.key.len(), record.value_len);
63
16
        let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE);
64
65
        // Read the sectors
66
16
        let disk_io = self
67
16
            .disk_io
68
16
            .as_ref()
69
16
            .ok_or_else(|| 
{0
70
0
                FeoxError::IoError(io::Error::new(
71
0
                    io::ErrorKind::NotFound,
72
0
                    "No disk IO available",
73
0
                ))
74
0
            })?
75
16
            .read();
76
77
16
        let data = disk_io.read_sectors_sync(sector, sectors_needed as u64)
?0
;
78
79
        // Use format to get the value offset
80
16
        let offset = format.value_offset(record.key.len());
81
16
        if offset + record.value_len > data.len() {
82
0
            return Err(FeoxError::InvalidRecord);
83
16
        }
84
85
16
        Ok(data[offset..offset + record.value_len].to_vec())
86
16
    }
87
88
18
    pub(super) fn open_device(
89
18
        &mut self,
90
18
        device_path: &Option<String>,
91
18
        file_size: Option<u64>,
92
18
    ) -> Result<()> {
93
18
        if let Some(path) = device_path {
94
            // Open the device/file
95
            use std::fs::OpenOptions;
96
            #[cfg(target_os = "linux")]
97
            use std::os::unix::fs::OpenOptionsExt;
98
99
            #[cfg(unix)]
100
18
            let (file, use_direct_io) = if std::path::Path::new("/.dockerenv").exists() {
101
0
                let file = OpenOptions::new()
102
0
                    .read(true)
103
0
                    .write(true)
104
0
                    .create(true)
105
0
                    .truncate(false)
106
0
                    .open(path)
107
0
                    .map_err(FeoxError::IoError)?;
108
0
                (file, false) // Don't use O_DIRECT in Docker
109
            } else {
110
                // Try with O_DIRECT on Linux, fall back without it on other Unix systems
111
                #[cfg(target_os = "linux")]
112
                {
113
                    // Try to open with O_DIRECT first
114
18
                    match OpenOptions::new()
115
18
                        .read(true)
116
18
                        .write(true)
117
18
                        .create(true)
118
18
                        .truncate(false)
119
18
                        .custom_flags(libc::O_DIRECT)
120
18
                        .open(path)
121
                    {
122
18
                        Ok(file) => (file, true), // Successfully opened with O_DIRECT
123
                        Err(_) => {
124
                            // Fallback to regular open
125
0
                            let file = OpenOptions::new()
126
0
                                .read(true)
127
0
                                .write(true)
128
0
                                .create(true)
129
0
                                .truncate(false)
130
0
                                .open(path)
131
0
                                .map_err(FeoxError::IoError)?;
132
0
                            (file, false)
133
                        }
134
                    }
135
                }
136
                #[cfg(not(target_os = "linux"))]
137
                {
138
                    let file = OpenOptions::new()
139
                        .read(true)
140
                        .write(true)
141
                        .create(true)
142
                        .truncate(false)
143
                        .open(path)
144
                        .map_err(FeoxError::IoError)?;
145
                    (file, false) // O_DIRECT not supported on this platform
146
                }
147
            };
148
149
            #[cfg(not(unix))]
150
            let file = OpenOptions::new()
151
                .read(true)
152
                .write(true)
153
                .create(true)
154
                .truncate(false)
155
                .open(path)
156
                .map_err(FeoxError::IoError)?;
157
158
            // Get file size
159
18
            let metadata = file.metadata().map_err(FeoxError::IoError)
?0
;
160
18
            self.device_size = metadata.len();
161
162
            // Track whether this is a newly created file
163
18
            let was_newly_created = self.device_size == 0;
164
165
18
            if was_newly_created {
166
                // New empty file - set configured size or default and initialize free space
167
10
                let target_size = file_size.unwrap_or(DEFAULT_DEVICE_SIZE);
168
10
                file.set_len(target_size).map_err(FeoxError::IoError)
?0
;
169
10
                self.device_size = target_size;
170
171
                // Initialize free space manager with all space free
172
10
                self.free_space.write().initialize(self.device_size)
?0
;
173
174
10
                let mut metadata = self._metadata.write();
175
10
                metadata.device_size = self.device_size;
176
10
                metadata.update();
177
            } else {
178
                // Existing file - check if it's empty
179
                // If empty, initialize free space; otherwise it will be rebuilt during scan
180
8
                let is_empty_file = {
181
8
                    let mut temp_file = file.try_clone().map_err(FeoxError::IoError)
?0
;
182
8
                    temp_file
183
8
                        .metadata()
184
8
                        .map(|m| {
185
                            // Check if file is all zeros
186
8
                            if m.len() > 0 {
187
8
                                let mut buffer = vec![0u8; std::cmp::min(4096, m.len() as usize)];
188
8
                                temp_file.seek(std::io::SeekFrom::Start(0)).ok();
189
8
                                temp_file.read_exact(&mut buffer).ok();
190
8
                                buffer.iter().all(|&b| b == 0)
191
                            } else {
192
0
                                false
193
                            }
194
8
                        })
195
8
                        .unwrap_or(false)
196
                };
197
198
8
                if is_empty_file {
199
                    // Empty pre-created file - initialize free space like a new file
200
0
                    self.free_space.write().initialize(self.device_size)?;
201
8
                } else {
202
8
                    // Existing file with data - free space will be rebuilt during scan
203
8
                    self.free_space.write().set_device_size(self.device_size);
204
8
                }
205
            }
206
207
            #[cfg(unix)]
208
            {
209
                use std::os::unix::io::AsRawFd;
210
18
                let file_arc = Arc::new(file);
211
18
                let fd = file_arc.as_raw_fd();
212
18
                self.device_fd = Some(fd);
213
                // Store a clone of the file to keep it alive
214
18
                self.device_file = Some(file_arc.as_ref().try_clone().map_err(FeoxError::IoError)
?0
);
215
18
                let disk_io = crate::storage::io::DiskIO::new(file_arc, use_direct_io)
?0
;
216
18
                self.disk_io = Some(Arc::new(parking_lot::RwLock::new(disk_io)));
217
            }
218
219
            #[cfg(not(unix))]
220
            {
221
                // Store a clone of the file to keep it alive
222
                self.device_file = Some(file.try_clone().map_err(FeoxError::IoError)?);
223
                let disk_io = crate::storage::io::DiskIO::new_from_file(file)?;
224
                self.disk_io = Some(Arc::new(parking_lot::RwLock::new(disk_io)));
225
            }
226
227
18
            let disk_io = self.disk_io.as_ref().unwrap().read();
228
229
            // Read metadata from existing files (not newly created ones)
230
18
            if !was_newly_created {
231
8
                if let Ok(metadata_bytes) = disk_io.read_metadata() {
232
8
                    if let Some(loaded_metadata) =
233
8
                        crate::storage::metadata::Metadata::from_bytes(&metadata_bytes)
234
8
                    {
235
8
                        // Initialize stats from metadata
236
8
                        self.stats
237
8
                            .disk_usage
238
8
                            .store(loaded_metadata.total_size, Ordering::Relaxed);
239
8
                        *self._metadata.write() = loaded_metadata;
240
8
                    
}0
241
0
                }
242
10
            }
243
0
        }
244
18
        Ok(())
245
18
    }
246
}
247
248
impl Drop for FeoxStore {
249
78
    fn drop(&mut self) {
250
        // Stop TTL sweeper if running
251
78
        if let Some(
mut sweeper0
) = self.ttl_sweeper.write().take() {
252
0
            sweeper.stop();
253
78
        }
254
255
        // Signal shutdown to write buffer workers
256
78
        if let Some(
ref wb18
) = self.write_buffer {
257
18
            wb.initiate_shutdown();
258
60
        }
259
260
        // Write metadata directly without using the write buffer
261
78
        if !self.memory_only {
262
18
            if let Some(ref disk_io) = self.disk_io {
263
18
                // Update metadata with current stats
264
18
                let mut metadata = self._metadata.write();
265
18
                metadata.total_records = self.stats.record_count.load(Ordering::Relaxed) as u64;
266
18
                metadata.total_size = self.stats.disk_usage.load(Ordering::Relaxed);
267
18
                metadata.fragmentation = self.free_space.read().get_fragmentation();
268
18
                metadata.update();
269
18
270
18
                // Write metadata
271
18
                let _ = disk_io.write().write_metadata(metadata.as_bytes());
272
18
                let _ = disk_io.write().flush();
273
18
            
}0
274
60
        }
275
276
        // Take ownership of write_buffer to properly shut it down
277
78
        if let Some(
wb_arc18
) = self.write_buffer.take() {
278
            // Try to get mutable access if we're the only owner
279
18
            if let Ok(wb) = Arc::try_unwrap(wb_arc) {
280
18
                // We own it exclusively, can call complete_shutdown
281
18
                let mut wb_mut = wb;
282
18
                wb_mut.complete_shutdown();
283
18
            
}0
284
            // If we can't get exclusive access, workers are already shutting down via initiate_shutdown
285
60
        }
286
287
        // Now it's safe to shutdown disk I/O since workers have exited
288
78
        if let Some(
ref disk_io18
) = self.disk_io {
289
18
            disk_io.write().shutdown();
290
60
        }
291
78
    }
292
}