use bytes::Bytes;
use crossbeam_epoch::{Atomic, Guard, Shared};
use std::mem;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};

use crate::constants::*;

#[repr(C)]
#[derive(Debug)]
pub struct Record {
    /// Key bytes; set at construction and never mutated afterwards.
    pub key: Vec<u8>,
    /// In-memory value; `None` once cleared via `clear_value`.
    pub value: parking_lot::RwLock<Option<Bytes>>,
    /// TTL expiry; constructors without a TTL leave this at 0.
    pub ttl_expiry: AtomicU64,
    pub timestamp: u64,
    /// Value length in bytes, cached at construction.
    pub value_len: usize,
    pub sector: AtomicU64,
    /// Reference count; starts at 1.
    pub refcount: AtomicU32,
    /// Key length in bytes, cached at construction.
    pub key_len: u16,
    /// Intrusive link into the hash chain.
    pub hash_link: AtomicLink,
    pub cache_ref_bit: AtomicU32,
    pub cache_access_time: AtomicU64,
}

/// Lock-free link to the next `Record`, managed with epoch-based reclamation.
pub struct AtomicLink {
    pub next: Atomic<Record>,
}

impl std::fmt::Debug for AtomicLink {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AtomicLink")
            .field("next", &"<atomic>")
            .finish()
    }
}

impl Default for AtomicLink {
    fn default() -> Self {
        Self::new()
    }
}

impl AtomicLink {
    /// Creates an empty (null) link.
    pub fn new() -> Self {
        Self {
            next: Atomic::null(),
        }
    }

    /// Loads the next pointer, returning `None` for a null link.
    pub fn load<'g>(&self, guard: &'g Guard) -> Option<Shared<'g, Record>> {
        let ptr = self.next.load(Ordering::Acquire, guard);
        if ptr.is_null() {
            None
        } else {
            Some(ptr)
        }
    }

    /// Stores `record` into the link, or null for `None`.
    pub fn store(&self, record: Option<Shared<Record>>, _guard: &Guard) {
        let ptr = record.unwrap_or(Shared::null());
        self.next.store(ptr, Ordering::Release);
    }

    /// Compare-and-swap on the link; on failure returns the pointer actually
    /// observed so the caller can retry.
    pub fn compare_exchange<'g>(
        &self,
        current: Shared<'g, Record>,
        new: Shared<'g, Record>,
        guard: &'g Guard,
    ) -> Result<Shared<'g, Record>, Shared<'g, Record>> {
        self.next
            .compare_exchange(current, new, Ordering::AcqRel, Ordering::Acquire, guard)
            .map_err(|e| e.current)
    }
}
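
// A minimal usage sketch (hypothetical test module, not part of the original
// file): publishes a node through `compare_exchange` under an epoch guard.
// Assumes only crossbeam_epoch's public `pin`/`Owned` API.
#[cfg(test)]
mod atomic_link_sketch {
    use super::*;
    use crossbeam_epoch::{self as epoch, Owned};

    #[test]
    fn cas_publishes_a_node() {
        let link = AtomicLink::new();
        let guard = epoch::pin();
        assert!(link.load(&guard).is_none());

        // The node is leaked here for brevity; a real caller would retire it
        // through the epoch collector.
        let node = Owned::new(Record::new(b"k".to_vec(), b"v".to_vec(), 1))
            .into_shared(&guard);

        // CAS from null succeeds because no other thread claimed the link.
        assert!(link.compare_exchange(Shared::null(), node, &guard).is_ok());
        assert!(link.load(&guard).is_some());
    }
}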

// SAFETY: all shared mutation of `Record` goes through its atomics or the
// `parking_lot::RwLock`; the remaining plain fields are written only at
// construction.
unsafe impl Send for Record {}
unsafe impl Sync for Record {}
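
// A small concurrency sketch (hypothetical, not from the original file): the
// `Send`/`Sync` impls above let a `Record` be shared across threads behind an
// `Arc`, with all mutation funneled through its atomics and `RwLock`.
#[cfg(test)]
mod send_sync_sketch {
    use super::*;
    use std::sync::Arc;

    #[test]
    fn record_is_shareable_across_threads() {
        let rec = Arc::new(Record::new(b"k".to_vec(), b"v".to_vec(), 0));
        let handles: Vec<_> = (0..4)
            .map(|_| {
                let rec = Arc::clone(&rec);
                std::thread::spawn(move || rec.inc_ref())
            })
            .collect();
        for handle in handles {
            handle.join().unwrap();
        }
        // One initial reference plus four increments.
        assert_eq!(rec.ref_count(), 5);
    }
}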

impl Record {
    pub fn new(key: Vec<u8>, value: Vec<u8>, timestamp: u64) -> Self {
        let key_len = key.len() as u16;
        let value_len = value.len();
        let value_bytes = Bytes::from(value);

        Self {
            key,
            value: parking_lot::RwLock::new(Some(value_bytes)),
            ttl_expiry: AtomicU64::new(0),
            timestamp,
            value_len,
            sector: AtomicU64::new(0),
            refcount: AtomicU32::new(1),
            key_len,
            hash_link: AtomicLink::new(),
            cache_ref_bit: AtomicU32::new(0),
            cache_access_time: AtomicU64::new(0),
        }
    }

    pub fn new_with_timestamp(key: Vec<u8>, value: Vec<u8>, timestamp: u64) -> Self {
        Self::new(key, value, timestamp)
    }

    pub fn new_with_timestamp_ttl(
        key: Vec<u8>,
        value: Vec<u8>,
        timestamp: u64,
        ttl_expiry: u64,
    ) -> Self {
        let record = Self::new(key, value, timestamp);
        record.ttl_expiry.store(ttl_expiry, Ordering::Release);
        record
    }

    pub fn new_from_bytes(key: Vec<u8>, value: Bytes, timestamp: u64) -> Self {
        let key_len = key.len() as u16;
        let value_len = value.len();

        Self {
            key,
            value: parking_lot::RwLock::new(Some(value)),
            ttl_expiry: AtomicU64::new(0),
            timestamp,
            value_len,
            sector: AtomicU64::new(0),
            refcount: AtomicU32::new(1),
            key_len,
            hash_link: AtomicLink::new(),
            cache_ref_bit: AtomicU32::new(0),
            cache_access_time: AtomicU64::new(0),
        }
    }

    pub fn new_from_bytes_with_ttl(
        key: Vec<u8>,
        value: Bytes,
        timestamp: u64,
        ttl_expiry: u64,
    ) -> Self {
        let record = Self::new_from_bytes(key, value, timestamp);
        record.ttl_expiry.store(ttl_expiry, Ordering::Release);
        record
    }

    /// Approximate in-memory footprint: struct plus key allocation plus value bytes.
    pub fn calculate_size(&self) -> usize {
        mem::size_of::<Self>() + self.key.capacity() + self.value_len
    }

    /// On-disk footprint: sector header, key-length field (u16), key bytes,
    /// value-length field (usize), timestamp (u64), and value bytes, rounded
    /// up to a whole number of blocks.
    pub fn calculate_disk_size(&self) -> usize {
        let record_size = SECTOR_HEADER_SIZE
            + mem::size_of::<u16>()
            + self.key_len as usize
            + mem::size_of::<usize>()
            + mem::size_of::<u64>()
            + self.value_len;

        record_size.div_ceil(FEOX_BLOCK_SIZE) * FEOX_BLOCK_SIZE
    }

    #[inline]
    pub fn get_value(&self) -> Option<Bytes> {
        self.value.read().clone()
    }

    /// Drops the in-memory value; metadata such as `value_len` is kept.
    #[inline]
    pub fn clear_value(&self) {
        *self.value.write() = None;
        std::sync::atomic::fence(Ordering::Release);
    }

    pub fn inc_ref(&self) {
        self.refcount.fetch_add(1, Ordering::AcqRel);
    }

    /// Decrements the refcount and returns the new count.
    pub fn dec_ref(&self) -> u32 {
        let old = self.refcount.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(old > 0, "Record refcount underflow");
        old - 1
    }

    pub fn ref_count(&self) -> u32 {
        self.refcount.load(Ordering::Acquire)
    }
}
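
// A minimal sketch (hypothetical test module, not from the original file)
// exercising the constructors and the refcount/value helpers above. The
// disk-size check only asserts block alignment, since SECTOR_HEADER_SIZE and
// FEOX_BLOCK_SIZE are crate constants.
#[cfg(test)]
mod record_sketch {
    use super::*;

    #[test]
    fn refcount_and_value_round_trip() {
        let rec = Record::new(b"key".to_vec(), b"value".to_vec(), 42);
        assert_eq!(rec.ref_count(), 1); // records start with one reference
        rec.inc_ref();
        assert_eq!(rec.dec_ref(), 1); // dec_ref returns the new count

        // clear_value drops the bytes but keeps the cached length.
        assert!(rec.get_value().is_some());
        rec.clear_value();
        assert!(rec.get_value().is_none());
        assert_eq!(rec.value_len, 5);

        // Disk size is always a whole number of blocks.
        assert_eq!(rec.calculate_disk_size() % FEOX_BLOCK_SIZE, 0);
    }

    #[test]
    fn ttl_constructor_stores_expiry() {
        let rec = Record::new_with_timestamp_ttl(b"k".to_vec(), b"v".to_vec(), 1, 99);
        assert_eq!(rec.ttl_expiry.load(Ordering::Acquire), 99);
    }
}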