// feoxdb/core/store/internal.rs
use std::sync::atomic::Ordering;
use std::sync::Arc;

use ahash::RandomState;
use bytes::Bytes;
use scc::HashMap;

use crate::constants::Operation;
use crate::core::record::Record;
use crate::error::{FeoxError, Result};
use crate::storage::write_buffer::WriteBuffer;

use super::FeoxStore;
14impl FeoxStore {
15 pub(super) fn update_record_with_ttl(
16 &self,
17 old_record: &Record,
18 value: &[u8],
19 timestamp: u64,
20 ttl_expiry: u64,
21 ) -> Result<bool> {
22 let new_record = if ttl_expiry > 0 && self.enable_ttl {
23 Arc::new(Record::new_with_timestamp_ttl(
24 old_record.key.clone(),
25 value.to_vec(),
26 timestamp,
27 ttl_expiry,
28 ))
29 } else {
30 Arc::new(Record::new(
31 old_record.key.clone(),
32 value.to_vec(),
33 timestamp,
34 ))
35 };
36
37 let old_value_len = old_record.value_len;
38 let old_size = old_record.calculate_size();
39 let new_size = self.calculate_record_size(old_record.key.len(), value.len());
40
41 let old_record_arc =
42 if let Some(entry) = self.hash_table.read(&old_record.key, |_, v| v.clone()) {
43 entry
44 } else {
45 return Err(FeoxError::KeyNotFound);
46 };
47
48 let key_vec = new_record.key.clone();
49
50 self.hash_table
51 .upsert(key_vec.clone(), Arc::clone(&new_record));
52 self.tree.insert(key_vec.clone(), Arc::clone(&new_record));
53
54 if new_size > old_size {
55 self.stats
56 .memory_usage
57 .fetch_add(new_size - old_size, Ordering::AcqRel);
58 } else {
59 self.stats
60 .memory_usage
61 .fetch_sub(old_size - new_size, Ordering::AcqRel);
62 }
63
64 if !self.memory_only {
66 if self.enable_caching {
67 if let Some(ref cache) = self.cache {
68 cache.remove(&key_vec);
69 }
70 }
71
72 if let Some(ref wb) = self.write_buffer {
73 if let Err(e) =
74 wb.add_write(Operation::Update, Arc::clone(&new_record), old_value_len)
75 {
76 let _ = e;
78 }
79
80 if let Err(e) = wb.add_write(Operation::Delete, old_record_arc, old_value_len) {
81 let _ = e;
83 }
84 }
85 }
86
87 Ok(false)
88 }
89
90 pub(super) fn update_record_with_ttl_bytes(
92 &self,
93 old_record: &Record,
94 value: Bytes,
95 timestamp: u64,
96 ttl_expiry: u64,
97 ) -> Result<bool> {
98 let new_record = if ttl_expiry > 0 && self.enable_ttl {
99 Arc::new(Record::new_from_bytes_with_ttl(
100 old_record.key.clone(),
101 value,
102 timestamp,
103 ttl_expiry,
104 ))
105 } else {
106 Arc::new(Record::new_from_bytes(
107 old_record.key.clone(),
108 value,
109 timestamp,
110 ))
111 };
112
113 let old_value_len = old_record.value_len;
114 let old_size = old_record.calculate_size();
115 let new_size = new_record.calculate_size();
116
117 let old_record_arc =
118 if let Some(entry) = self.hash_table.read(&old_record.key, |_, v| v.clone()) {
119 entry
120 } else {
121 return Err(FeoxError::KeyNotFound);
122 };
123
124 let key_vec = new_record.key.clone();
125
126 self.hash_table
127 .upsert(key_vec.clone(), Arc::clone(&new_record));
128 self.tree.insert(key_vec.clone(), Arc::clone(&new_record));
129
130 if new_size > old_size {
131 self.stats
132 .memory_usage
133 .fetch_add(new_size - old_size, Ordering::AcqRel);
134 } else {
135 self.stats
136 .memory_usage
137 .fetch_sub(old_size - new_size, Ordering::AcqRel);
138 }
139
140 if !self.memory_only {
142 if self.enable_caching {
143 if let Some(ref cache) = self.cache {
144 cache.remove(&key_vec);
145 }
146 }
147
148 if let Some(ref wb) = self.write_buffer {
149 if let Err(e) =
150 wb.add_write(Operation::Update, Arc::clone(&new_record), old_value_len)
151 {
152 let _ = e;
154 }
155
156 if let Err(e) = wb.add_write(Operation::Delete, old_record_arc, old_value_len) {
157 let _ = e;
159 }
160 }
161 }
162
163 Ok(false)
164 }
165
166 pub(crate) fn get_hash_table(&self) -> &HashMap<Vec<u8>, Arc<Record>, RandomState> {
168 &self.hash_table
169 }
170
171 pub(crate) fn remove_from_tree(&self, key: &[u8]) {
173 self.tree.remove(key);
174 }
175
176 pub(crate) fn get_write_buffer(&self) -> Option<&Arc<WriteBuffer>> {
178 self.write_buffer.as_ref()
179 }
180}