/home/gh-runner/action-runner3/_work/feoxdb/feoxdb/src/core/store/internal.rs
Line | Count | Source |
1 | | use ahash::RandomState; |
2 | | use bytes::Bytes; |
3 | | use scc::HashMap; |
4 | | use std::sync::atomic::Ordering; |
5 | | use std::sync::Arc; |
6 | | |
7 | | use crate::constants::Operation; |
8 | | use crate::core::record::Record; |
9 | | use crate::error::{FeoxError, Result}; |
10 | | use crate::storage::write_buffer::WriteBuffer; |
11 | | |
12 | | use super::FeoxStore; |
13 | | |
14 | | impl FeoxStore { |
15 | 499 | pub(super) fn update_record_with_ttl( |
16 | 499 | &self, |
17 | 499 | old_record: &Record, |
18 | 499 | value: &[u8], |
19 | 499 | timestamp: u64, |
20 | 499 | ttl_expiry: u64, |
21 | 499 | ) -> Result<bool> { |
22 | 499 | let new_record = if ttl_expiry > 0 && self.enable_ttl1 { |
23 | 1 | Arc::new(Record::new_with_timestamp_ttl( |
24 | 1 | old_record.key.clone(), |
25 | 1 | value.to_vec(), |
26 | 1 | timestamp, |
27 | 1 | ttl_expiry, |
28 | | )) |
29 | | } else { |
30 | 498 | Arc::new(Record::new( |
31 | 498 | old_record.key.clone(), |
32 | 498 | value.to_vec(), |
33 | 498 | timestamp, |
34 | | )) |
35 | | }; |
36 | | |
37 | 499 | let old_value_len = old_record.value_len; |
38 | 499 | let old_size = old_record.calculate_size(); |
39 | 499 | let new_size = self.calculate_record_size(old_record.key.len(), value.len()); |
40 | | |
41 | 499 | let old_record_arc = |
42 | 499 | if let Some(entry) = self.hash_table.read(&old_record.key, |_, v| v.clone()) { |
43 | 499 | entry |
44 | | } else { |
45 | 0 | return Err(FeoxError::KeyNotFound); |
46 | | }; |
47 | | |
48 | 499 | let key_vec = new_record.key.clone(); |
49 | | |
50 | 499 | self.hash_table |
51 | 499 | .upsert(key_vec.clone(), Arc::clone(&new_record)); |
52 | 499 | self.tree.insert(key_vec.clone(), Arc::clone(&new_record)); |
53 | | |
54 | 499 | if new_size > old_size { |
55 | 93 | self.stats |
56 | 93 | .memory_usage |
57 | 93 | .fetch_add(new_size - old_size, Ordering::AcqRel); |
58 | 406 | } else { |
59 | 406 | self.stats |
60 | 406 | .memory_usage |
61 | 406 | .fetch_sub(old_size - new_size, Ordering::AcqRel); |
62 | 406 | } |
63 | | |
64 | | // Only do cache and persistence operations if not in memory-only mode |
65 | 499 | if !self.memory_only { |
66 | 3 | if self.enable_caching { |
67 | 3 | if let Some(ref cache) = self.cache { |
68 | 3 | cache.remove(&key_vec); |
69 | 3 | }0 |
70 | 0 | } |
71 | | |
72 | 3 | if let Some(ref wb) = self.write_buffer { |
73 | 0 | if let Err(e) = |
74 | 3 | wb.add_write(Operation::Update, Arc::clone(&new_record), old_value_len) |
75 | 0 | { |
76 | 0 | // Data operation succeeded in memory |
77 | 0 | let _ = e; |
78 | 3 | } |
79 | | |
80 | 3 | if let Err(e0 ) = wb.add_write(Operation::Delete, old_record_arc, old_value_len) { |
81 | 0 | // Data operation succeeded in memory |
82 | 0 | let _ = e; |
83 | 3 | } |
84 | 0 | } |
85 | 496 | } |
86 | | |
87 | 499 | Ok(false) |
88 | 499 | } |
89 | | |
90 | | /// Update an existing record with a new value (Bytes version for zero-copy). |
91 | 4 | pub(super) fn update_record_with_ttl_bytes( |
92 | 4 | &self, |
93 | 4 | old_record: &Record, |
94 | 4 | value: Bytes, |
95 | 4 | timestamp: u64, |
96 | 4 | ttl_expiry: u64, |
97 | 4 | ) -> Result<bool> { |
98 | 4 | let new_record = if ttl_expiry > 0 && self.enable_ttl1 { |
99 | 1 | Arc::new(Record::new_from_bytes_with_ttl( |
100 | 1 | old_record.key.clone(), |
101 | 1 | value, |
102 | 1 | timestamp, |
103 | 1 | ttl_expiry, |
104 | | )) |
105 | | } else { |
106 | 3 | Arc::new(Record::new_from_bytes( |
107 | 3 | old_record.key.clone(), |
108 | 3 | value, |
109 | 3 | timestamp, |
110 | | )) |
111 | | }; |
112 | | |
113 | 4 | let old_value_len = old_record.value_len; |
114 | 4 | let old_size = old_record.calculate_size(); |
115 | 4 | let new_size = new_record.calculate_size(); |
116 | | |
117 | 4 | let old_record_arc = |
118 | 4 | if let Some(entry) = self.hash_table.read(&old_record.key, |_, v| v.clone()) { |
119 | 4 | entry |
120 | | } else { |
121 | 0 | return Err(FeoxError::KeyNotFound); |
122 | | }; |
123 | | |
124 | 4 | let key_vec = new_record.key.clone(); |
125 | | |
126 | 4 | self.hash_table |
127 | 4 | .upsert(key_vec.clone(), Arc::clone(&new_record)); |
128 | 4 | self.tree.insert(key_vec.clone(), Arc::clone(&new_record)); |
129 | | |
130 | 4 | if new_size > old_size { |
131 | 1 | self.stats |
132 | 1 | .memory_usage |
133 | 1 | .fetch_add(new_size - old_size, Ordering::AcqRel); |
134 | 3 | } else { |
135 | 3 | self.stats |
136 | 3 | .memory_usage |
137 | 3 | .fetch_sub(old_size - new_size, Ordering::AcqRel); |
138 | 3 | } |
139 | | |
140 | | // Only do cache and persistence operations if not in memory-only mode |
141 | 4 | if !self.memory_only { |
142 | 0 | if self.enable_caching { |
143 | 0 | if let Some(ref cache) = self.cache { |
144 | 0 | cache.remove(&key_vec); |
145 | 0 | } |
146 | 0 | } |
147 | | |
148 | 0 | if let Some(ref wb) = self.write_buffer { |
149 | 0 | if let Err(e) = |
150 | 0 | wb.add_write(Operation::Update, Arc::clone(&new_record), old_value_len) |
151 | 0 | { |
152 | 0 | // Data operation succeeded in memory |
153 | 0 | let _ = e; |
154 | 0 | } |
155 | | |
156 | 0 | if let Err(e) = wb.add_write(Operation::Delete, old_record_arc, old_value_len) { |
157 | 0 | // Data operation succeeded in memory |
158 | 0 | let _ = e; |
159 | 0 | } |
160 | 0 | } |
161 | 4 | } |
162 | | |
163 | 4 | Ok(false) |
164 | 4 | } |
165 | | |
166 | | /// Get access to hash table (for TTL cleaner) |
167 | 0 | pub(crate) fn get_hash_table(&self) -> &HashMap<Vec<u8>, Arc<Record>, RandomState> { |
168 | 0 | &self.hash_table |
169 | 0 | } |
170 | | |
171 | | /// Remove from tree (for TTL cleaner) |
172 | 0 | pub(crate) fn remove_from_tree(&self, key: &[u8]) { |
173 | 0 | self.tree.remove(key); |
174 | 0 | } |
175 | | |
176 | | /// Get write buffer (for TTL cleaner) |
177 | 0 | pub(crate) fn get_write_buffer(&self) -> Option<&Arc<WriteBuffer>> { |
178 | 0 | self.write_buffer.as_ref() |
179 | 0 | } |
180 | | } |