// feoxdb/core/record.rs

1use bytes::Bytes;
2use crossbeam_epoch::{Atomic, Guard, Shared};
3use std::mem;
4use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
5
6use crate::constants::*;
7
/// A single key/value record stored in the database.
///
/// Field order is deliberate: `#[repr(C)]` preserves declaration order, and
/// the fields touched on every GET are declared first, followed by
/// TTL/metadata, then rarely-touched cache-management fields.
///
/// NOTE(review): the per-field byte counts below are the author's estimates;
/// actual sizes and padding are target-dependent — verify with
/// `mem::size_of` if the cache-line claims matter.
#[repr(C)]
#[derive(Debug)]
pub struct Record {
    // Cache line 1 (64 bytes) - GET hot path
    // These fields are accessed together on EVERY GET operation
    pub key: Vec<u8>,                              // 24 bytes
    pub value: parking_lot::RwLock<Option<Bytes>>, // ~40 bytes (32 + Option overhead); None = value offloaded (see clear_value)

    // Cache line 2 (64 bytes) - TTL and metadata
    pub ttl_expiry: AtomicU64, // 8 bytes - checked on every GET; 0 = no TTL (see `new`)
    pub timestamp: u64,        // 8 bytes - checked on updates
    pub value_len: usize,      // 8 bytes - used for size calcs (kept even after clear_value)
    pub sector: AtomicU64,     // 8 bytes - for persistence
    pub refcount: AtomicU32,   // 4 bytes - memory management; starts at 1 (see `new`)
    pub key_len: u16,          // 2 bytes - used with value_len
    // Total so far: 40 bytes

