/home/runner/work/feoxdb/feoxdb/src/core/record.rs
Line | Count | Source |
1 | | use bytes::Bytes; |
2 | | use crossbeam_epoch::{Atomic, Guard, Shared}; |
3 | | use std::mem; |
4 | | use std::sync::atomic::{AtomicU32, AtomicU64, Ordering}; |
5 | | |
6 | | use crate::constants::*; |
7 | | |
8 | | #[repr(C)] |
9 | | #[derive(Debug)] |
10 | | pub struct Record { |
11 | | // Cache line 1 (64 bytes) - GET hot path |
12 | | // These 3 fields are accessed together on EVERY GET operation |
13 | | pub key: Vec<u8>, // 24 bytes |
14 | | pub value: parking_lot::RwLock<Option<Bytes>>, // ~40 bytes (32 + Option overhead) |
15 | | |
16 | | // Cache line 2 (64 bytes) - TTL and metadata |
17 | | pub ttl_expiry: AtomicU64, // 8 bytes - checked on every GET |
18 | | pub timestamp: u64, // 8 bytes - checked on updates |
19 | | pub value_len: usize, // 8 bytes - used for size calcs |
20 | | pub sector: AtomicU64, // 8 bytes - for persistence |
21 | | pub refcount: AtomicU32, // 4 bytes - memory management |
22 | | pub key_len: u16, // 2 bytes - used with value_len |
23 | | // Total so far: 40 bytes |
24 | | |
25 | | // Still in cache line 2 or start of line 3 - cold fields |
26 | | pub hash_link: AtomicLink, // 8 bytes - only for hash ops |
27 | | pub cache_ref_bit: AtomicU32, // 4 bytes - rarely used |
28 | | pub cache_access_time: AtomicU64, // 8 bytes - rarely used |
29 | | } |
30 | | |
31 | | // Custom atomic link for lock-free hash table |
32 | | pub struct AtomicLink { |
33 | | pub next: Atomic<Record>, |
34 | | } |
35 | | |
36 | | impl std::fmt::Debug for AtomicLink { |
37 | 0 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
38 | 0 | f.debug_struct("AtomicLink") |
39 | 0 | .field("next", &"<atomic>") |
40 | 0 | .finish() |
41 | 0 | } |
42 | | } |
43 | | |
44 | | impl Default for AtomicLink { |
45 | 0 | fn default() -> Self { |
46 | 0 | Self::new() |
47 | 0 | } |
48 | | } |
49 | | |
50 | | impl AtomicLink { |
51 | 25.6k | pub fn new() -> Self { |
52 | 25.6k | Self { |
53 | 25.6k | next: Atomic::null(), |
54 | 25.6k | } |
55 | 25.6k | } |
56 | | |
57 | 3 | pub fn load<'g>(&self, guard: &'g Guard) -> Option<Shared<'g, Record>> { |
58 | 3 | let ptr = self.next.load(Ordering::Acquire, guard); |
59 | 3 | if ptr.is_null() { |
60 | 1 | None |
61 | | } else { |
62 | 2 | Some(ptr) |
63 | | } |
64 | 3 | } |
65 | | |
66 | 1 | pub fn store(&self, record: Option<Shared<Record>>, _guard: &Guard) { |
67 | 1 | let ptr = record.unwrap_or(Shared::null()); |
68 | 1 | self.next.store(ptr, Ordering::Release); |
69 | 1 | } |
70 | | |
71 | 1 | pub fn compare_exchange<'g>( |
72 | 1 | &self, |
73 | 1 | current: Shared<'g, Record>, |
74 | 1 | new: Shared<'g, Record>, |
75 | 1 | guard: &'g Guard, |
76 | 1 | ) -> Result<Shared<'g, Record>, Shared<'g, Record>> { |
77 | 1 | self.next |
78 | 1 | .compare_exchange(current, new, Ordering::AcqRel, Ordering::Acquire, guard) |
79 | 1 | .map_err(|e| e.current) |
80 | 1 | } |
81 | | } |
82 | | |
83 | | unsafe impl Send for Record {} |
84 | | unsafe impl Sync for Record {} |
85 | | |
86 | | impl Record { |
87 | 25.6k | pub fn new(key: Vec<u8>, value: Vec<u8>, timestamp: u64) -> Self { |
88 | 25.6k | let key_len = key.len() as u16; |
89 | 25.6k | let value_len = value.len(); |
90 | 25.6k | let value_bytes = Bytes::from(value); |
91 | | |
92 | 25.6k | Self { |
93 | 25.6k | // Cache line 1 - GET hot path |
94 | 25.6k | key, |
95 | 25.6k | value: parking_lot::RwLock::new(Some(value_bytes)), |
96 | 25.6k | |
97 | 25.6k | // Cache line 2 - TTL and metadata |
98 | 25.6k | ttl_expiry: AtomicU64::new(0), |
99 | 25.6k | timestamp, |
100 | 25.6k | value_len, |
101 | 25.6k | sector: AtomicU64::new(0), |
102 | 25.6k | refcount: AtomicU32::new(1), |
103 | 25.6k | key_len, |
104 | 25.6k | |
105 | 25.6k | // Cold fields |
106 | 25.6k | hash_link: AtomicLink::new(), |
107 | 25.6k | cache_ref_bit: AtomicU32::new(0), |
108 | 25.6k | cache_access_time: AtomicU64::new(0), |
109 | 25.6k | } |
110 | 25.6k | } |
111 | | |
112 | 1 | pub fn new_with_timestamp(key: Vec<u8>, value: Vec<u8>, timestamp: u64) -> Self { |
113 | 1 | Self::new(key, value, timestamp) |
114 | 1 | } |
115 | | |
116 | 7 | pub fn new_with_timestamp_ttl( |
117 | 7 | key: Vec<u8>, |
118 | 7 | value: Vec<u8>, |
119 | 7 | timestamp: u64, |
120 | 7 | ttl_expiry: u64, |
121 | 7 | ) -> Self { |
122 | 7 | let record = Self::new(key, value, timestamp); |
123 | 7 | record.ttl_expiry.store(ttl_expiry, Ordering::Release); |
124 | 7 | record |
125 | 7 | } |
126 | | |
127 | | /// Create a new record from a Bytes value (zero-copy) |
128 | 12 | pub fn new_from_bytes(key: Vec<u8>, value: Bytes, timestamp: u64) -> Self { |
129 | 12 | let key_len = key.len() as u16; |
130 | 12 | let value_len = value.len(); |
131 | | |
132 | 12 | Self { |
133 | 12 | // Cache line 1 - GET hot path |
134 | 12 | key, |
135 | 12 | value: parking_lot::RwLock::new(Some(value)), |
136 | 12 | |
137 | 12 | // Cache line 2 - TTL and metadata |
138 | 12 | ttl_expiry: AtomicU64::new(0), |
139 | 12 | timestamp, |
140 | 12 | value_len, |
141 | 12 | sector: AtomicU64::new(0), |
142 | 12 | refcount: AtomicU32::new(1), |
143 | 12 | key_len, |
144 | 12 | |
145 | 12 | // Cache line 3 - cold fields |
146 | 12 | hash_link: AtomicLink::new(), |
147 | 12 | cache_ref_bit: AtomicU32::new(0), |
148 | 12 | cache_access_time: AtomicU64::new(0), |
149 | 12 | } |
150 | 12 | } |
151 | | |
152 | | /// Create a new record from Bytes with TTL |
153 | 4 | pub fn new_from_bytes_with_ttl( |
154 | 4 | key: Vec<u8>, |
155 | 4 | value: Bytes, |
156 | 4 | timestamp: u64, |
157 | 4 | ttl_expiry: u64, |
158 | 4 | ) -> Self { |
159 | 4 | let record = Self::new_from_bytes(key, value, timestamp); |
160 | 4 | record.ttl_expiry.store(ttl_expiry, Ordering::Release); |
161 | 4 | record |
162 | 4 | } |
163 | | |
164 | 24.2k | pub fn calculate_size(&self) -> usize { |
165 | 24.2k | mem::size_of::<Self>() + self.key.capacity() + self.value_len |
166 | 24.2k | } |
167 | | |
168 | 1 | pub fn calculate_disk_size(&self) -> usize { |
169 | 1 | let record_size = SECTOR_HEADER_SIZE + |
170 | 1 | mem::size_of::<u16>() + // key_len |
171 | 1 | self.key_len as usize + |
172 | 1 | mem::size_of::<usize>() + // value_len |
173 | 1 | mem::size_of::<u64>() + // timestamp |
174 | 1 | self.value_len; |
175 | | |
176 | | // Round up to block size |
177 | 1 | record_size.div_ceil(FEOX_BLOCK_SIZE) * FEOX_BLOCK_SIZE |
178 | 1 | } |
179 | | |
180 | | /// Get value - returns None if value has been offloaded to disk |
181 | | #[inline] |
182 | 6.35k | pub fn get_value(&self) -> Option<Bytes> { |
183 | 6.35k | self.value.read().clone() |
184 | 6.35k | } |
185 | | |
186 | | /// Clear value from memory |
187 | | #[inline] |
188 | 21.7k | pub fn clear_value(&self) { |
189 | 21.7k | *self.value.write() = None; |
190 | 21.7k | std::sync::atomic::fence(Ordering::Release); |
191 | 21.7k | } |
192 | | |
193 | 1.00k | pub fn inc_ref(&self) { |
194 | 1.00k | self.refcount.fetch_add(1, Ordering::AcqRel); |
195 | 1.00k | } |
196 | | |
197 | 2 | pub fn dec_ref(&self) -> u32 { |
198 | 2 | let old = self.refcount.fetch_sub(1, Ordering::AcqRel); |
199 | 2 | debug_assert!(old > 0, "Record refcount underflow"0 ); |
200 | 2 | old - 1 |
201 | 2 | } |
202 | | |
203 | 5 | pub fn ref_count(&self) -> u32 { |
204 | 5 | self.refcount.load(Ordering::Acquire) |
205 | 5 | } |
206 | | } |