feoxdb/core/record.rs

use bytes::Bytes;
use crossbeam_epoch::{Atomic, Guard, Shared};
use std::mem;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};

use crate::constants::*;

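/// A key/value record with an intrusive hash-table link, disk placement
/// metadata, a reference count, and cache-tracking fields.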
#[repr(C)]
#[derive(Debug)]
pub struct Record {
    // Hash table link for intrusive list
    pub hash_link: AtomicLink,

    // Key and value data
    pub key: Vec<u8>,
    pub key_len: u16,
    pub value: parking_lot::RwLock<Option<Bytes>>, // None when value is on disk
    pub value_len: usize,

    // Metadata
    pub timestamp: u64,
    pub sector: AtomicU64, // Disk sector location (0 means in-memory only)

    // Reference counting for RCU-style access
    pub refcount: AtomicU32,

    // Cache metadata
    pub cache_ref_bit: AtomicU32,
    pub cache_access_time: AtomicU64,
}

// Custom atomic link for lock-free hash table
pub struct AtomicLink {
    pub next: Atomic<Record>,
}

impl std::fmt::Debug for AtomicLink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AtomicLink")
            .field("next", &"<atomic>")
            .finish()
    }
}

impl Default for AtomicLink {
    fn default() -> Self {
        Self::new()
    }
}

impl AtomicLink {
    pub fn new() -> Self {
        Self {
            next: Atomic::null(),
        }
    }

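    /// Load the next pointer with Acquire ordering; returns `None` when the link is null.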
    pub fn load<'g>(&self, guard: &'g Guard) -> Option<Shared<'g, Record>> {
        let ptr = self.next.load(Ordering::Acquire, guard);
        if ptr.is_null() {
            None
        } else {
            Some(ptr)
        }
    }

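    /// Store a new next pointer (or null) with Release ordering.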
    pub fn store(&self, record: Option<Shared<Record>>, _guard: &Guard) {
        let ptr = record.unwrap_or(Shared::null());
        self.next.store(ptr, Ordering::Release);
    }

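    /// Compare-and-swap the next pointer; on failure, returns the pointer
    /// currently stored.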
    pub fn compare_exchange<'g>(
        &self,
        current: Shared<'g, Record>,
        new: Shared<'g, Record>,
        guard: &'g Guard,
    ) -> Result<Shared<'g, Record>, Shared<'g, Record>> {
        self.next
            .compare_exchange(current, new, Ordering::AcqRel, Ordering::Acquire, guard)
            .map_err(|e| e.current)
    }
}

unsafe impl Send for Record {}
unsafe impl Sync for Record {}

impl Record {
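    /// Create a record whose value is held in memory; the refcount starts at 1
    /// and the sector is 0 (in-memory only).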
    pub fn new(key: Vec<u8>, value: Vec<u8>, timestamp: u64) -> Self {
        let key_len = key.len() as u16;
        let value_len = value.len();
        let value_bytes = Bytes::from(value);

        Self {
            hash_link: AtomicLink::new(),
            key_len,
            key,
            value: parking_lot::RwLock::new(Some(value_bytes)),
            value_len,
            timestamp,
            sector: AtomicU64::new(0),
            refcount: AtomicU32::new(1),
            cache_ref_bit: AtomicU32::new(0),
            cache_access_time: AtomicU64::new(0),
        }
    }

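    /// Equivalent to [`Record::new`], which already takes an explicit timestamp.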
    pub fn new_with_timestamp(key: Vec<u8>, value: Vec<u8>, timestamp: u64) -> Self {
        Self::new(key, value, timestamp)
    }

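    /// Approximate in-memory size: the struct itself plus the key's capacity and the value length.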
    pub fn calculate_size(&self) -> usize {
        mem::size_of::<Self>() + self.key.capacity() + self.value_len
    }

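    /// Serialized size on disk (sector header, key/value length fields, timestamp,
    /// key, value), rounded up to a multiple of `FEOX_BLOCK_SIZE`.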
    pub fn calculate_disk_size(&self) -> usize {
        let record_size = SECTOR_HEADER_SIZE +
                         mem::size_of::<u16>() + // key_len
                         self.key_len as usize +
                         mem::size_of::<usize>() + // value_len
                         mem::size_of::<u64>() + // timestamp
                         self.value_len;

        // Round up to block size
        record_size.div_ceil(FEOX_BLOCK_SIZE) * FEOX_BLOCK_SIZE
    }

    /// Get value - returns None if value has been offloaded to disk
    #[inline]
    pub fn get_value(&self) -> Option<Bytes> {
        self.value.read().clone()
    }

    /// Clear value from memory
    #[inline]
    pub fn clear_value(&self) {
        *self.value.write() = None;
        std::sync::atomic::fence(Ordering::Release);
    }

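    /// Increment the reference count.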
    pub fn inc_ref(&self) {
        self.refcount.fetch_add(1, Ordering::AcqRel);
    }

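    /// Decrement the reference count and return the new value.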
    pub fn dec_ref(&self) -> u32 {
        let old = self.refcount.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(old > 0, "Record refcount underflow");
        old - 1
    }

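    /// Current reference count.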
    pub fn ref_count(&self) -> u32 {
        self.refcount.load(Ordering::Acquire)
    }
}