    // Still in cache line 2 or start of line 3 - cold fields
    pub hash_link: AtomicLink,        // 8 bytes - only for hash ops
    pub cache_ref_bit: AtomicU32,     // 4 bytes - rarely used
    pub cache_access_time: AtomicU64, // 8 bytes - rarely used
}
30
// Custom atomic link for lock-free hash table
/// Intrusive next-pointer chaining `Record`s in the lock-free hash table.
/// Wraps a crossbeam-epoch `Atomic` so readers can traverse chains under
/// an epoch `Guard` without taking locks.
pub struct AtomicLink {
    pub next: Atomic<Record>,
}
35
36impl std::fmt::Debug for AtomicLink {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        f.debug_struct("AtomicLink")
39            .field("next", &"<atomic>")
40            .finish()
41    }
42}
43
44impl Default for AtomicLink {
45    fn default() -> Self {
46        Self::new()
47    }
48}
49
50impl AtomicLink {
51    pub fn new() -> Self {
52        Self {
53            next: Atomic::null(),
54        }
55    }
56
57    pub fn load<'g>(&self, guard: &'g Guard) -> Option<Shared<'g, Record>> {
58        let ptr = self.next.load(Ordering::Acquire, guard);
59        if ptr.is_null() {
60            None
61        } else {
62            Some(ptr)
63        }
64    }
65
66    pub fn store(&self, record: Option<Shared<Record>>, _guard: &Guard) {
67        let ptr = record.unwrap_or(Shared::null());
68        self.next.store(ptr, Ordering::Release);
69    }
70
71    pub fn compare_exchange<'g>(
72        &self,
73        current: Shared<'g, Record>,
74        new: Shared<'g, Record>,
75        guard: &'g Guard,
76    ) -> Result<Shared<'g, Record>, Shared<'g, Record>> {
77        self.next
78            .compare_exchange(current, new, Ordering::AcqRel, Ordering::Acquire, guard)
79            .map_err(|e| e.current)
80    }
81}
82
// SAFETY: Record's fields are either atomics, a parking_lot::RwLock, an
// AtomicLink (itself built on an atomic pointer), or plain data
// (key/timestamp/value_len/key_len). The manual impls are needed because
// the type is recursive through `Atomic<Record>`, which blocks
// auto-derivation. NOTE(review): the plain (non-atomic) fields are only
// sound to share if they are never mutated after the record is published
// to other threads — confirm at call sites before trusting `Sync`.
unsafe impl Send for Record {}
unsafe impl Sync for Record {}
85
86impl Record {
87    pub fn new(key: Vec<u8>, value: Vec<u8>, timestamp: u64) -> Self {
88        let key_len = key.len() as u16;
89        let value_len = value.len();
90        let value_bytes = Bytes::from(value);
91
92        Self {
93            // Cache line 1 - GET hot path
94            key,
95            value: parking_lot::RwLock::new(Some(value_bytes)),
96
97            // Cache line 2 - TTL and metadata
98            ttl_expiry: AtomicU64::new(0),
99            timestamp,
100            value_len,
101            sector: AtomicU64::new(0),
102            refcount: AtomicU32::new(1),
103            key_len,
104
105            // Cold fields
106            hash_link: AtomicLink::new(),
107            cache_ref_bit: AtomicU32::new(0),
108            cache_access_time: AtomicU64::new(0),
109        }
110    }
111
112    pub fn new_with_timestamp(key: Vec<u8>, value: Vec<u8>, timestamp: u64) -> Self {
113        Self::new(key, value, timestamp)
114    }
115
116    pub fn new_with_timestamp_ttl(
117        key: Vec<u8>,
118        value: Vec<u8>,
119        timestamp: u64,
120        ttl_expiry: u64,
121    ) -> Self {
122        let record = Self::new(key, value, timestamp);
123        record.ttl_expiry.store(ttl_expiry, Ordering::Release);
124        record
125    }
126
127    /// Create a new record from a Bytes value (zero-copy)
128    pub fn new_from_bytes(key: Vec<u8>, value: Bytes, timestamp: u64) -> Self {
129        let key_len = key.len() as u16;
130        let value_len = value.len();
131
132        Self {
133            // Cache line 1 - GET hot path
134            key,
135            value: parking_lot::RwLock::new(Some(value)),
136
137            // Cache line 2 - TTL and metadata
138            ttl_expiry: AtomicU64::new(0),
139            timestamp,
140            value_len,
141            sector: AtomicU64::new(0),
142            refcount: AtomicU32::new(1),
143            key_len,
144
145            // Cache line 3 - cold fields
146            hash_link: AtomicLink::new(),
147            cache_ref_bit: AtomicU32::new(0),
148            cache_access_time: AtomicU64::new(0),
149        }
150    }
151
152    /// Create a new record from Bytes with TTL
153    pub fn new_from_bytes_with_ttl(
154        key: Vec<u8>,
155        value: Bytes,
156        timestamp: u64,
157        ttl_expiry: u64,
158    ) -> Self {
159        let record = Self::new_from_bytes(key, value, timestamp);
160        record.ttl_expiry.store(ttl_expiry, Ordering::Release);
161        record
162    }
163
164    pub fn calculate_size(&self) -> usize {
165        mem::size_of::<Self>() + self.key.capacity() + self.value_len
166    }
167
168    pub fn calculate_disk_size(&self) -> usize {
169        let record_size = SECTOR_HEADER_SIZE +
170                         mem::size_of::<u16>() + // key_len
171                         self.key_len as usize +
172                         mem::size_of::<usize>() + // value_len
173                         mem::size_of::<u64>() + // timestamp
174                         self.value_len;
175
176        // Round up to block size
177        record_size.div_ceil(FEOX_BLOCK_SIZE) * FEOX_BLOCK_SIZE
178    }
179
180    /// Get value - returns None if value has been offloaded to disk
181    #[inline]
182    pub fn get_value(&self) -> Option<Bytes> {
183        self.value.read().clone()
184    }
185
    /// Clear value from memory
    ///
    /// Drops the in-memory `Bytes` (e.g. after offloading to disk);
    /// `value_len`/`key_len` stay intact so size accounting still works.
    #[inline]
    pub fn clear_value(&self) {
        *self.value.write() = None;
        // NOTE(review): this fence looks redundant — releasing the RwLock
        // write guard already publishes the store. Confirm before removing.
        std::sync::atomic::fence(Ordering::Release);
    }
192
    /// Increment the refcount (caller is taking a new reference).
    pub fn inc_ref(&self) {
        self.refcount.fetch_add(1, Ordering::AcqRel);
    }
196
    /// Decrement the refcount and return the NEW count; a return of 0
    /// means the caller held the last reference.
    pub fn dec_ref(&self) -> u32 {
        let old = self.refcount.fetch_sub(1, Ordering::AcqRel);
        // Underflow means a dec without a matching inc — a caller bug.
        debug_assert!(old > 0, "Record refcount underflow");
        old - 1
    }
202
    /// Snapshot of the current refcount (may be stale as soon as it is read).
    pub fn ref_count(&self) -> u32 {
        self.refcount.load(Ordering::Acquire)
    }
206}