feoxdb/core/store/
recovery.rs1use std::sync::atomic::Ordering;
2use std::sync::Arc;
3
4use crate::constants::*;
5use crate::core::record::Record;
6use crate::error::{FeoxError, Result};
7use crate::storage::format::get_format;
8use crate::storage::metadata::Metadata;
9
10use super::FeoxStore;
11
12impl FeoxStore {
13 pub(super) fn load_indexes(&mut self) -> Result<()> {
14 if self.memory_only {
15 return Ok(());
16 }
17
18 if let Some(ref disk_io) = self.disk_io {
20 let metadata_data = disk_io.read().read_metadata()?;
21
22 if metadata_data.len() >= FEOX_SIGNATURE_SIZE {
24 let signature = &metadata_data[..FEOX_SIGNATURE_SIZE];
25
26 if signature == FEOX_SIGNATURE {
27 if let Some(metadata) = Metadata::from_bytes(&metadata_data) {
29 *self._metadata.write() = metadata;
30 }
31
32 self.scan_and_rebuild_indexes()?;
34 }
35 }
36 }
37
38 Ok(())
39 }
40
41 pub(super) fn scan_and_rebuild_indexes(&mut self) -> Result<()> {
42 if self.memory_only || self.device_size == 0 {
43 return Ok(());
44 }
45
46 let disk_io = self.disk_io.as_ref().ok_or(FeoxError::NoDevice)?;
47
48 let metadata_version = self._metadata.read().version;
50 let format = get_format(metadata_version);
51
52 let total_sectors = self.device_size / FEOX_BLOCK_SIZE as u64;
53 let mut sector: u64 = 1;
54 let mut _records_loaded = 0;
55 let mut occupied_sectors = Vec::new();
56
57 while sector < total_sectors {
58 let data = match disk_io.read().read_sectors_sync(sector, 1) {
59 Ok(d) => d,
60 Err(_) => {
61 sector += 1;
62 continue;
63 }
64 };
65
66 if data.len() < SECTOR_HEADER_SIZE {
67 sector += 1;
68 continue;
69 }
70
71 if data.len() >= 8 && &data[..8] == b"\0DELETED" {
73 sector += 1;
75 continue;
76 }
77
78 let marker = u16::from_le_bytes([data[0], data[1]]);
79 let seq_num = u16::from_le_bytes([data[2], data[3]]);
80
81 if marker != SECTOR_MARKER || seq_num != 0 {
82 sector += 1;
83 continue;
84 }
85
86 if data.len() < SECTOR_HEADER_SIZE + 2 {
87 sector += 1;
88 continue;
89 }
90
91 let (key, value_len, timestamp, ttl_expiry) = match format.parse_record(&data) {
93 Some(parsed) => parsed,
94 None => {
95 sector += 1;
96 continue;
97 }
98 };
99
100 if key.is_empty() || key.len() > MAX_KEY_SIZE {
101 sector += 1;
102 continue;
103 }
104
105 let total_size = format.total_size(key.len(), value_len);
107 let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE);
108
109 let mut record = Record::new(key.clone(), Vec::new(), timestamp);
110 record.sector.store(sector, Ordering::Release);
111 record.value_len = value_len;
112 record.ttl_expiry.store(ttl_expiry, Ordering::Release);
113 record.clear_value();
114
115 if self.enable_ttl && ttl_expiry > 0 && self.get_timestamp() > ttl_expiry {
117 sector += sectors_needed as u64;
118 continue;
119 }
120
121 let record_arc = Arc::new(record);
122 let key_len = key.len();
123 self.hash_table.upsert(key.clone(), Arc::clone(&record_arc));
124 self.tree.insert(key, Arc::clone(&record_arc));
125
126 self.stats.record_count.fetch_add(1, Ordering::AcqRel);
127 let record_size = self.calculate_record_size(key_len, value_len);
128 self.stats
129 .memory_usage
130 .fetch_add(record_size, Ordering::AcqRel);
131
132 self.stats
134 .disk_usage
135 .fetch_add((sectors_needed * FEOX_BLOCK_SIZE) as u64, Ordering::AcqRel);
136
137 for i in 0..sectors_needed {
138 occupied_sectors.push(sector + i as u64);
139 }
140
141 _records_loaded += 1;
142 sector += sectors_needed as u64;
143 }
144
145 occupied_sectors.sort_unstable();
147
148 let mut last_end = FEOX_DATA_START_BLOCK;
150
151 for &occupied_start in &occupied_sectors {
152 if occupied_start > last_end {
153 self.free_space
154 .write()
155 .release_sectors(last_end, occupied_start - last_end)?;
156 }
157 last_end = occupied_start + 1;
158 }
159
160 if last_end < total_sectors {
161 self.free_space
162 .write()
163 .release_sectors(last_end, total_sectors - last_end)?;
164 }
165
166 Ok(())
167 }
168}