1use crate::constants::*;
2use crate::core::record::Record;
3use std::sync::atomic::Ordering;
4
5pub trait RecordFormat: Send + Sync {
7 fn record_header_size(&self, key_len: usize) -> usize;
9
10 fn total_size(&self, key_len: usize, value_len: usize) -> usize;
12
13 fn serialize_record(&self, record: &Record, include_value: bool) -> Vec<u8>;
15
16 fn parse_record(&self, data: &[u8]) -> Option<(Vec<u8>, usize, u64, u64)>;
18
19 fn value_offset(&self, key_len: usize) -> usize;
21}
22
23pub struct FormatV1;
25
26impl RecordFormat for FormatV1 {
27 fn record_header_size(&self, key_len: usize) -> usize {
28 SECTOR_HEADER_SIZE + 2 + key_len + 8 + 8 }
30
31 fn total_size(&self, key_len: usize, value_len: usize) -> usize {
32 self.record_header_size(key_len) + value_len
33 }
34
35 fn serialize_record(&self, record: &Record, include_value: bool) -> Vec<u8> {
36 let mut data = Vec::with_capacity(self.total_size(record.key.len(), record.value_len));
37
38 data.extend_from_slice(&(record.key.len() as u16).to_le_bytes());
40
41 data.extend_from_slice(&record.key);
43
44 data.extend_from_slice(&(record.value_len as u64).to_le_bytes());
46
47 data.extend_from_slice(&record.timestamp.to_le_bytes());
49
50 if include_value {
52 if let Some(value) = record.value.read().as_ref() {
53 data.extend_from_slice(value);
54 }
55 }
56
57 data
58 }
59
60 fn parse_record(&self, data: &[u8]) -> Option<(Vec<u8>, usize, u64, u64)> {
61 if data.len() < SECTOR_HEADER_SIZE + 2 {
62 return None;
63 }
64
65 let mut offset = SECTOR_HEADER_SIZE + 2;
66 let key_len = u16::from_le_bytes(
67 data[SECTOR_HEADER_SIZE..SECTOR_HEADER_SIZE + 2]
68 .try_into()
69 .ok()?,
70 ) as usize;
71
72 if offset + key_len + 16 > data.len() {
73 return None;
74 }
75
76 let key = data[offset..offset + key_len].to_vec();
77 offset += key_len;
78
79 let value_len = u64::from_le_bytes(data[offset..offset + 8].try_into().ok()?) as usize;
80 offset += 8;
81
82 let timestamp = u64::from_le_bytes(data[offset..offset + 8].try_into().ok()?);
83
84 Some((key, value_len, timestamp, 0)) }
86
87 fn value_offset(&self, key_len: usize) -> usize {
88 SECTOR_HEADER_SIZE + 2 + key_len + 8 + 8
89 }
90}
91
92pub struct FormatV2;
94
95impl RecordFormat for FormatV2 {
96 fn record_header_size(&self, key_len: usize) -> usize {
97 SECTOR_HEADER_SIZE + 2 + key_len + 8 + 8 + 8 }
99
100 fn total_size(&self, key_len: usize, value_len: usize) -> usize {
101 self.record_header_size(key_len) + value_len
102 }
103
104 fn serialize_record(&self, record: &Record, include_value: bool) -> Vec<u8> {
105 let mut data = Vec::with_capacity(self.total_size(record.key.len(), record.value_len));
106
107 data.extend_from_slice(&(record.key.len() as u16).to_le_bytes());
109
110 data.extend_from_slice(&record.key);
112
113 data.extend_from_slice(&(record.value_len as u64).to_le_bytes());
115
116 data.extend_from_slice(&record.timestamp.to_le_bytes());
118
119 data.extend_from_slice(&record.ttl_expiry.load(Ordering::Acquire).to_le_bytes());
121
122 if include_value {
124 if let Some(value) = record.value.read().as_ref() {
125 data.extend_from_slice(value);
126 }
127 }
128
129 data
130 }
131
132 fn parse_record(&self, data: &[u8]) -> Option<(Vec<u8>, usize, u64, u64)> {
133 if data.len() < SECTOR_HEADER_SIZE + 2 {
134 return None;
135 }
136
137 let mut offset = SECTOR_HEADER_SIZE + 2;
138 let key_len = u16::from_le_bytes(
139 data[SECTOR_HEADER_SIZE..SECTOR_HEADER_SIZE + 2]
140 .try_into()
141 .ok()?,
142 ) as usize;
143
144 if offset + key_len + 24 > data.len() {
145 return None;
147 }
148
149 let key = data[offset..offset + key_len].to_vec();
150 offset += key_len;
151
152 let value_len = u64::from_le_bytes(data[offset..offset + 8].try_into().ok()?) as usize;
153 offset += 8;
154
155 let timestamp = u64::from_le_bytes(data[offset..offset + 8].try_into().ok()?);
156 offset += 8;
157
158 let ttl_expiry = u64::from_le_bytes(data[offset..offset + 8].try_into().ok()?);
159
160 Some((key, value_len, timestamp, ttl_expiry))
161 }
162
163 fn value_offset(&self, key_len: usize) -> usize {
164 SECTOR_HEADER_SIZE + 2 + key_len + 8 + 8 + 8
165 }
166}
167
168pub fn get_format(version: u32) -> Box<dyn RecordFormat> {
170 match version {
171 1 => Box::new(FormatV1),
172 2 => Box::new(FormatV2),
173 _ => Box::new(FormatV2), }
175}