Coverage Report

Created: 2025-09-19 09:35

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/home/gh-runner/action-runner3/_work/feoxdb/feoxdb/src/core/store/internal.rs
Line
Count
Source
1
use ahash::RandomState;
2
use bytes::Bytes;
3
use scc::HashMap;
4
use std::sync::atomic::Ordering;
5
use std::sync::Arc;
6
7
use crate::constants::Operation;
8
use crate::core::record::Record;
9
use crate::error::{FeoxError, Result};
10
use crate::storage::write_buffer::WriteBuffer;
11
12
use super::FeoxStore;
13
14
impl FeoxStore {
15
499
    pub(super) fn update_record_with_ttl(
16
499
        &self,
17
499
        old_record: &Record,
18
499
        value: &[u8],
19
499
        timestamp: u64,
20
499
        ttl_expiry: u64,
21
499
    ) -> Result<bool> {
22
499
        let new_record = if ttl_expiry > 0 && 
self.enable_ttl1
{
23
1
            Arc::new(Record::new_with_timestamp_ttl(
24
1
                old_record.key.clone(),
25
1
                value.to_vec(),
26
1
                timestamp,
27
1
                ttl_expiry,
28
            ))
29
        } else {
30
498
            Arc::new(Record::new(
31
498
                old_record.key.clone(),
32
498
                value.to_vec(),
33
498
                timestamp,
34
            ))
35
        };
36
37
499
        let old_value_len = old_record.value_len;
38
499
        let old_size = old_record.calculate_size();
39
499
        let new_size = self.calculate_record_size(old_record.key.len(), value.len());
40
41
499
        let old_record_arc =
42
499
            if let Some(entry) = self.hash_table.read(&old_record.key, |_, v| v.clone()) {
43
499
                entry
44
            } else {
45
0
                return Err(FeoxError::KeyNotFound);
46
            };
47
48
499
        let key_vec = new_record.key.clone();
49
50
499
        self.hash_table
51
499
            .upsert(key_vec.clone(), Arc::clone(&new_record));
52
499
        self.tree.insert(key_vec.clone(), Arc::clone(&new_record));
53
54
499
        if new_size > old_size {
55
93
            self.stats
56
93
                .memory_usage
57
93
                .fetch_add(new_size - old_size, Ordering::AcqRel);
58
406
        } else {
59
406
            self.stats
60
406
                .memory_usage
61
406
                .fetch_sub(old_size - new_size, Ordering::AcqRel);
62
406
        }
63
64
        // Only do cache and persistence operations if not in memory-only mode
65
499
        if !self.memory_only {
66
3
            if self.enable_caching {
67
3
                if let Some(ref cache) = self.cache {
68
3
                    cache.remove(&key_vec);
69
3
                
}0
70
0
            }
71
72
3
            if let Some(ref wb) = self.write_buffer {
73
0
                if let Err(e) =
74
3
                    wb.add_write(Operation::Update, Arc::clone(&new_record), old_value_len)
75
0
                {
76
0
                    // Data operation succeeded in memory
77
0
                    let _ = e;
78
3
                }
79
80
3
                if let Err(
e0
) = wb.add_write(Operation::Delete, old_record_arc, old_value_len) {
81
0
                    // Data operation succeeded in memory
82
0
                    let _ = e;
83
3
                }
84
0
            }
85
496
        }
86
87
499
        Ok(false)
88
499
    }
89
90
    /// Update an existing record with a new value (Bytes version for zero-copy).
91
4
    pub(super) fn update_record_with_ttl_bytes(
92
4
        &self,
93
4
        old_record: &Record,
94
4
        value: Bytes,
95
4
        timestamp: u64,
96
4
        ttl_expiry: u64,
97
4
    ) -> Result<bool> {
98
4
        let new_record = if ttl_expiry > 0 && 
self.enable_ttl1
{
99
1
            Arc::new(Record::new_from_bytes_with_ttl(
100
1
                old_record.key.clone(),
101
1
                value,
102
1
                timestamp,
103
1
                ttl_expiry,
104
            ))
105
        } else {
106
3
            Arc::new(Record::new_from_bytes(
107
3
                old_record.key.clone(),
108
3
                value,
109
3
                timestamp,
110
            ))
111
        };
112
113
4
        let old_value_len = old_record.value_len;
114
4
        let old_size = old_record.calculate_size();
115
4
        let new_size = new_record.calculate_size();
116
117
4
        let old_record_arc =
118
4
            if let Some(entry) = self.hash_table.read(&old_record.key, |_, v| v.clone()) {
119
4
                entry
120
            } else {
121
0
                return Err(FeoxError::KeyNotFound);
122
            };
123
124
4
        let key_vec = new_record.key.clone();
125
126
4
        self.hash_table
127
4
            .upsert(key_vec.clone(), Arc::clone(&new_record));
128
4
        self.tree.insert(key_vec.clone(), Arc::clone(&new_record));
129
130
4
        if new_size > old_size {
131
1
            self.stats
132
1
                .memory_usage
133
1
                .fetch_add(new_size - old_size, Ordering::AcqRel);
134
3
        } else {
135
3
            self.stats
136
3
                .memory_usage
137
3
                .fetch_sub(old_size - new_size, Ordering::AcqRel);
138
3
        }
139
140
        // Only do cache and persistence operations if not in memory-only mode
141
4
        if !self.memory_only {
142
0
            if self.enable_caching {
143
0
                if let Some(ref cache) = self.cache {
144
0
                    cache.remove(&key_vec);
145
0
                }
146
0
            }
147
148
0
            if let Some(ref wb) = self.write_buffer {
149
0
                if let Err(e) =
150
0
                    wb.add_write(Operation::Update, Arc::clone(&new_record), old_value_len)
151
0
                {
152
0
                    // Data operation succeeded in memory
153
0
                    let _ = e;
154
0
                }
155
156
0
                if let Err(e) = wb.add_write(Operation::Delete, old_record_arc, old_value_len) {
157
0
                    // Data operation succeeded in memory
158
0
                    let _ = e;
159
0
                }
160
0
            }
161
4
        }
162
163
4
        Ok(false)
164
4
    }
165
166
    /// Get access to hash table (for TTL cleaner)
167
0
    pub(crate) fn get_hash_table(&self) -> &HashMap<Vec<u8>, Arc<Record>, RandomState> {
168
0
        &self.hash_table
169
0
    }
170
171
    /// Remove from tree (for TTL cleaner)
172
0
    pub(crate) fn remove_from_tree(&self, key: &[u8]) {
173
0
        self.tree.remove(key);
174
0
    }
175
176
    /// Get write buffer (for TTL cleaner)
177
0
    pub(crate) fn get_write_buffer(&self) -> Option<&Arc<WriteBuffer>> {
178
0
        self.write_buffer.as_ref()
179
0
    }
180
}