Coverage Report

Created: 2026-01-04 13:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/home/runner/work/feoxdb/feoxdb/src/core/record.rs
Line
Count
Source
1
use bytes::Bytes;
2
use crossbeam_epoch::{Atomic, Guard, Shared};
3
use std::mem;
4
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
5
6
use crate::constants::*;
7
8
#[repr(C)]
9
#[derive(Debug)]
10
pub struct Record {
11
    // Cache line 1 (64 bytes) - GET hot path
12
    // These 3 fields are accessed together on EVERY GET operation
13
    pub key: Vec<u8>,                              // 24 bytes
14
    pub value: parking_lot::RwLock<Option<Bytes>>, // ~40 bytes (32 + Option overhead)
15
16
    // Cache line 2 (64 bytes) - TTL and metadata
17
    pub ttl_expiry: AtomicU64, // 8 bytes - checked on every GET
18
    pub timestamp: u64,        // 8 bytes - checked on updates
19
    pub value_len: usize,      // 8 bytes - used for size calcs
20
    pub sector: AtomicU64,     // 8 bytes - for persistence
21
    pub refcount: AtomicU32,   // 4 bytes - memory management
22
    pub key_len: u16,          // 2 bytes - used with value_len
23
    // Total so far: 40 bytes
24
25
    // Still in cache line 2 or start of line 3 - cold fields
26
    pub hash_link: AtomicLink,        // 8 bytes - only for hash ops
27
    pub cache_ref_bit: AtomicU32,     // 4 bytes - rarely used
28
    pub cache_access_time: AtomicU64, // 8 bytes - rarely used
29
}
30
31
// Custom atomic link for lock-free hash table
32
pub struct AtomicLink {
33
    pub next: Atomic<Record>,
34
}
35
36
impl std::fmt::Debug for AtomicLink {
37
0
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38
0
        f.debug_struct("AtomicLink")
39
0
            .field("next", &"<atomic>")
40
0
            .finish()
41
0
    }
42
}
43
44
impl Default for AtomicLink {
45
0
    fn default() -> Self {
46
0
        Self::new()
47
0
    }
48
}
49
50
impl AtomicLink {
51
25.6k
    pub fn new() -> Self {
52
25.6k
        Self {
53
25.6k
            next: Atomic::null(),
54
25.6k
        }
55
25.6k
    }
56
57
3
    pub fn load<'g>(&self, guard: &'g Guard) -> Option<Shared<'g, Record>> {
58
3
        let ptr = self.next.load(Ordering::Acquire, guard);
59
3
        if ptr.is_null() {
60
1
            None
61
        } else {
62
2
            Some(ptr)
63
        }
64
3
    }
65
66
1
    pub fn store(&self, record: Option<Shared<Record>>, _guard: &Guard) {
67
1
        let ptr = record.unwrap_or(Shared::null());
68
1
        self.next.store(ptr, Ordering::Release);
69
1
    }
70
71
1
    pub fn compare_exchange<'g>(
72
1
        &self,
73
1
        current: Shared<'g, Record>,
74
1
        new: Shared<'g, Record>,
75
1
        guard: &'g Guard,
76
1
    ) -> Result<Shared<'g, Record>, Shared<'g, Record>> {
77
1
        self.next
78
1
            .compare_exchange(current, new, Ordering::AcqRel, Ordering::Acquire, guard)
79
1
            .map_err(|e| e.current)
80
1
    }
