feoxdb/core/store/
atomic.rs

1use std::sync::atomic::Ordering;
2use std::sync::Arc;
3
4use crate::constants::Operation;
5use crate::core::record::Record;
6use crate::error::{FeoxError, Result};
7
8use super::FeoxStore;
9
10impl FeoxStore {
    /// Atomically increment a numeric counter.
    ///
    /// The value must be stored as an 8-byte little-endian i64. If the key doesn't exist,
    /// it will be created with the given delta value. If it exists, the value will be
    /// incremented atomically. The addition saturates at the `i64` bounds rather than
    /// wrapping around.
    ///
    /// This is a convenience wrapper that calls
    /// [`atomic_increment_with_timestamp_and_ttl`](Self::atomic_increment_with_timestamp_and_ttl)
    /// with no explicit timestamp and a TTL of 0 (no expiry).
    ///
    /// # Value Format
    ///
    /// The value MUST be exactly 8 bytes representing a little-endian i64.
    /// Use `i64::to_le_bytes()` to create the initial value:
    /// ```rust,ignore
    /// let zero: i64 = 0;
    /// store.insert(b"counter", &zero.to_le_bytes())?;
    /// ```
    ///
    /// # Arguments
    ///
    /// * `key` - The key of the counter
    /// * `delta` - The amount to increment by (can be negative for decrement)
    ///
    /// # Returns
    ///
    /// Returns the new value after incrementing.
    ///
    /// # Errors
    ///
    /// * `InvalidOperation` - Existing value is not exactly 8 bytes (not a valid i64)
    /// * `OlderTimestamp` - The timestamp assigned to this update is older than the
    ///   existing record's timestamp
    ///
    /// # Example
    ///
    /// ```rust
    /// # use feoxdb::FeoxStore;
    /// # fn main() -> feoxdb::Result<()> {
    /// # let store = FeoxStore::new(None)?;
    /// // Initialize counter with proper binary format
    /// let initial: i64 = 0;
    /// store.insert(b"visits", &initial.to_le_bytes())?;
    ///
    /// // Increment atomically
    /// let val = store.atomic_increment(b"visits", 1)?;
    /// assert_eq!(val, 1);
    ///
    /// // Increment by 5
    /// let val = store.atomic_increment(b"visits", 5)?;
    /// assert_eq!(val, 6);
    ///
    /// // Decrement by 2
    /// let val = store.atomic_increment(b"visits", -2)?;
    /// assert_eq!(val, 4);
    ///
    /// // Or create new counter directly (starts at delta value)
    /// let downloads = store.atomic_increment(b"downloads", 100)?;
    /// assert_eq!(downloads, 100);
    /// # Ok(())
    /// # }
    /// ```
    pub fn atomic_increment(&self, key: &[u8], delta: i64) -> Result<i64> {
        self.atomic_increment_with_timestamp_and_ttl(key, delta, None, 0)
    }
72
    /// Atomically increment/decrement with explicit timestamp.
    ///
    /// This is the advanced version that allows manual timestamp control.
    /// Most users should use `atomic_increment()` instead.
    ///
    /// Delegates to
    /// [`atomic_increment_with_timestamp_and_ttl`](Self::atomic_increment_with_timestamp_and_ttl)
    /// with a TTL of 0 (no expiry).
    ///
    /// # Arguments
    ///
    /// * `key` - The key to increment/decrement
    /// * `delta` - Amount to add (negative to decrement)
    /// * `timestamp` - Optional timestamp. If `None` (or `Some(0)`), uses current time.
    ///
    /// # Returns
    ///
    /// Returns the new value after incrementing.
    ///
    /// # Errors
    ///
    /// * `InvalidOperation` - Existing value is not exactly 8 bytes (not a valid i64)
    /// * `OlderTimestamp` - Timestamp is older than the existing record's timestamp
    pub fn atomic_increment_with_timestamp(
        &self,
        key: &[u8],
        delta: i64,
        timestamp: Option<u64>,
    ) -> Result<i64> {
        self.atomic_increment_with_timestamp_and_ttl(key, delta, timestamp, 0)
    }
95
    /// Atomically increment/decrement with TTL support.
    ///
    /// Delegates to
    /// [`atomic_increment_with_timestamp_and_ttl`](Self::atomic_increment_with_timestamp_and_ttl)
    /// with no explicit timestamp, so the current time is used.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to increment/decrement
    /// * `delta` - Amount to add (negative to decrement)
    /// * `ttl_seconds` - Time-to-live in seconds (0 for no expiry)
    ///
    /// # Returns
    ///
    /// Returns the new value after incrementing.
    ///
    /// # Errors
    ///
    /// * `InvalidOperation` - Existing value is not exactly 8 bytes (not a valid i64)
    /// * `OlderTimestamp` - The timestamp assigned to this update is older than the
    ///   existing record's timestamp
    pub fn atomic_increment_with_ttl(
        &self,
        key: &[u8],
        delta: i64,
        ttl_seconds: u64,
    ) -> Result<i64> {
        self.atomic_increment_with_timestamp_and_ttl(key, delta, None, ttl_seconds)
    }
115
116    /// Atomically increment/decrement with explicit timestamp and TTL.
117    ///
118    /// # Arguments
119    ///
120    /// * `key` - The key to increment/decrement
121    /// * `delta` - Amount to add (negative to decrement)
122    /// * `timestamp` - Optional timestamp. If `None`, uses current time.
123    /// * `ttl_seconds` - Time-to-live in seconds (0 for no expiry)
124    ///
125    /// # Errors
126    ///
127    /// * `OlderTimestamp` - Timestamp is not newer than existing record
128    pub fn atomic_increment_with_timestamp_and_ttl(
129        &self,
130        key: &[u8],
131        delta: i64,
132        timestamp: Option<u64>,
133        ttl_seconds: u64,
134    ) -> Result<i64> {
135        self.validate_key(key)?;
136
137        let key_vec = key.to_vec();
138
139        let result = match self.hash_table.entry(key_vec.clone()) {
140            scc::hash_map::Entry::Occupied(mut entry) => {
141                let old_record = entry.get();
142
143                // Get timestamp inside the critical section to ensure it's always newer
144                let timestamp = match timestamp {
145                    Some(0) | None => self.get_timestamp(),
146                    Some(ts) => ts,
147                };
148
149                // Check if timestamp is valid
150                if timestamp < old_record.timestamp {
151                    return Err(FeoxError::OlderTimestamp);
152                }
153
154                // Load value from memory or disk
155                let value = if let Some(val) = old_record.get_value() {
156                    val.to_vec()
157                } else {
158                    // Try loading from disk if not in memory
159                    self.load_value_from_disk(old_record)?
160                };
161
162                let current_val = if value.len() == 8 {
163                    let bytes = value
164                        .get(..8)
165                        .and_then(|slice| slice.try_into().ok())
166                        .ok_or(FeoxError::InvalidNumericValue)?;
167                    i64::from_le_bytes(bytes)
168                } else {
169                    return Err(FeoxError::InvalidOperation);
170                };
171
172                let new_val = current_val.saturating_add(delta);
173                let new_value = new_val.to_le_bytes().to_vec();
174
175                // Create new record with TTL if specified
176                let new_record = if ttl_seconds > 0 {
177                    let ttl_expiry = timestamp + (ttl_seconds * 1_000_000_000); // Convert to nanoseconds
178                    Arc::new(Record::new_with_timestamp_ttl(
179                        old_record.key.clone(),
180                        new_value,
181                        timestamp,
182                        ttl_expiry,
183                    ))
184                } else {
185                    Arc::new(Record::new(old_record.key.clone(), new_value, timestamp))
186                };
187
188                let old_value_len = old_record.value_len;
189                let old_size = old_record.calculate_size();
190                let new_size = self.calculate_record_size(old_record.key.len(), 8);
191                let old_record_arc = Arc::clone(old_record);
192
193                // Atomically update the entry
194                entry.insert(Arc::clone(&new_record));
195
196                // Update skip list as well
197                self.tree.insert(key_vec.clone(), Arc::clone(&new_record));
198
199                // Update memory usage
200                if new_size > old_size {
201                    self.stats
202                        .memory_usage
203                        .fetch_add(new_size - old_size, Ordering::AcqRel);
204                } else {
205                    self.stats
206                        .memory_usage
207                        .fetch_sub(old_size - new_size, Ordering::AcqRel);
208                }
209
210                // Only do cache and persistence operations if not in memory-only mode
211                if !self.memory_only {
212                    if self.enable_caching {
213                        if let Some(ref cache) = self.cache {
214                            cache.remove(&key_vec);
215                        }
216                    }
217
218                    if let Some(ref wb) = self.write_buffer {
219                        if let Err(e) =
220                            wb.add_write(Operation::Update, Arc::clone(&new_record), old_value_len)
221                        {
222                            // Atomic operation succeeded in memory
223                            let _ = e;
224                        }
225
226                        if let Err(e) =
227                            wb.add_write(Operation::Delete, old_record_arc, old_value_len)
228                        {
229                            // Atomic operation succeeded in memory
230                            let _ = e;
231                        }
232                    }
233                }
234
235                Ok(new_val)
236            }
237            scc::hash_map::Entry::Vacant(entry) => {
238                // Key doesn't exist, create it with initial value
239                // Get timestamp inside the critical section
240                let timestamp = match timestamp {
241                    Some(0) | None => self.get_timestamp(),
242                    Some(ts) => ts,
243                };
244
245                let initial_val = delta;
246                let value = initial_val.to_le_bytes().to_vec();
247
248                // Create new record with TTL if specified
249                let new_record = if ttl_seconds > 0 {
250                    let ttl_expiry = timestamp + (ttl_seconds * 1_000_000_000); // Convert to nanoseconds
251                    Arc::new(Record::new_with_timestamp_ttl(
252                        key_vec.clone(),
253                        value,
254                        timestamp,
255                        ttl_expiry,
256                    ))
257                } else {
258                    Arc::new(Record::new(key_vec.clone(), value, timestamp))
259                };
260
261                let _ = entry.insert_entry(Arc::clone(&new_record));
262
263                // Update skip list
264                self.tree.insert(key_vec.clone(), Arc::clone(&new_record));
265
266                // Update statistics
267                self.stats.record_count.fetch_add(1, Ordering::AcqRel);
268                let record_size = self.calculate_record_size(key.len(), 8);
269                self.stats
270                    .memory_usage
271                    .fetch_add(record_size, Ordering::AcqRel);
272
273                // Handle persistence if needed
274                if !self.memory_only {
275                    if let Some(ref wb) = self.write_buffer {
276                        if let Err(e) = wb.add_write(Operation::Insert, Arc::clone(&new_record), 0)
277                        {
278                            // Operation succeeded in memory
279                            let _ = e;
280                        }
281                    }
282                }
283
284                Ok(initial_val)
285            }
286        };
287
288        result
289    }
290
    /// Atomically compare and swap a value.
    ///
    /// Compares the current value of a key with an expected value, and if they match,
    /// atomically replaces it with a new value. This operation is atomic within the
    /// HashMap shard, preventing race conditions.
    ///
    /// Delegates to
    /// [`compare_and_swap_with_timestamp_and_ttl`](Self::compare_and_swap_with_timestamp_and_ttl)
    /// with no explicit timestamp and a TTL of 0 (no expiry).
    ///
    /// # Arguments
    ///
    /// * `key` - The key to check and potentially update
    /// * `expected` - The expected current value
    /// * `new_value` - The new value to set if comparison succeeds
    ///
    /// # Returns
    ///
    /// Returns `Ok(true)` if the swap succeeded (current value matched expected).
    /// Returns `Ok(false)` if the current value didn't match, the key doesn't exist, or
    /// the record was replaced by another writer between the comparison and the swap.
    ///
    /// # Errors
    ///
    /// * `InvalidKeySize` - Key is invalid
    /// * `InvalidValueSize` - New value is too large
    /// * `OutOfMemory` - Memory limit exceeded
    /// * `OlderTimestamp` - The timestamp assigned to this update is older than the
    ///   existing record's timestamp
    /// * `IoError` - Failed to read value from disk
    ///
    /// # Example
    ///
    /// ```rust
    /// # use feoxdb::FeoxStore;
    /// # fn main() -> feoxdb::Result<()> {
    /// # let store = FeoxStore::new(None)?;
    /// store.insert(b"config", b"v1")?;
    ///
    /// // Successful CAS - value matches
    /// let swapped = store.compare_and_swap(b"config", b"v1", b"v2")?;
    /// assert_eq!(swapped, true);
    ///
    /// // Failed CAS - value doesn't match
    /// let swapped = store.compare_and_swap(b"config", b"v1", b"v3")?;
    /// assert_eq!(swapped, false); // Value is now "v2", not "v1"
    ///
    /// // CAS on non-existent key
    /// let swapped = store.compare_and_swap(b"missing", b"any", b"new")?;
    /// assert_eq!(swapped, false);
    /// # Ok(())
    /// # }
    /// ```
    pub fn compare_and_swap(&self, key: &[u8], expected: &[u8], new_value: &[u8]) -> Result<bool> {
        self.compare_and_swap_with_timestamp_and_ttl(key, expected, new_value, None, 0)
    }
340
    /// Compare and swap with explicit timestamp.
    ///
    /// This is the advanced version that allows manual timestamp control for
    /// conflict resolution. Most users should use `compare_and_swap()` instead.
    ///
    /// Delegates to
    /// [`compare_and_swap_with_timestamp_and_ttl`](Self::compare_and_swap_with_timestamp_and_ttl)
    /// with a TTL of 0 (no expiry).
    ///
    /// # Arguments
    ///
    /// * `key` - The key to check and potentially update
    /// * `expected` - The expected current value
    /// * `new_value` - The new value to set if comparison succeeds
    /// * `timestamp` - Optional timestamp. If `None` (or `Some(0)`), uses current time.
    ///
    /// # Returns
    ///
    /// Returns `Ok(true)` if the swap succeeded, `Ok(false)` if the key doesn't exist, the
    /// current value didn't match, or the record changed before the swap could be applied.
    ///
    /// # Errors
    ///
    /// * `OlderTimestamp` - Timestamp is older than the existing record's timestamp
    /// * `OutOfMemory` - Memory limit exceeded
    pub fn compare_and_swap_with_timestamp(
        &self,
        key: &[u8],
        expected: &[u8],
        new_value: &[u8],
        timestamp: Option<u64>,
    ) -> Result<bool> {
        self.compare_and_swap_with_timestamp_and_ttl(key, expected, new_value, timestamp, 0)
    }
365
    /// Compare and swap with TTL support.
    ///
    /// Delegates to
    /// [`compare_and_swap_with_timestamp_and_ttl`](Self::compare_and_swap_with_timestamp_and_ttl)
    /// with no explicit timestamp, so the current time is used.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to check and potentially update
    /// * `expected` - The expected current value
    /// * `new_value` - The new value to set if comparison succeeds
    /// * `ttl_seconds` - Time-to-live in seconds (0 for no expiry)
    ///
    /// # Returns
    ///
    /// Returns `Ok(true)` if the swap succeeded, `Ok(false)` if the key doesn't exist, the
    /// current value didn't match, or the record changed before the swap could be applied.
    ///
    /// # Errors
    ///
    /// * `InvalidKeySize` - Key is invalid
    /// * `InvalidValueSize` - New value is too large
    /// * `OutOfMemory` - Memory limit exceeded
    /// * `OlderTimestamp` - The timestamp assigned to this update is older than the
    ///   existing record's timestamp
    pub fn compare_and_swap_with_ttl(
        &self,
        key: &[u8],
        expected: &[u8],
        new_value: &[u8],
        ttl_seconds: u64,
    ) -> Result<bool> {
        self.compare_and_swap_with_timestamp_and_ttl(key, expected, new_value, None, ttl_seconds)
    }
388
389    /// Compare and swap with explicit timestamp and TTL.
390    ///
391    /// # Arguments
392    ///
393    /// * `key` - The key to check and potentially update
394    /// * `expected` - The expected current value
395    /// * `new_value` - The new value to set if comparison succeeds
396    /// * `timestamp` - Optional timestamp. If `None`, uses current time.
397    /// * `ttl_seconds` - Time-to-live in seconds (0 for no expiry)
398    ///
399    /// # Errors
400    ///
401    /// * `OlderTimestamp` - Timestamp is not newer than existing record
402    pub fn compare_and_swap_with_timestamp_and_ttl(
403        &self,
404        key: &[u8],
405        expected: &[u8],
406        new_value: &[u8],
407        timestamp: Option<u64>,
408        ttl_seconds: u64,
409    ) -> Result<bool> {
410        let start = std::time::Instant::now();
411        self.validate_key_value(key, new_value)?;
412        let key_vec = key.to_vec();
413
414        // Phase 1: Check value and save record reference for version tracking
415        let initial_record = {
416            let entry = match self.hash_table.read(&key_vec, |_, v| v.clone()) {
417                Some(e) => e,
418                None => return Ok(false), // Key doesn't exist
419            };
420
421            let record_arc = entry;
422
423            // Check if value matches expected
424            let value_matches = if let Some(val) = record_arc.get_value() {
425                // Fast path: value in memory
426                val.as_ref() == expected
427            } else {
428                // Need disk I/O
429
430                let disk_value = self.load_value_from_disk(&record_arc)?;
431                disk_value == expected
432            };
433
434            if !value_matches {
435                return Ok(false); // Value doesn't match expected
436            }
437
438            // Return the Arc pointer itself as our version identifier
439            // NOTE: We can't use the stored timestamp for verification here because
440            // SystemTime::now() resolution is 1us, which is too coarse for CAS operations.
441            record_arc
442        };
443
444        // Phase 2: Acquire write lock and verify record hasn't changed
445        match self.hash_table.entry(key_vec.clone()) {
446            scc::hash_map::Entry::Occupied(mut entry) => {
447                let old_record = entry.get();
448
449                // Check if the record is still the same one we read earlier
450                if !Arc::ptr_eq(old_record, &initial_record) {
451                    // Record was modified between our check and acquiring lock
452                    return Ok(false);
453                }
454
455                let timestamp = match timestamp {
456                    Some(0) | None => self.get_timestamp(),
457                    Some(ts) => ts,
458                };
459
460                if timestamp < old_record.timestamp {
461                    return Err(FeoxError::OlderTimestamp);
462                }
463
464                let old_size = old_record.calculate_size();
465                let new_size = self.calculate_record_size(key.len(), new_value.len());
466                let old_value_len = old_record.value_len;
467                let old_record_arc = Arc::clone(old_record);
468
469                // Pre-check memory limit
470                if new_size > old_size && !self.check_memory_limit(new_size - old_size) {
471                    return Err(FeoxError::OutOfMemory);
472                }
473
474                // Create new record with TTL if specified
475                let new_record = if ttl_seconds > 0 {
476                    let ttl_expiry = timestamp + (ttl_seconds * 1_000_000_000); // Convert to nanoseconds
477                    Arc::new(Record::new_with_timestamp_ttl(
478                        key.to_vec(),
479                        new_value.to_vec(),
480                        timestamp,
481                        ttl_expiry,
482                    ))
483                } else {
484                    Arc::new(Record::new(key.to_vec(), new_value.to_vec(), timestamp))
485                };
486
487                entry.insert(Arc::clone(&new_record));
488
489                self.tree.insert(key_vec.clone(), Arc::clone(&new_record));
490
491                if new_size > old_size {
492                    self.stats
493                        .memory_usage
494                        .fetch_add(new_size - old_size, Ordering::AcqRel);
495                } else {
496                    self.stats
497                        .memory_usage
498                        .fetch_sub(old_size - new_size, Ordering::AcqRel);
499                }
500
501                self.stats
502                    .record_insert(start.elapsed().as_nanos() as u64, true);
503
504                if !self.memory_only {
505                    if self.enable_caching {
506                        if let Some(ref cache) = self.cache {
507                            cache.remove(&key_vec);
508                        }
509                    }
510
511                    if let Some(ref wb) = self.write_buffer {
512                        let _ = wb.add_write(Operation::Update, new_record, old_value_len);
513                        let _ = wb.add_write(Operation::Delete, old_record_arc, old_value_len);
514                    }
515                }
516
517                Ok(true)
518            }
519            scc::hash_map::Entry::Vacant(_) => Ok(false),
520        }
521    }
522}