Coverage Report

Created: 2025-09-19 09:35

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/home/gh-runner/action-runner3/_work/feoxdb/feoxdb/src/core/store/atomic.rs
Line
Count
Source
1
use std::sync::atomic::Ordering;
2
use std::sync::Arc;
3
4
use crate::constants::Operation;
5
use crate::core::record::Record;
6
use crate::error::{FeoxError, Result};
7
8
use super::FeoxStore;
9
10
impl FeoxStore {
11
    /// Atomically increment a numeric counter.
12
    ///
13
    /// The value must be stored as an 8-byte little-endian i64. If the key doesn't exist,
14
    /// it will be created with the given delta value. If it exists, the value will be
15
    /// incremented atomically.
16
    ///
17
    /// # Value Format
18
    ///
19
    /// The value MUST be exactly 8 bytes representing a little-endian i64.
20
    /// Use `i64::to_le_bytes()` to create the initial value:
21
    /// ```rust,ignore
22
    /// let zero: i64 = 0;
23
    /// store.insert(b"counter", &zero.to_le_bytes())?;
24
    /// ```
25
    ///
26
    /// # Arguments
27
    ///
28
    /// * `key` - The key of the counter
29
    /// * `delta` - The amount to increment by (can be negative for decrement)
30
    /// * `timestamp` - Optional timestamp for conflict resolution
31
    ///
32
    /// # Returns
33
    ///
34
    /// Returns the new value after incrementing.
35
    ///
36
    /// # Errors
37
    ///
38
    /// * `InvalidOperation` - Existing value is not exactly 8 bytes (not a valid i64)
39
    /// * `OlderTimestamp` - Timestamp is not newer than existing record
40
    ///
41
    /// # Example
42
    ///
43
    /// ```rust
44
    /// # use feoxdb::FeoxStore;
45
    /// # fn main() -> feoxdb::Result<()> {
46
    /// # let store = FeoxStore::new(None)?;
47
    /// // Initialize counter with proper binary format
48
    /// let initial: i64 = 0;
49
    /// store.insert(b"visits", &initial.to_le_bytes())?;
50
    ///
51
    /// // Increment atomically
52
    /// let val = store.atomic_increment(b"visits", 1)?;
53
    /// assert_eq!(val, 1);
54
    ///
55
    /// // Increment by 5
56
    /// let val = store.atomic_increment(b"visits", 5)?;
57
    /// assert_eq!(val, 6);
58
    ///
59
    /// // Decrement by 2
60
    /// let val = store.atomic_increment(b"visits", -2)?;
61
    /// assert_eq!(val, 4);
62
    ///
63
    /// // Or create new counter directly (starts at delta value)
64
    /// let downloads = store.atomic_increment(b"downloads", 100)?;
65
    /// assert_eq!(downloads, 100);
66
    /// # Ok(())
67
    /// # }
68
    /// ```
69
1.10k
    pub fn atomic_increment(&self, key: &[u8], delta: i64) -> Result<i64> {
70
1.10k
        self.atomic_increment_with_timestamp_and_ttl(key, delta, None, 0)
71
1.10k
    }
72
73
    /// Atomically increment/decrement with explicit timestamp.
74
    ///
75
    /// This is the advanced version that allows manual timestamp control.
76
    /// Most users should use `atomic_increment()` instead.
77
    ///
78
    /// # Arguments
79
    ///
80
    /// * `key` - The key to increment/decrement
81
    /// * `delta` - Amount to add (negative to decrement)
82
    /// * `timestamp` - Optional timestamp. If `None`, uses current time.
83
    ///
84
    /// # Errors
85
    ///
86
    /// * `OlderTimestamp` - Timestamp is not newer than existing record
87
0
    pub fn atomic_increment_with_timestamp(
88
0
        &self,
89
0
        key: &[u8],
90
0
        delta: i64,
91
0
        timestamp: Option<u64>,
92
0
    ) -> Result<i64> {
93
0
        self.atomic_increment_with_timestamp_and_ttl(key, delta, timestamp, 0)
94
0
    }
95
96
    /// Atomically increment/decrement with TTL support.
97
    ///
98
    /// # Arguments
99
    ///
100
    /// * `key` - The key to increment/decrement
101
    /// * `delta` - Amount to add (negative to decrement)
102
    /// * `ttl_seconds` - Time-to-live in seconds (0 for no expiry)
103
    ///
104
    /// # Errors
105
    ///
106
    /// * `InvalidOperation` - Value is not a valid i64
107
0
    pub fn atomic_increment_with_ttl(
108
0
        &self,
109
0
        key: &[u8],
110
0
        delta: i64,
111
0
        ttl_seconds: u64,
112
0
    ) -> Result<i64> {
113
0
        self.atomic_increment_with_timestamp_and_ttl(key, delta, None, ttl_seconds)
114
0
    }
115
116
    /// Atomically increment/decrement with explicit timestamp and TTL.
117
    ///
118
    /// # Arguments
119
    ///
120
    /// * `key` - The key to increment/decrement
121
    /// * `delta` - Amount to add (negative to decrement)
122
    /// * `timestamp` - Optional timestamp. If `None`, uses current time.
123
    /// * `ttl_seconds` - Time-to-live in seconds (0 for no expiry)
124
    ///
125
    /// # Errors
126
    ///
127
    /// * `OlderTimestamp` - Timestamp is not newer than existing record
128
1.10k
    pub fn atomic_increment_with_timestamp_and_ttl(
129
1.10k
        &self,
130
1.10k
        key: &[u8],
131
1.10k
        delta: i64,
132
1.10k
        timestamp: Option<u64>,
133
1.10k
        ttl_seconds: u64,
134
1.10k
    ) -> Result<i64> {
135
1.10k
        self.validate_key(key)
?0
;
136
137
1.10k
        let key_vec = key.to_vec();
138
139
1.10k
        let 
result1.10k
= match self.hash_table.entry(key_vec.clone()) {
140
1.10k
            scc::hash_map::Entry::Occupied(mut entry) => {
141
1.10k
                let old_record = entry.get();
142
143
                // Get timestamp inside the critical section to ensure it's always newer
144
1.10k
                let timestamp = match timestamp {
145
1.10k
                    Some(0) | None => self.get_timestamp(),
146
0
                    Some(ts) => ts,
147
                };
148
149
                // Check if timestamp is valid
150
1.10k
                if timestamp < old_record.timestamp {
151
0
                    return Err(FeoxError::OlderTimestamp);
152
1.10k
                }
153
154
                // Load value from memory or disk
155
1.10k
                let value = if let Some(
val1.10k
) = old_record.get_value() {
156
1.10k
                    val.to_vec()
157
                } else {
158
                    // Try loading from disk if not in memory
159
1
                    self.load_value_from_disk(old_record)
?0
160
                };
161
162
1.10k
                let 
current_val1.10k
= if value.len() == 8 {
163
1.10k
                    let bytes = value
164
1.10k
                        .get(..8)
165
1.10k
                        .and_then(|slice| slice.try_into().ok())
166
1.10k
                        .ok_or(FeoxError::InvalidNumericValue)
?0
;
167
1.10k
                    i64::from_le_bytes(bytes)
168
                } else {
169
1
                    return Err(FeoxError::InvalidOperation);
170
                };
171
172
1.10k
                let new_val = current_val.saturating_add(delta);
173
1.10k
                let new_value = new_val.to_le_bytes().to_vec();
174
175
                // Create new record with TTL if specified
176
1.10k
                let new_record = if ttl_seconds > 0 {
177
0
                    let ttl_expiry = timestamp + (ttl_seconds * 1_000_000_000); // Convert to nanoseconds
178
0
                    Arc::new(Record::new_with_timestamp_ttl(
179
0
                        old_record.key.clone(),
180
0
                        new_value,
181
0
                        timestamp,
182
0
                        ttl_expiry,
183
                    ))
184
                } else {
185
1.10k
                    Arc::new(Record::new(old_record.key.clone(), new_value, timestamp))
186
                };
187
188
1.10k
                let old_value_len = old_record.value_len;
189
1.10k
                let old_size = old_record.calculate_size();
190
1.10k
                let new_size = self.calculate_record_size(old_record.key.len(), 8);
191
1.10k
                let old_record_arc = Arc::clone(old_record);
192
193
                // Atomically update the entry
194
1.10k
                entry.insert(Arc::clone(&new_record));
195
196
                // Update skip list as well
197
1.10k
                self.tree.insert(key_vec.clone(), Arc::clone(&new_record));
198
199
                // Update memory usage
200
1.10k
                if new_size > old_size {
201
0
                    self.stats
202
0
                        .memory_usage
203
0
                        .fetch_add(new_size - old_size, Ordering::AcqRel);
204
1.10k
                } else {
205
1.10k
                    self.stats
206
1.10k
                        .memory_usage
207
1.10k
                        .fetch_sub(old_size - new_size, Ordering::AcqRel);
208
1.10k
                }
209
210
                // Only do cache and persistence operations if not in memory-only mode
211
1.10k
                if !self.memory_only {
212
101
                    if self.enable_caching {
213
101
                        if let Some(ref cache) = self.cache {
214
101
                            cache.remove(&key_vec);
215
101
                        
}0
216
0
                    }
217
218
101
                    if let Some(ref wb) = self.write_buffer {
219
0
                        if let Err(e) =
220
101
                            wb.add_write(Operation::Update, Arc::clone(&new_record), old_value_len)
221
0
                        {
222
0
                            // Atomic operation succeeded in memory
223
0
                            let _ = e;
224
101
                        }
225
226
0
                        if let Err(e) =
227
101
                            wb.add_write(Operation::Delete, old_record_arc, old_value_len)
228
0
                        {
229
0
                            // Atomic operation succeeded in memory
230
0
                            let _ = e;
231
101
                        }
232
0
                    }
233
1.00k
                }
234
235
1.10k
                Ok(new_val)
236
            }
237
1
            scc::hash_map::Entry::Vacant(entry) => {
238
                // Key doesn't exist, create it with initial value
239
                // Get timestamp inside the critical section
240
1
                let timestamp = match timestamp {
241
1
                    Some(0) | None => self.get_timestamp(),
242
0
                    Some(ts) => ts,
243
                };
244
245
1
                let initial_val = delta;
246
1
                let value = initial_val.to_le_bytes().to_vec();
247
248
                // Create new record with TTL if specified
249
1
                let new_record = if ttl_seconds > 0 {
250
0
                    let ttl_expiry = timestamp + (ttl_seconds * 1_000_000_000); // Convert to nanoseconds
251
0
                    Arc::new(Record::new_with_timestamp_ttl(
252
0
                        key_vec.clone(),
253
0
                        value,
254
0
                        timestamp,
255
0
                        ttl_expiry,
256
                    ))
257
                } else {
258
1
                    Arc::new(Record::new(key_vec.clone(), value, timestamp))
259
                };
260
261
1
                let _ = entry.insert_entry(Arc::clone(&new_record));
262
263
                // Update skip list
264
1
                self.tree.insert(key_vec.clone(), Arc::clone(&new_record));
265
266
                // Update statistics
267
1
                self.stats.record_count.fetch_add(1, Ordering::AcqRel);
268
1
                let record_size = self.calculate_record_size(key.len(), 8);
269
1
                self.stats
270
1
                    .memory_usage
271
1
                    .fetch_add(record_size, Ordering::AcqRel);
272
273
                // Handle persistence if needed
274
1
                if !self.memory_only {
275
0
                    if let Some(ref wb) = self.write_buffer {
276
0
                        if let Err(e) = wb.add_write(Operation::Insert, Arc::clone(&new_record), 0)
277
0
                        {
278
0
                            // Operation succeeded in memory
279
0
                            let _ = e;
280
0
                        }
281
0
                    }
282
1
                }
283
284
1
                Ok(initial_val)
285
            }
286
        };
287
288
1.10k
        result
289
1.10k
    }
290
291
    /// Atomically compare and swap a value.
292
    ///
293
    /// Compares the current value of a key with an expected value, and if they match,
294
    /// atomically replaces it with a new value. This operation is atomic within the
295
    /// HashMap shard, preventing race conditions.
296
    ///
297
    /// # Arguments
298
    ///
299
    /// * `key` - The key to check and potentially update
300
    /// * `expected` - The expected current value
301
    /// * `new_value` - The new value to set if comparison succeeds
302
    ///
303
    /// # Returns
304
    ///
305
    /// Returns `Ok(true)` if the swap succeeded (current value matched expected).
306
    /// Returns `Ok(false)` if the current value didn't match or key doesn't exist.
307
    ///
308
    /// # Errors
309
    ///
310
    /// * `InvalidKeySize` - Key is invalid
311
    /// * `InvalidValueSize` - New value is too large
312
    /// * `OutOfMemory` - Memory limit exceeded
313
    /// * `IoError` - Failed to read value from disk
314
    ///
315
    /// # Example
316
    ///
317
    /// ```rust
318
    /// # use feoxdb::FeoxStore;
319
    /// # fn main() -> feoxdb::Result<()> {
320
    /// # let store = FeoxStore::new(None)?;
321
    /// store.insert(b"config", b"v1")?;
322
    ///
323
    /// // Successful CAS - value matches
324
    /// let swapped = store.compare_and_swap(b"config", b"v1", b"v2")?;
325
    /// assert_eq!(swapped, true);
326
    ///
327
    /// // Failed CAS - value doesn't match
328
    /// let swapped = store.compare_and_swap(b"config", b"v1", b"v3")?;
329
    /// assert_eq!(swapped, false); // Value is now "v2", not "v1"
330
    ///
331
    /// // CAS on non-existent key
332
    /// let swapped = store.compare_and_swap(b"missing", b"any", b"new")?;
333
    /// assert_eq!(swapped, false);
334
    /// # Ok(())
335
    /// # }
336
    /// ```
337
3.28k
    pub fn compare_and_swap(&self, key: &[u8], expected: &[u8], new_value: &[u8]) -> Result<bool> {
338
3.28k
        self.compare_and_swap_with_timestamp_and_ttl(key, expected, new_value, None, 0)
339
3.28k
    }
340
341
    /// Compare and swap with explicit timestamp.
342
    ///
343
    /// This is the advanced version that allows manual timestamp control for
344
    /// conflict resolution. Most users should use `compare_and_swap()` instead.
345
    ///
346
    /// # Arguments
347
    ///
348
    /// * `key` - The key to check and potentially update
349
    /// * `expected` - The expected current value
350
    /// * `new_value` - The new value to set if comparison succeeds
351
    /// * `timestamp` - Optional timestamp. If `None`, uses current time.
352
    ///
353
    /// # Errors
354
    ///
355
    /// * `OlderTimestamp` - Timestamp is not newer than existing record
356
1
    pub fn compare_and_swap_with_timestamp(
357
1
        &self,
358
1
        key: &[u8],
359
1
        expected: &[u8],
360
1
        new_value: &[u8],
361
1
        timestamp: Option<u64>,
362
1
    ) -> Result<bool> {
363
1
        self.compare_and_swap_with_timestamp_and_ttl(key, expected, new_value, timestamp, 0)
364
1
    }
365
366
    /// Compare and swap with TTL support.
367
    ///
368
    /// # Arguments
369
    ///
370
    /// * `key` - The key to check and potentially update
371
    /// * `expected` - The expected current value
372
    /// * `new_value` - The new value to set if comparison succeeds
373
    /// * `ttl_seconds` - Time-to-live in seconds (0 for no expiry)
374
    ///
375
    /// # Errors
376
    ///
377
    /// * `InvalidKeySize` - Key is invalid
378
    /// * `InvalidValueSize` - New value is too large
379
0
    pub fn compare_and_swap_with_ttl(
380
0
        &self,
381
0
        key: &[u8],
382
0
        expected: &[u8],
383
0
        new_value: &[u8],
384
0
        ttl_seconds: u64,
385
0
    ) -> Result<bool> {
386
0
        self.compare_and_swap_with_timestamp_and_ttl(key, expected, new_value, None, ttl_seconds)
387
0
    }
388
389
    /// Compare and swap with explicit timestamp and TTL.
390
    ///
391
    /// # Arguments
392
    ///
393
    /// * `key` - The key to check and potentially update
394
    /// * `expected` - The expected current value
395
    /// * `new_value` - The new value to set if comparison succeeds
396
    /// * `timestamp` - Optional timestamp. If `None`, uses current time.
397
    /// * `ttl_seconds` - Time-to-live in seconds (0 for no expiry)
398
    ///
399
    /// # Errors
400
    ///
401
    /// * `OlderTimestamp` - Timestamp is not newer than existing record
402
3.28k
    pub fn compare_and_swap_with_timestamp_and_ttl(
403
3.28k
        &self,
404
3.28k
        key: &[u8],
405
3.28k
        expected: &[u8],
406
3.28k
        new_value: &[u8],
407
3.28k
        timestamp: Option<u64>,
408
3.28k
        ttl_seconds: u64,
409
3.28k
    ) -> Result<bool> {
410
3.28k
        let start = std::time::Instant::now();
411
3.28k
        self.validate_key_value(key, new_value)
?0
;
412
3.28k
        let key_vec = key.to_vec();
413
414
        // Phase 1: Check value and save record reference for version tracking
415
2.21k
        let initial_record = {
416
3.28k
            let 
entry3.28k
= match self.hash_table.read(&key_vec, |_, v|
v3.28k
.
clone3.28k
()) {
417
3.28k
                Some(e) => e,
418
1
                None => return Ok(false), // Key doesn't exist
419
            };
420
421
3.28k
            let record_arc = entry;
422
423
            // Check if value matches expected
424
3.28k
            let value_matches = if let Some(val) = record_arc.get_value() {
425
                // Fast path: value in memory
426
3.28k
                val.as_ref() == expected
427
            } else {
428
                // Need disk I/O
429
430
0
                let disk_value = self.load_value_from_disk(&record_arc)?;
431
0
                disk_value == expected
432
            };
433
434
3.28k
            if !value_matches {
435
1.06k
                return Ok(false); // Value doesn't match expected
436
2.21k
            }
437
438
            // Return the Arc pointer itself as our version identifier
439
            // NOTE: We can't use the stored timestamp for verification here because
440
            // SystemTime::now() resolution is 1us, which is too coarse for CAS operations.
441
2.21k
            record_arc
442
        };
443
444
        // Phase 2: Acquire write lock and verify record hasn't changed
445
2.21k
        match self.hash_table.entry(key_vec.clone()) {
446
2.21k
            scc::hash_map::Entry::Occupied(mut entry) => {
447
2.21k
                let old_record = entry.get();
448
449
                // Check if the record is still the same one we read earlier
450
2.21k
                if !Arc::ptr_eq(old_record, &initial_record) {
451
                    // Record was modified between our check and acquiring lock
452
1.10k
                    return Ok(false);
453
1.10k
                }
454
455
1.10k
                let timestamp = match timestamp {
456
1.10k
                    Some(0) | None => self.get_timestamp(),
457
1
                    Some(ts) => ts,
458
                };
459
460
1.10k
                if timestamp < old_record.timestamp {
461
1
                    return Err(FeoxError::OlderTimestamp);
462
1.10k
                }
463
464
1.10k
                let old_size = old_record.calculate_size();
465
1.10k
                let new_size = self.calculate_record_size(key.len(), new_value.len());
466
1.10k
                let old_value_len = old_record.value_len;
467
1.10k
                let old_record_arc = Arc::clone(old_record);
468
469
                // Pre-check memory limit
470
1.10k
                if new_size > old_size && 
!5
self5
.
check_memory_limit5
(new_size - old_size) {
471
0
                    return Err(FeoxError::OutOfMemory);
472
1.10k
                }
473
474
                // Create new record with TTL if specified
475
1.10k
                let new_record = if ttl_seconds > 0 {
476
0
                    let ttl_expiry = timestamp + (ttl_seconds * 1_000_000_000); // Convert to nanoseconds
477
0
                    Arc::new(Record::new_with_timestamp_ttl(
478
0
                        key.to_vec(),
479
0
                        new_value.to_vec(),
480
0
                        timestamp,
481
0
                        ttl_expiry,
482
                    ))
483
                } else {
484
1.10k
                    Arc::new(Record::new(key.to_vec(), new_value.to_vec(), timestamp))
485
                };
486
487
1.10k
                entry.insert(Arc::clone(&new_record));
488
489
1.10k
                self.tree.insert(key_vec.clone(), Arc::clone(&new_record));
490
491
1.10k
                if new_size > old_size {
492
5
                    self.stats
493
5
                        .memory_usage
494
5
                        .fetch_add(new_size - old_size, Ordering::AcqRel);
495
1.10k
                } else {
496
1.10k
                    self.stats
497
1.10k
                        .memory_usage
498
1.10k
                        .fetch_sub(old_size - new_size, Ordering::AcqRel);
499
1.10k
                }
500
501
1.10k
                self.stats
502
1.10k
                    .record_insert(start.elapsed().as_nanos() as u64, true);
503
504
1.10k
                if !self.memory_only {
505
0
                    if self.enable_caching {
506
0
                        if let Some(ref cache) = self.cache {
507
0
                            cache.remove(&key_vec);
508
0
                        }
509
0
                    }
510
511
0
                    if let Some(ref wb) = self.write_buffer {
512
0
                        let _ = wb.add_write(Operation::Update, new_record, old_value_len);
513
0
                        let _ = wb.add_write(Operation::Delete, old_record_arc, old_value_len);
514
0
                    }
515
1.10k
                }
516
517
1.10k
                Ok(true)
518
            }
519
0
            scc::hash_map::Entry::Vacant(_) => Ok(false),
520
        }
521
3.28k
    }
522
}