feoxdb/core/store/
internal.rs

1use ahash::RandomState;
2use bytes::Bytes;
3use scc::HashMap;
4use std::sync::atomic::Ordering;
5use std::sync::Arc;
6
7use crate::constants::Operation;
8use crate::core::record::Record;
9use crate::error::{FeoxError, Result};
10use crate::storage::write_buffer::WriteBuffer;
11
12use super::FeoxStore;
13
14impl FeoxStore {
15    pub(super) fn update_record_with_ttl(
16        &self,
17        old_record: &Record,
18        value: &[u8],
19        timestamp: u64,
20        ttl_expiry: u64,
21    ) -> Result<bool> {
22        let new_record = if ttl_expiry > 0 && self.enable_ttl {
23            Arc::new(Record::new_with_timestamp_ttl(
24                old_record.key.clone(),
25                value.to_vec(),
26                timestamp,
27                ttl_expiry,
28            ))
29        } else {
30            Arc::new(Record::new(
31                old_record.key.clone(),
32                value.to_vec(),
33                timestamp,
34            ))
35        };
36
37        let old_value_len = old_record.value_len;
38        let old_size = old_record.calculate_size();
39        let new_size = self.calculate_record_size(old_record.key.len(), value.len());
40
41        let old_record_arc =
42            if let Some(entry) = self.hash_table.read(&old_record.key, |_, v| v.clone()) {
43                entry
44            } else {
45                return Err(FeoxError::KeyNotFound);
46            };
47
48        let key_vec = new_record.key.clone();
49
50        self.hash_table
51            .upsert(key_vec.clone(), Arc::clone(&new_record));
52        self.tree.insert(key_vec.clone(), Arc::clone(&new_record));
53
54        if new_size > old_size {
55            self.stats
56                .memory_usage
57                .fetch_add(new_size - old_size, Ordering::AcqRel);
58        } else {
59            self.stats
60                .memory_usage
61                .fetch_sub(old_size - new_size, Ordering::AcqRel);
62        }
63
64        // Only do cache and persistence operations if not in memory-only mode
65        if !self.memory_only {
66            if self.enable_caching {
67                if let Some(ref cache) = self.cache {
68                    cache.remove(&key_vec);
69                }
70            }
71
72            if let Some(ref wb) = self.write_buffer {
73                if let Err(e) =
74                    wb.add_write(Operation::Update, Arc::clone(&new_record), old_value_len)
75                {
76                    // Data operation succeeded in memory
77                    let _ = e;
78                }
79
80                if let Err(e) = wb.add_write(Operation::Delete, old_record_arc, old_value_len) {
81                    // Data operation succeeded in memory
82                    let _ = e;
83                }
84            }
85        }
86
87        Ok(false)
88    }
89
90    /// Update an existing record with a new value (Bytes version for zero-copy).
91    pub(super) fn update_record_with_ttl_bytes(
92        &self,
93        old_record: &Record,
94        value: Bytes,
95        timestamp: u64,
96        ttl_expiry: u64,
97    ) -> Result<bool> {
98        let new_record = if ttl_expiry > 0 && self.enable_ttl {
99            Arc::new(Record::new_from_bytes_with_ttl(
100                old_record.key.clone(),
101                value,
102                timestamp,
103                ttl_expiry,
104            ))
105        } else {
106            Arc::new(Record::new_from_bytes(
107                old_record.key.clone(),
108                value,
109                timestamp,
110            ))
111        };
112
113        let old_value_len = old_record.value_len;
114        let old_size = old_record.calculate_size();
115        let new_size = new_record.calculate_size();
116
117        let old_record_arc =
118            if let Some(entry) = self.hash_table.read(&old_record.key, |_, v| v.clone()) {
119                entry
120            } else {
121                return Err(FeoxError::KeyNotFound);
122            };
123
124        let key_vec = new_record.key.clone();
125
126        self.hash_table
127            .upsert(key_vec.clone(), Arc::clone(&new_record));
128        self.tree.insert(key_vec.clone(), Arc::clone(&new_record));
129
130        if new_size > old_size {
131            self.stats
132                .memory_usage
133                .fetch_add(new_size - old_size, Ordering::AcqRel);
134        } else {
135            self.stats
136                .memory_usage
137                .fetch_sub(old_size - new_size, Ordering::AcqRel);
138        }
139
140        // Only do cache and persistence operations if not in memory-only mode
141        if !self.memory_only {
142            if self.enable_caching {
143                if let Some(ref cache) = self.cache {
144                    cache.remove(&key_vec);
145                }
146            }
147
148            if let Some(ref wb) = self.write_buffer {
149                if let Err(e) =
150                    wb.add_write(Operation::Update, Arc::clone(&new_record), old_value_len)
151                {
152                    // Data operation succeeded in memory
153                    let _ = e;
154                }
155
156                if let Err(e) = wb.add_write(Operation::Delete, old_record_arc, old_value_len) {
157                    // Data operation succeeded in memory
158                    let _ = e;
159                }
160            }
161        }
162
163        Ok(false)
164    }
165
166    /// Get access to hash table (for TTL cleaner)
167    pub(crate) fn get_hash_table(&self) -> &HashMap<Vec<u8>, Arc<Record>, RandomState> {
168        &self.hash_table
169    }
170
171    /// Remove from tree (for TTL cleaner)
172    pub(crate) fn remove_from_tree(&self, key: &[u8]) {
173        self.tree.remove(key);
174    }
175
176    /// Get write buffer (for TTL cleaner)
177    pub(crate) fn get_write_buffer(&self) -> Option<&Arc<WriteBuffer>> {
178        self.write_buffer.as_ref()
179    }
180}