1use bytes::Bytes;
2use crossbeam_epoch::{Atomic, Guard, Shared};
3use std::mem;
4use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
5
6use crate::constants::*;
7
8#[repr(C)]
9#[derive(Debug)]
10pub struct Record {
11 pub hash_link: AtomicLink,
13
14 pub key: Vec<u8>,
16 pub key_len: u16,
17 pub value: parking_lot::RwLock<Option<Bytes>>, pub value_len: usize,
19
20 pub timestamp: u64,
22 pub sector: AtomicU64, pub refcount: AtomicU32,
26
27 pub cache_ref_bit: AtomicU32,
29 pub cache_access_time: AtomicU64,
30}
31
32pub struct AtomicLink {
34 pub next: Atomic<Record>,
35}
36
37impl std::fmt::Debug for AtomicLink {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 f.debug_struct("AtomicLink")
40 .field("next", &"<atomic>")
41 .finish()
42 }
43}
44
45impl Default for AtomicLink {
46 fn default() -> Self {
47 Self::new()
48 }
49}
50
51impl AtomicLink {
52 pub fn new() -> Self {
53 Self {
54 next: Atomic::null(),
55 }
56 }
57
58 pub fn load<'g>(&self, guard: &'g Guard) -> Option<Shared<'g, Record>> {
59 let ptr = self.next.load(Ordering::Acquire, guard);
60 if ptr.is_null() {
61 None
62 } else {
63 Some(ptr)
64 }
65 }
66
67 pub fn store(&self, record: Option<Shared<Record>>, _guard: &Guard) {
68 let ptr = record.unwrap_or(Shared::null());
69 self.next.store(ptr, Ordering::Release);
70 }
71
72 pub fn compare_exchange<'g>(
73 &self,
74 current: Shared<'g, Record>,
75 new: Shared<'g, Record>,
76 guard: &'g Guard,
77 ) -> Result<Shared<'g, Record>, Shared<'g, Record>> {
78 self.next
79 .compare_exchange(current, new, Ordering::AcqRel, Ordering::Acquire, guard)
80 .map_err(|e| e.current)
81 }
82}
83
84unsafe impl Send for Record {}
85unsafe impl Sync for Record {}
86
87impl Record {
88 pub fn new(key: Vec<u8>, value: Vec<u8>, timestamp: u64) -> Self {
89 let key_len = key.len() as u16;
90 let value_len = value.len();
91 let value_bytes = Bytes::from(value);
92
93 Self {
94 hash_link: AtomicLink::new(),
95 key_len,
96 key,
97 value: parking_lot::RwLock::new(Some(value_bytes)),
98 value_len,
99 timestamp,
100 sector: AtomicU64::new(0),
101 refcount: AtomicU32::new(1),
102 cache_ref_bit: AtomicU32::new(0),
103 cache_access_time: AtomicU64::new(0),
104 }
105 }
106
107 pub fn new_with_timestamp(key: Vec<u8>, value: Vec<u8>, timestamp: u64) -> Self {
108 Self::new(key, value, timestamp)
109 }
110
111 pub fn calculate_size(&self) -> usize {
112 mem::size_of::<Self>() + self.key.capacity() + self.value_len
113 }
114
115 pub fn calculate_disk_size(&self) -> usize {
116 let record_size = SECTOR_HEADER_SIZE +
117 mem::size_of::<u16>() + self.key_len as usize +
119 mem::size_of::<usize>() + mem::size_of::<u64>() + self.value_len;
122
123 record_size.div_ceil(FEOX_BLOCK_SIZE) * FEOX_BLOCK_SIZE
125 }
126
127 #[inline]
129 pub fn get_value(&self) -> Option<Bytes> {
130 self.value.read().clone()
131 }
132
133 #[inline]
135 pub fn clear_value(&self) {
136 *self.value.write() = None;
137 std::sync::atomic::fence(Ordering::Release);
138 }
139
140 pub fn inc_ref(&self) {
141 self.refcount.fetch_add(1, Ordering::AcqRel);
142 }
143
144 pub fn dec_ref(&self) -> u32 {
145 let old = self.refcount.fetch_sub(1, Ordering::AcqRel);
146 debug_assert!(old > 0, "Record refcount underflow");
147 old - 1
148 }
149
150 pub fn ref_count(&self) -> u32 {
151 self.refcount.load(Ordering::Acquire)
152 }
153}