feoxdb/core/store/atomic.rs
use std::sync::atomic::Ordering;
use std::sync::Arc;

use crate::constants::Operation;
use crate::core::record::Record;
use crate::error::{FeoxError, Result};

use super::FeoxStore;

impl FeoxStore {
    /// Atomically increment a numeric counter.
    ///
    /// The value must be stored as an 8-byte little-endian i64. If the key doesn't exist,
    /// it will be created with the given delta value. If it exists, the value will be
    /// incremented atomically.
    ///
    /// # Value Format
    ///
    /// The value MUST be exactly 8 bytes representing a little-endian i64.
    /// Use `i64::to_le_bytes()` to create the initial value:
    /// ```rust,ignore
    /// let zero: i64 = 0;
    /// store.insert(b"counter", &zero.to_le_bytes())?;
    /// ```
    ///
    /// # Arguments
    ///
    /// * `key` - The key of the counter
    /// * `delta` - The amount to increment by (can be negative for decrement)
    ///
    /// # Returns
    ///
    /// Returns the new value after incrementing.
    ///
    /// # Errors
    ///
    /// * `InvalidOperation` - Existing value is not exactly 8 bytes (not a valid i64)
    /// * `OlderTimestamp` - Existing record carries a newer timestamp
    ///
    /// # Example
    ///
    /// ```rust
    /// # use feoxdb::FeoxStore;
    /// # fn main() -> feoxdb::Result<()> {
    /// # let store = FeoxStore::new(None)?;
    /// // Initialize counter with proper binary format
    /// let initial: i64 = 0;
    /// store.insert(b"visits", &initial.to_le_bytes())?;
    ///
    /// // Increment atomically
    /// let val = store.atomic_increment(b"visits", 1)?;
    /// assert_eq!(val, 1);
    ///
    /// // Increment by 5
    /// let val = store.atomic_increment(b"visits", 5)?;
    /// assert_eq!(val, 6);
    ///
    /// // Decrement by 2
    /// let val = store.atomic_increment(b"visits", -2)?;
    /// assert_eq!(val, 4);
    ///
    /// // Or create new counter directly (starts at delta value)
    /// let downloads = store.atomic_increment(b"downloads", 100)?;
    /// assert_eq!(downloads, 100);
    /// # Ok(())
    /// # }
    /// ```
    pub fn atomic_increment(&self, key: &[u8], delta: i64) -> Result<i64> {
        self.atomic_increment_with_timestamp_and_ttl(key, delta, None, 0)
    }

    /// Atomically increment/decrement with explicit timestamp.
    ///
    /// This is the advanced version that allows manual timestamp control.
    /// Most users should use `atomic_increment()` instead.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to increment/decrement
    /// * `delta` - Amount to add (negative to decrement)
    /// * `timestamp` - Optional timestamp. If `None`, uses current time.
    ///
    /// # Errors
    ///
    /// * `OlderTimestamp` - Timestamp is not newer than existing record
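    ///
    /// # Example
    ///
    /// A minimal sketch of supplying an explicit timestamp for conflict resolution.
    /// The nanosecond wall-clock source below is an assumption for illustration; use
    /// whatever timestamp convention the rest of your deployment relies on.
    ///
    /// ```rust,ignore
    /// use std::time::{SystemTime, UNIX_EPOCH};
    ///
    /// // Derive a timestamp (nanoseconds assumed, matching the store's TTL arithmetic).
    /// let ts = SystemTime::now()
    ///     .duration_since(UNIX_EPOCH)
    ///     .unwrap()
    ///     .as_nanos() as u64;
    ///
    /// // Increment with that timestamp; an older timestamp is rejected with `OlderTimestamp`.
    /// let val = store.atomic_increment_with_timestamp(b"visits", 1, Some(ts))?;
    ///
    /// // Passing `None` falls back to the store's own clock, like `atomic_increment`.
    /// let val = store.atomic_increment_with_timestamp(b"visits", 1, None)?;
    /// ```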
    pub fn atomic_increment_with_timestamp(
        &self,
        key: &[u8],
        delta: i64,
        timestamp: Option<u64>,
    ) -> Result<i64> {
        self.atomic_increment_with_timestamp_and_ttl(key, delta, timestamp, 0)
    }

    /// Atomically increment/decrement with TTL support.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to increment/decrement
    /// * `delta` - Amount to add (negative to decrement)
    /// * `ttl_seconds` - Time-to-live in seconds (0 for no expiry)
    ///
    /// # Errors
    ///
    /// * `InvalidOperation` - Value is not a valid i64
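    ///
    /// # Example
    ///
    /// A hypothetical rate-limiter sketch; the key name and threshold are illustrative,
    /// not part of the API. Each hit bumps a counter that expires 60 seconds after its
    /// last update, so idle clients age out of the store on their own.
    ///
    /// ```rust,ignore
    /// // Count a request against this client; the record expires 60s after each update.
    /// let hits = store.atomic_increment_with_ttl(b"rate:client-42", 1, 60)?;
    /// if hits > 100 {
    ///     // Over budget for this window - reject or throttle the request.
    /// }
    /// ```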
    pub fn atomic_increment_with_ttl(
        &self,
        key: &[u8],
        delta: i64,
        ttl_seconds: u64,
    ) -> Result<i64> {
        self.atomic_increment_with_timestamp_and_ttl(key, delta, None, ttl_seconds)
    }

    /// Atomically increment/decrement with explicit timestamp and TTL.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to increment/decrement
    /// * `delta` - Amount to add (negative to decrement)
    /// * `timestamp` - Optional timestamp. If `None`, uses current time.
    /// * `ttl_seconds` - Time-to-live in seconds (0 for no expiry)
    ///
    /// # Errors
    ///
    /// * `InvalidOperation` - Existing value is not exactly 8 bytes (not a valid i64)
    /// * `OlderTimestamp` - Timestamp is not newer than existing record
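    ///
    /// # Example
    ///
    /// A minimal sketch combining both knobs: `None` lets the store pick the timestamp,
    /// and the 300-second TTL is an illustrative value.
    ///
    /// ```rust,ignore
    /// // Counter that expires 5 minutes after its last update, using the store's clock.
    /// let val = store.atomic_increment_with_timestamp_and_ttl(b"session:hits", 1, None, 300)?;
    /// ```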
    pub fn atomic_increment_with_timestamp_and_ttl(
        &self,
        key: &[u8],
        delta: i64,
        timestamp: Option<u64>,
        ttl_seconds: u64,
    ) -> Result<i64> {
        self.validate_key(key)?;

        let key_vec = key.to_vec();

        match self.hash_table.entry(key_vec.clone()) {
            scc::hash_map::Entry::Occupied(mut entry) => {
                let old_record = entry.get();

                // Get timestamp inside the critical section to ensure it's always newer
                let timestamp = match timestamp {
                    Some(0) | None => self.get_timestamp(),
                    Some(ts) => ts,
                };

                // Check if timestamp is valid
                if timestamp < old_record.timestamp {
                    return Err(FeoxError::OlderTimestamp);
                }

                // Load value from memory or disk
                let value = if let Some(val) = old_record.get_value() {
                    val.to_vec()
                } else {
                    // Try loading from disk if not in memory
                    self.load_value_from_disk(old_record)?
                };

                let current_val = if value.len() == 8 {
                    let bytes = value
                        .get(..8)
                        .and_then(|slice| slice.try_into().ok())
                        .ok_or(FeoxError::InvalidNumericValue)?;
                    i64::from_le_bytes(bytes)
                } else {
                    return Err(FeoxError::InvalidOperation);
                };

                let new_val = current_val.saturating_add(delta);
                let new_value = new_val.to_le_bytes().to_vec();

                // Create new record with TTL if specified
                let new_record = if ttl_seconds > 0 {
                    let ttl_expiry = timestamp + (ttl_seconds * 1_000_000_000); // Convert to nanoseconds
                    Arc::new(Record::new_with_timestamp_ttl(
                        old_record.key.clone(),
                        new_value,
                        timestamp,
                        ttl_expiry,
                    ))
                } else {
                    Arc::new(Record::new(old_record.key.clone(), new_value, timestamp))
                };

                let old_value_len = old_record.value_len;
                let old_size = old_record.calculate_size();
                let new_size = self.calculate_record_size(old_record.key.len(), 8);
                let old_record_arc = Arc::clone(old_record);

                // Atomically update the entry
                entry.insert(Arc::clone(&new_record));

                // Update skip list as well
                self.tree.insert(key_vec.clone(), Arc::clone(&new_record));

                // Update memory usage
                if new_size > old_size {
                    self.stats
                        .memory_usage
                        .fetch_add(new_size - old_size, Ordering::AcqRel);
                } else {
                    self.stats
                        .memory_usage
                        .fetch_sub(old_size - new_size, Ordering::AcqRel);
                }

                // Only do cache and persistence operations if not in memory-only mode
                if !self.memory_only {
                    if self.enable_caching {
                        if let Some(ref cache) = self.cache {
                            cache.remove(&key_vec);
                        }
                    }

                    if let Some(ref wb) = self.write_buffer {
                        if let Err(e) =
                            wb.add_write(Operation::Update, Arc::clone(&new_record), old_value_len)
                        {
                            // Atomic operation succeeded in memory
                            let _ = e;
                        }

                        if let Err(e) =
                            wb.add_write(Operation::Delete, old_record_arc, old_value_len)
                        {
                            // Atomic operation succeeded in memory
                            let _ = e;
                        }
                    }
                }

                Ok(new_val)
            }
            scc::hash_map::Entry::Vacant(entry) => {
                // Key doesn't exist, create it with initial value
                // Get timestamp inside the critical section
                let timestamp = match timestamp {
                    Some(0) | None => self.get_timestamp(),
                    Some(ts) => ts,
                };

                let initial_val = delta;
                let value = initial_val.to_le_bytes().to_vec();

                // Create new record with TTL if specified
                let new_record = if ttl_seconds > 0 {
                    let ttl_expiry = timestamp + (ttl_seconds * 1_000_000_000); // Convert to nanoseconds
                    Arc::new(Record::new_with_timestamp_ttl(
                        key_vec.clone(),
                        value,
                        timestamp,
                        ttl_expiry,
                    ))
                } else {
                    Arc::new(Record::new(key_vec.clone(), value, timestamp))
                };

                let _ = entry.insert_entry(Arc::clone(&new_record));

                // Update skip list
                self.tree.insert(key_vec.clone(), Arc::clone(&new_record));

                // Update statistics
                self.stats.record_count.fetch_add(1, Ordering::AcqRel);
                let record_size = self.calculate_record_size(key.len(), 8);
                self.stats
                    .memory_usage
                    .fetch_add(record_size, Ordering::AcqRel);

                // Handle persistence if needed
                if !self.memory_only {
                    if let Some(ref wb) = self.write_buffer {
                        if let Err(e) = wb.add_write(Operation::Insert, Arc::clone(&new_record), 0)
                        {
                            // Operation succeeded in memory
                            let _ = e;
                        }
                    }
                }

                Ok(initial_val)
            }
        }
    }

    /// Atomically compare and swap a value.
    ///
    /// Compares the current value of a key with an expected value, and if they match,
    /// atomically replaces it with a new value. This operation is atomic within the
    /// HashMap shard, preventing race conditions.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to check and potentially update
    /// * `expected` - The expected current value
    /// * `new_value` - The new value to set if comparison succeeds
    ///
    /// # Returns
    ///
    /// Returns `Ok(true)` if the swap succeeded (current value matched expected).
    /// Returns `Ok(false)` if the current value didn't match or the key doesn't exist.
    ///
    /// # Errors
    ///
    /// * `InvalidKeySize` - Key is invalid
    /// * `InvalidValueSize` - New value is too large
    /// * `OutOfMemory` - Memory limit exceeded
    /// * `IoError` - Failed to read value from disk
    ///
    /// # Example
    ///
    /// ```rust
    /// # use feoxdb::FeoxStore;
    /// # fn main() -> feoxdb::Result<()> {
    /// # let store = FeoxStore::new(None)?;
    /// store.insert(b"config", b"v1")?;
    ///
    /// // Successful CAS - value matches
    /// let swapped = store.compare_and_swap(b"config", b"v1", b"v2")?;
    /// assert_eq!(swapped, true);
    ///
    /// // Failed CAS - value doesn't match
    /// let swapped = store.compare_and_swap(b"config", b"v1", b"v3")?;
    /// assert_eq!(swapped, false); // Value is now "v2", not "v1"
    ///
    /// // CAS on non-existent key
    /// let swapped = store.compare_and_swap(b"missing", b"any", b"new")?;
    /// assert_eq!(swapped, false);
    /// # Ok(())
    /// # }
    /// ```
    pub fn compare_and_swap(&self, key: &[u8], expected: &[u8], new_value: &[u8]) -> Result<bool> {
        self.compare_and_swap_with_timestamp_and_ttl(key, expected, new_value, None, 0)
    }

    /// Compare and swap with explicit timestamp.
    ///
    /// This is the advanced version that allows manual timestamp control for
    /// conflict resolution. Most users should use `compare_and_swap()` instead.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to check and potentially update
    /// * `expected` - The expected current value
    /// * `new_value` - The new value to set if comparison succeeds
    /// * `timestamp` - Optional timestamp. If `None`, uses current time.
    ///
    /// # Errors
    ///
    /// * `OlderTimestamp` - Timestamp is not newer than existing record
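    ///
    /// # Example
    ///
    /// A minimal sketch of a CAS carrying an explicit timestamp (nanoseconds are assumed
    /// here to match the store's TTL arithmetic; `None` would use the current time):
    ///
    /// ```rust,ignore
    /// use std::time::{SystemTime, UNIX_EPOCH};
    ///
    /// let ts = SystemTime::now()
    ///     .duration_since(UNIX_EPOCH)
    ///     .unwrap()
    ///     .as_nanos() as u64;
    ///
    /// // Swap only if the stored value is still "v1"; the new record is stamped with `ts`.
    /// let swapped = store.compare_and_swap_with_timestamp(b"config", b"v1", b"v2", Some(ts))?;
    /// ```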
    pub fn compare_and_swap_with_timestamp(
        &self,
        key: &[u8],
        expected: &[u8],
        new_value: &[u8],
        timestamp: Option<u64>,
    ) -> Result<bool> {
        self.compare_and_swap_with_timestamp_and_ttl(key, expected, new_value, timestamp, 0)
    }

    /// Compare and swap with TTL support.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to check and potentially update
    /// * `expected` - The expected current value
    /// * `new_value` - The new value to set if comparison succeeds
    /// * `ttl_seconds` - Time-to-live in seconds (0 for no expiry)
    ///
    /// # Errors
    ///
    /// * `InvalidKeySize` - Key is invalid
    /// * `InvalidValueSize` - New value is too large
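    ///
    /// # Example
    ///
    /// A minimal sketch of refreshing a value only if it has not changed, giving the
    /// replacement a 30-second expiry (the key and TTL are illustrative):
    ///
    /// ```rust,ignore
    /// // Replace the lease token only if it still holds the value we issued,
    /// // and let the new token expire 30 seconds after this write.
    /// let swapped = store.compare_and_swap_with_ttl(b"lease:job-1", b"token-a", b"token-b", 30)?;
    /// ```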
    pub fn compare_and_swap_with_ttl(
        &self,
        key: &[u8],
        expected: &[u8],
        new_value: &[u8],
        ttl_seconds: u64,
    ) -> Result<bool> {
        self.compare_and_swap_with_timestamp_and_ttl(key, expected, new_value, None, ttl_seconds)
    }

    /// Compare and swap with explicit timestamp and TTL.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to check and potentially update
    /// * `expected` - The expected current value
    /// * `new_value` - The new value to set if comparison succeeds
    /// * `timestamp` - Optional timestamp. If `None`, uses current time.
    /// * `ttl_seconds` - Time-to-live in seconds (0 for no expiry)
    ///
    /// # Errors
    ///
    /// * `OlderTimestamp` - Timestamp is not newer than existing record
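    ///
    /// # Example
    ///
    /// A minimal sketch combining both knobs: the store picks the timestamp (`None`)
    /// and the swapped-in value expires after 60 seconds (illustrative values).
    ///
    /// ```rust,ignore
    /// let swapped =
    ///     store.compare_and_swap_with_timestamp_and_ttl(b"config", b"v1", b"v2", None, 60)?;
    /// ```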
    pub fn compare_and_swap_with_timestamp_and_ttl(
        &self,
        key: &[u8],
        expected: &[u8],
        new_value: &[u8],
        timestamp: Option<u64>,
        ttl_seconds: u64,
    ) -> Result<bool> {
        let start = std::time::Instant::now();
        self.validate_key_value(key, new_value)?;
        let key_vec = key.to_vec();

        // Phase 1: Check value and save record reference for version tracking
        let initial_record = {
            let record_arc = match self.hash_table.read(&key_vec, |_, v| v.clone()) {
                Some(e) => e,
                None => return Ok(false), // Key doesn't exist
            };

            // Check if value matches expected
            let value_matches = if let Some(val) = record_arc.get_value() {
                // Fast path: value in memory
                val.as_ref() == expected
            } else {
                // Need disk I/O
                let disk_value = self.load_value_from_disk(&record_arc)?;
                disk_value == expected
            };

            if !value_matches {
                return Ok(false); // Value doesn't match expected
            }

            // Return the Arc pointer itself as our version identifier
            // NOTE: We can't use the stored timestamp for verification here because
            // SystemTime::now() resolution is 1us, which is too coarse for CAS operations.
            record_arc
        };

        // Phase 2: Acquire write lock and verify record hasn't changed
        match self.hash_table.entry(key_vec.clone()) {
            scc::hash_map::Entry::Occupied(mut entry) => {
                let old_record = entry.get();

                // Check if the record is still the same one we read earlier
                if !Arc::ptr_eq(old_record, &initial_record) {
                    // Record was modified between our check and acquiring lock
                    return Ok(false);
                }

                let timestamp = match timestamp {
                    Some(0) | None => self.get_timestamp(),
                    Some(ts) => ts,
                };

                if timestamp < old_record.timestamp {
                    return Err(FeoxError::OlderTimestamp);
                }

                let old_size = old_record.calculate_size();
                let new_size = self.calculate_record_size(key.len(), new_value.len());
                let old_value_len = old_record.value_len;
                let old_record_arc = Arc::clone(old_record);

                // Pre-check memory limit
                if new_size > old_size && !self.check_memory_limit(new_size - old_size) {
                    return Err(FeoxError::OutOfMemory);
                }

                // Create new record with TTL if specified
                let new_record = if ttl_seconds > 0 {
                    let ttl_expiry = timestamp + (ttl_seconds * 1_000_000_000); // Convert to nanoseconds
                    Arc::new(Record::new_with_timestamp_ttl(
                        key.to_vec(),
                        new_value.to_vec(),
                        timestamp,
                        ttl_expiry,
                    ))
                } else {
                    Arc::new(Record::new(key.to_vec(), new_value.to_vec(), timestamp))
                };

                entry.insert(Arc::clone(&new_record));

                self.tree.insert(key_vec.clone(), Arc::clone(&new_record));

                if new_size > old_size {
                    self.stats
                        .memory_usage
                        .fetch_add(new_size - old_size, Ordering::AcqRel);
                } else {
                    self.stats
                        .memory_usage
                        .fetch_sub(old_size - new_size, Ordering::AcqRel);
                }

                self.stats
                    .record_insert(start.elapsed().as_nanos() as u64, true);

                if !self.memory_only {
                    if self.enable_caching {
                        if let Some(ref cache) = self.cache {
                            cache.remove(&key_vec);
                        }
                    }

                    if let Some(ref wb) = self.write_buffer {
                        let _ = wb.add_write(Operation::Update, new_record, old_value_len);
                        let _ = wb.add_write(Operation::Delete, old_record_arc, old_value_len);
                    }
                }

                Ok(true)
            }
            scc::hash_map::Entry::Vacant(_) => Ok(false),
        }
    }
}