feoxdb/core/store/
recovery.rs

1use std::sync::atomic::Ordering;
2use std::sync::Arc;
3
4use crate::constants::*;
5use crate::core::record::Record;
6use crate::error::{FeoxError, Result};
7use crate::storage::format::get_format;
8use crate::storage::metadata::Metadata;
9
10use super::FeoxStore;
11
12impl FeoxStore {
13    pub(super) fn load_indexes(&mut self) -> Result<()> {
14        if self.memory_only {
15            return Ok(());
16        }
17
18        // Try to read metadata from sector 0
19        if let Some(ref disk_io) = self.disk_io {
20            let metadata_data = disk_io.read().read_metadata()?;
21
22            // Check if metadata is valid (has our signature)
23            if metadata_data.len() >= FEOX_SIGNATURE_SIZE {
24                let signature = &metadata_data[..FEOX_SIGNATURE_SIZE];
25
26                if signature == FEOX_SIGNATURE {
27                    // Parse and store metadata
28                    if let Some(metadata) = Metadata::from_bytes(&metadata_data) {
29                        *self._metadata.write() = metadata;
30                    }
31
32                    // Valid metadata found, scan the disk to rebuild indexes
33                    self.scan_and_rebuild_indexes()?;
34                }
35            }
36        }
37
38        Ok(())
39    }
40
41    pub(super) fn scan_and_rebuild_indexes(&mut self) -> Result<()> {
42        if self.memory_only || self.device_size == 0 {
43            return Ok(());
44        }
45
46        let disk_io = self.disk_io.as_ref().ok_or(FeoxError::NoDevice)?;
47
48        // Get the appropriate format handler
49        let metadata_version = self._metadata.read().version;
50        let format = get_format(metadata_version);
51
52        let total_sectors = self.device_size / FEOX_BLOCK_SIZE as u64;
53        let mut sector: u64 = 1;
54        let mut _records_loaded = 0;
55        let mut occupied_sectors = Vec::new();
56
57        while sector < total_sectors {
58            let data = match disk_io.read().read_sectors_sync(sector, 1) {
59                Ok(d) => d,
60                Err(_) => {
61                    sector += 1;
62                    continue;
63                }
64            };
65
66            if data.len() < SECTOR_HEADER_SIZE {
67                sector += 1;
68                continue;
69            }
70
71            // Check for deletion marker first
72            if data.len() >= 8 && &data[..8] == b"\0DELETED" {
73                // This sector has been deleted, skip it
74                sector += 1;
75                continue;
76            }
77
78            let marker = u16::from_le_bytes([data[0], data[1]]);
79            let seq_num = u16::from_le_bytes([data[2], data[3]]);
80
81            if marker != SECTOR_MARKER || seq_num != 0 {
82                sector += 1;
83                continue;
84            }
85
86            if data.len() < SECTOR_HEADER_SIZE + 2 {
87                sector += 1;
88                continue;
89            }
90
91            // Parse the record using format trait
92            let (key, value_len, timestamp, ttl_expiry) = match format.parse_record(&data) {
93                Some(parsed) => parsed,
94                None => {
95                    sector += 1;
96                    continue;
97                }
98            };
99
100            if key.is_empty() || key.len() > MAX_KEY_SIZE {
101                sector += 1;
102                continue;
103            }
104
105            // Calculate total size using format trait
106            let total_size = format.total_size(key.len(), value_len);
107            let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE);
108
109            let mut record = Record::new(key.clone(), Vec::new(), timestamp);
110            record.sector.store(sector, Ordering::Release);
111            record.value_len = value_len;
112            record.ttl_expiry.store(ttl_expiry, Ordering::Release);
113            record.clear_value();
114
115            // Skip expired records during load if TTL is enabled
116            if self.enable_ttl && ttl_expiry > 0 && self.get_timestamp() > ttl_expiry {
117                sector += sectors_needed as u64;
118                continue;
119            }
120
121            let record_arc = Arc::new(record);
122            let key_len = key.len();
123            self.hash_table.upsert(key.clone(), Arc::clone(&record_arc));
124            self.tree.insert(key, Arc::clone(&record_arc));
125
126            self.stats.record_count.fetch_add(1, Ordering::AcqRel);
127            let record_size = self.calculate_record_size(key_len, value_len);
128            self.stats
129                .memory_usage
130                .fetch_add(record_size, Ordering::AcqRel);
131
132            // Track disk usage
133            self.stats
134                .disk_usage
135                .fetch_add((sectors_needed * FEOX_BLOCK_SIZE) as u64, Ordering::AcqRel);
136
137            for i in 0..sectors_needed {
138                occupied_sectors.push(sector + i as u64);
139            }
140
141            _records_loaded += 1;
142            sector += sectors_needed as u64;
143        }
144
145        // Now rebuild free space from gaps between occupied sectors
146        occupied_sectors.sort_unstable();
147
148        // Start after metadata sectors (sectors 0-15 are reserved)
149        let mut last_end = FEOX_DATA_START_BLOCK;
150
151        for &occupied_start in &occupied_sectors {
152            if occupied_start > last_end {
153                self.free_space
154                    .write()
155                    .release_sectors(last_end, occupied_start - last_end)?;
156            }
157            last_end = occupied_start + 1;
158        }
159
160        if last_end < total_sectors {
161            self.free_space
162                .write()
163                .release_sectors(last_end, total_sectors - last_end)?;
164        }
165
166        Ok(())
167    }
168}