/home/runner/work/feoxdb/feoxdb/src/core/store/recovery.rs
Line | Count | Source |
1 | | use std::sync::atomic::Ordering; |
2 | | use std::sync::Arc; |
3 | | |
4 | | use crate::constants::*; |
5 | | use crate::core::record::Record; |
6 | | use crate::error::{FeoxError, Result}; |
7 | | use crate::storage::format::get_format; |
8 | | use crate::storage::metadata::Metadata; |
9 | | |
10 | | use super::FeoxStore; |
11 | | |
12 | | impl FeoxStore { |
13 | 18 | pub(super) fn load_indexes(&mut self) -> Result<()> { |
14 | 18 | if self.memory_only { |
15 | 0 | return Ok(()); |
16 | 18 | } |
17 | | |
18 | | // Try to read metadata from sector 0 |
19 | 18 | if let Some(ref disk_io) = self.disk_io { |
20 | 18 | let metadata_data = disk_io.read().read_metadata()?0 ; |
21 | | |
22 | | // Check if metadata is valid (has our signature) |
23 | 18 | if metadata_data.len() >= FEOX_SIGNATURE_SIZE { |
24 | 18 | let signature = &metadata_data[..FEOX_SIGNATURE_SIZE]; |
25 | | |
26 | 18 | if signature == FEOX_SIGNATURE { |
27 | | // Parse and store metadata |
28 | 8 | if let Some(metadata) = Metadata::from_bytes(&metadata_data) { |
29 | 8 | *self._metadata.write() = metadata; |
30 | 8 | }0 |
31 | | |
32 | | // Valid metadata found, scan the disk to rebuild indexes |
33 | 8 | self.scan_and_rebuild_indexes()?0 ; |
34 | 10 | } |
35 | 0 | } |
36 | 0 | } |
37 | | |
38 | 18 | Ok(()) |
39 | 18 | } |
40 | | |
41 | 8 | pub(super) fn scan_and_rebuild_indexes(&mut self) -> Result<()> { |
42 | 8 | if self.memory_only || self.device_size == 0 { |
43 | 0 | return Ok(()); |
44 | 8 | } |
45 | | |
46 | 8 | let disk_io = self.disk_io.as_ref().ok_or(FeoxError::NoDevice)?0 ; |
47 | | |
48 | | // Get the appropriate format handler |
49 | 8 | let metadata_version = self._metadata.read().version; |
50 | 8 | let format = get_format(metadata_version); |
51 | | |
52 | 8 | let total_sectors = self.device_size / FEOX_BLOCK_SIZE as u64; |
53 | 8 | let mut sector: u64 = 1; |
54 | 8 | let mut _records_loaded = 0; |
55 | 8 | let mut occupied_sectors = Vec::new(); |
56 | | |
57 | 2.09M | while sector < total_sectors { |
58 | 2.09M | let data = match disk_io.read().read_sectors_sync(sector, 1) { |
59 | 2.09M | Ok(d) => d, |
60 | | Err(_) => { |
61 | 0 | sector += 1; |
62 | 0 | continue; |
63 | | } |
64 | | }; |
65 | | |
66 | 2.09M | if data.len() < SECTOR_HEADER_SIZE { |
67 | 0 | sector += 1; |
68 | 0 | continue; |
69 | 2.09M | } |
70 | | |
71 | | // Check for deletion marker first |
72 | 2.09M | if data.len() >= 8 && &data[..8] == b"\0DELETED" { |
73 | | // This sector has been deleted, skip it |
74 | 0 | sector += 1; |
75 | 0 | continue; |
76 | 2.09M | } |
77 | | |
78 | 2.09M | let marker = u16::from_le_bytes([data[0], data[1]]); |
79 | 2.09M | let seq_num = u16::from_le_bytes([data[2], data[3]]); |
80 | | |
81 | 2.09M | if marker != SECTOR_MARKER || seq_num != 0303 { |
82 | 2.09M | sector += 1; |
83 | 2.09M | continue; |
84 | 303 | } |
85 | | |
86 | 303 | if data.len() < SECTOR_HEADER_SIZE + 2 { |
87 | 0 | sector += 1; |
88 | 0 | continue; |
89 | 303 | } |
90 | | |
91 | | // Parse the record using format trait |
92 | 303 | let (key, value_len, timestamp, ttl_expiry) = match format.parse_record(&data) { |
93 | 303 | Some(parsed) => parsed, |
94 | | None => { |
95 | 0 | sector += 1; |
96 | 0 | continue; |
97 | | } |
98 | | }; |
99 | | |
100 | 303 | if key.is_empty() || key.len() > MAX_KEY_SIZE { |
101 | 0 | sector += 1; |
102 | 0 | continue; |
103 | 303 | } |
104 | | |
105 | | // Calculate total size using format trait |
106 | 303 | let total_size = format.total_size(key.len(), value_len); |
107 | 303 | let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE); |
108 | | |
109 | 303 | let mut record = Record::new(key.clone(), Vec::new(), timestamp); |
110 | 303 | record.sector.store(sector, Ordering::Release); |
111 | 303 | record.value_len = value_len; |
112 | 303 | record.ttl_expiry.store(ttl_expiry, Ordering::Release); |
113 | 303 | record.clear_value(); |
114 | | |
115 | | // Skip expired records during load if TTL is enabled |
116 | 303 | if self.enable_ttl && ttl_expiry > 00 && self0 .get_timestamp() > ttl_expiry { |
117 | 0 | sector += sectors_needed as u64; |
118 | 0 | continue; |
119 | 303 | } |
120 | | |
121 | 303 | let record_arc = Arc::new(record); |
122 | 303 | let key_len = key.len(); |
123 | 303 | self.hash_table.upsert(key.clone(), Arc::clone(&record_arc)); |
124 | 303 | self.tree.insert(key, Arc::clone(&record_arc)); |
125 | | |
126 | 303 | self.stats.record_count.fetch_add(1, Ordering::AcqRel); |
127 | 303 | let record_size = self.calculate_record_size(key_len, value_len); |
128 | 303 | self.stats |
129 | 303 | .memory_usage |
130 | 303 | .fetch_add(record_size, Ordering::AcqRel); |
131 | | |
132 | | // Track disk usage |
133 | 303 | self.stats |
134 | 303 | .disk_usage |
135 | 303 | .fetch_add((sectors_needed * FEOX_BLOCK_SIZE) as u64, Ordering::AcqRel); |
136 | | |
137 | 303 | for i in 0..sectors_needed { |
138 | 303 | occupied_sectors.push(sector + i as u64); |
139 | 303 | } |
140 | | |
141 | 303 | _records_loaded += 1; |
142 | 303 | sector += sectors_needed as u64; |
143 | | } |
144 | | |
145 | | // Now rebuild free space from gaps between occupied sectors |
146 | 8 | occupied_sectors.sort_unstable(); |
147 | | |
148 | | // Start after metadata sectors (sectors 0-15 are reserved) |
149 | 8 | let mut last_end = FEOX_DATA_START_BLOCK; |
150 | | |
151 | 311 | for &occupied_start303 in &occupied_sectors { |
152 | 303 | if occupied_start > last_end { |
153 | 0 | self.free_space |
154 | 0 | .write() |
155 | 0 | .release_sectors(last_end, occupied_start - last_end)?; |
156 | 303 | } |
157 | 303 | last_end = occupied_start + 1; |
158 | | } |
159 | | |
160 | 8 | if last_end < total_sectors { |
161 | 8 | self.free_space |
162 | 8 | .write() |
163 | 8 | .release_sectors(last_end, total_sectors - last_end)?0 ; |
164 | 0 | } |
165 | | |
166 | 8 | Ok(()) |
167 | 8 | } |
168 | | } |