81
}
82
83
unsafe impl Send for Record {}
84
unsafe impl Sync for Record {}
85
86
impl Record {
87
25.6k
    pub fn new(key: Vec<u8>, value: Vec<u8>, timestamp: u64) -> Self {
88
25.6k
        let key_len = key.len() as u16;
89
25.6k
        let value_len = value.len();
90
25.6k
        let value_bytes = Bytes::from(value);
91
92
25.6k
        Self {
93
25.6k
            // Cache line 1 - GET hot path
94
25.6k
            key,
95
25.6k
            value: parking_lot::RwLock::new(Some(value_bytes)),
96
25.6k
97
25.6k
            // Cache line 2 - TTL and metadata
98
25.6k
            ttl_expiry: AtomicU64::new(0),
99
25.6k
            timestamp,
100
25.6k
            value_len,
101
25.6k
            sector: AtomicU64::new(0),
102
25.6k
            refcount: AtomicU32::new(1),
103
25.6k
            key_len,
104
25.6k
105
25.6k
            // Cold fields
106
25.6k
            hash_link: AtomicLink::new(),
107
25.6k
            cache_ref_bit: AtomicU32::new(0),
108
25.6k
            cache_access_time: AtomicU64::new(0),
109
25.6k
        }
110
25.6k
    }
111
112
1
    pub fn new_with_timestamp(key: Vec<u8>, value: Vec<u8>, timestamp: u64) -> Self {
113
1
        Self::new(key, value, timestamp)
114
1
    }
115
116
7
    pub fn new_with_timestamp_ttl(
117
7
        key: Vec<u8>,
118
7
        value: Vec<u8>,
119
7
        timestamp: u64,
120
7
        ttl_expiry: u64,
121
7
    ) -> Self {
122
7
        let record = Self::new(key, value, timestamp);
123
7
        record.ttl_expiry.store(ttl_expiry, Ordering::Release);
124
7
        record
125
7
    }
126
127
    /// Create a new record from a Bytes value (zero-copy)
128
12
    pub fn new_from_bytes(key: Vec<u8>, value: Bytes, timestamp: u64) -> Self {
129
12
        let key_len = key.len() as u16;
130
12
        let value_len = value.len();
131
132
12
        Self {
133
12
            // Cache line 1 - GET hot path
134
12
            key,
135
12
            value: parking_lot::RwLock::new(Some(value)),
136
12
137
12
            // Cache line 2 - TTL and metadata
138
12
            ttl_expiry: AtomicU64::new(0),
139
12
            timestamp,
140
12
            value_len,
141
12
            sector: AtomicU64::new(0),
142
12
            refcount: AtomicU32::new(1),
143
12
            key_len,
144
12
145
12
            // Cache line 3 - cold fields
146
12
            hash_link: AtomicLink::new(),
147
12
            cache_ref_bit: AtomicU32::new(0),
148
12
            cache_access_time: AtomicU64::new(0),
149
12
        }
150
12
    }
151
152
    /// Create a new record from Bytes with TTL
153
4
    pub fn new_from_bytes_with_ttl(
154
4
        key: Vec<u8>,
155
4
        value: Bytes,
156
4
        timestamp: u64,
157
4
        ttl_expiry: u64,
158
4
    ) -> Self {
159
4
        let record = Self::new_from_bytes(key, value, timestamp);
160
4
        record.ttl_expiry.store(ttl_expiry, Ordering::Release);
161
4
        record
162
4
    }
163
164
24.2k
    pub fn calculate_size(&self) -> usize {
165
24.2k
        mem::size_of::<Self>() + self.key.capacity() + self.value_len
166
24.2k
    }
167
168
1
    pub fn calculate_disk_size(&self) -> usize {
169
1
        let record_size = SECTOR_HEADER_SIZE +
170
1
                         mem::size_of::<u16>() + // key_len
171
1
                         self.key_len as usize +
172
1
                         mem::size_of::<usize>() + // value_len
173
1
                         mem::size_of::<u64>() + // timestamp
174
1
                         self.value_len;
175
176
        // Round up to block size
177
1
        record_size.div_ceil(FEOX_BLOCK_SIZE) * FEOX_BLOCK_SIZE
178
1
    }
179
180
    /// Get value - returns None if value has been offloaded to disk
181
    #[inline]
182
6.35k
    pub fn get_value(&self) -> Option<Bytes> {
183
6.35k
        self.value.read().clone()
184
6.35k
    }
185
186
    /// Clear value from memory
187
    #[inline]
188
21.7k
    pub fn clear_value(&self) {
189
21.7k
        *self.value.write() = None;
190
21.7k
        std::sync::atomic::fence(Ordering::Release);
191
21.7k
    }
192
193
1.00k
    pub fn inc_ref(&self) {
194
1.00k
        self.refcount.fetch_add(1, Ordering::AcqRel);
195
1.00k
    }
196
197
2
    pub fn dec_ref(&self) -> u32 {
198
2
        let old = self.refcount.fetch_sub(1, Ordering::AcqRel);
199
2
        debug_assert!(old > 0, "Record refcount underflow");
200
2
        old - 1
201
2
    }
202
203
5
    pub fn ref_count(&self) -> u32 {
204
5
        self.refcount.load(Ordering::Acquire)
205
5
    }
206
}