/home/gh-runner/action-runner3/_work/feoxdb/feoxdb/src/core/store/atomic.rs
Line | Count | Source |
1 | | use std::sync::atomic::Ordering; |
2 | | use std::sync::Arc; |
3 | | |
4 | | use crate::constants::Operation; |
5 | | use crate::core::record::Record; |
6 | | use crate::error::{FeoxError, Result}; |
7 | | |
8 | | use super::FeoxStore; |
9 | | |
10 | | impl FeoxStore { |
11 | | /// Atomically increment a numeric counter. |
12 | | /// |
13 | | /// The value must be stored as an 8-byte little-endian i64. If the key doesn't exist, |
14 | | /// it will be created with the given delta value. If it exists, the value will be |
15 | | /// incremented atomically. |
16 | | /// |
17 | | /// # Value Format |
18 | | /// |
19 | | /// The value MUST be exactly 8 bytes representing a little-endian i64. |
20 | | /// Use `i64::to_le_bytes()` to create the initial value: |
21 | | /// ```rust,ignore |
22 | | /// let zero: i64 = 0; |
23 | | /// store.insert(b"counter", &zero.to_le_bytes())?; |
24 | | /// ``` |
25 | | /// |
26 | | /// # Arguments |
27 | | /// |
28 | | /// * `key` - The key of the counter |
29 | | /// * `delta` - The amount to increment by (can be negative for decrement) |
30 | | /// * `timestamp` - Optional timestamp for conflict resolution |
31 | | /// |
32 | | /// # Returns |
33 | | /// |
34 | | /// Returns the new value after incrementing. |
35 | | /// |
36 | | /// # Errors |
37 | | /// |
38 | | /// * `InvalidOperation` - Existing value is not exactly 8 bytes (not a valid i64) |
39 | | /// * `OlderTimestamp` - Timestamp is not newer than existing record |
40 | | /// |
41 | | /// # Example |
42 | | /// |
43 | | /// ```rust |
44 | | /// # use feoxdb::FeoxStore; |
45 | | /// # fn main() -> feoxdb::Result<()> { |
46 | | /// # let store = FeoxStore::new(None)?; |
47 | | /// // Initialize counter with proper binary format |
48 | | /// let initial: i64 = 0; |
49 | | /// store.insert(b"visits", &initial.to_le_bytes())?; |
50 | | /// |
51 | | /// // Increment atomically |
52 | | /// let val = store.atomic_increment(b"visits", 1)?; |
53 | | /// assert_eq!(val, 1); |
54 | | /// |
55 | | /// // Increment by 5 |
56 | | /// let val = store.atomic_increment(b"visits", 5)?; |
57 | | /// assert_eq!(val, 6); |
58 | | /// |
59 | | /// // Decrement by 2 |
60 | | /// let val = store.atomic_increment(b"visits", -2)?; |
61 | | /// assert_eq!(val, 4); |
62 | | /// |
63 | | /// // Or create new counter directly (starts at delta value) |
64 | | /// let downloads = store.atomic_increment(b"downloads", 100)?; |
65 | | /// assert_eq!(downloads, 100); |
66 | | /// # Ok(()) |
67 | | /// # } |
68 | | /// ``` |
69 | 1.10k | pub fn atomic_increment(&self, key: &[u8], delta: i64) -> Result<i64> { |
70 | 1.10k | self.atomic_increment_with_timestamp_and_ttl(key, delta, None, 0) |
71 | 1.10k | } |
72 | | |
73 | | /// Atomically increment/decrement with explicit timestamp. |
74 | | /// |
75 | | /// This is the advanced version that allows manual timestamp control. |
76 | | /// Most users should use `atomic_increment()` instead. |
77 | | /// |
78 | | /// # Arguments |
79 | | /// |
80 | | /// * `key` - The key to increment/decrement |
81 | | /// * `delta` - Amount to add (negative to decrement) |
82 | | /// * `timestamp` - Optional timestamp. If `None`, uses current time. |
83 | | /// |
84 | | /// # Errors |
85 | | /// |
86 | | /// * `OlderTimestamp` - Timestamp is not newer than existing record |
87 | 0 | pub fn atomic_increment_with_timestamp( |
88 | 0 | &self, |
89 | 0 | key: &[u8], |
90 | 0 | delta: i64, |
91 | 0 | timestamp: Option<u64>, |
92 | 0 | ) -> Result<i64> { |
93 | 0 | self.atomic_increment_with_timestamp_and_ttl(key, delta, timestamp, 0) |
94 | 0 | } |
95 | | |
96 | | /// Atomically increment/decrement with TTL support. |
97 | | /// |
98 | | /// # Arguments |
99 | | /// |
100 | | /// * `key` - The key to increment/decrement |
101 | | /// * `delta` - Amount to add (negative to decrement) |
102 | | /// * `ttl_seconds` - Time-to-live in seconds (0 for no expiry) |
103 | | /// |
104 | | /// # Errors |
105 | | /// |
106 | | /// * `InvalidOperation` - Value is not a valid i64 |
107 | 0 | pub fn atomic_increment_with_ttl( |
108 | 0 | &self, |
109 | 0 | key: &[u8], |
110 | 0 | delta: i64, |
111 | 0 | ttl_seconds: u64, |
112 | 0 | ) -> Result<i64> { |
113 | 0 | self.atomic_increment_with_timestamp_and_ttl(key, delta, None, ttl_seconds) |
114 | 0 | } |
115 | | |
116 | | /// Atomically increment/decrement with explicit timestamp and TTL. |
117 | | /// |
118 | | /// # Arguments |
119 | | /// |
120 | | /// * `key` - The key to increment/decrement |
121 | | /// * `delta` - Amount to add (negative to decrement) |
122 | | /// * `timestamp` - Optional timestamp. If `None`, uses current time. |
123 | | /// * `ttl_seconds` - Time-to-live in seconds (0 for no expiry) |
124 | | /// |
125 | | /// # Errors |
126 | | /// |
127 | | /// * `OlderTimestamp` - Timestamp is not newer than existing record |
128 | 1.10k | pub fn atomic_increment_with_timestamp_and_ttl( |
129 | 1.10k | &self, |
130 | 1.10k | key: &[u8], |
131 | 1.10k | delta: i64, |
132 | 1.10k | timestamp: Option<u64>, |
133 | 1.10k | ttl_seconds: u64, |
134 | 1.10k | ) -> Result<i64> { |
135 | 1.10k | self.validate_key(key)?0 ; |
136 | | |
137 | 1.10k | let key_vec = key.to_vec(); |
138 | | |
139 | 1.10k | let result1.10k = match self.hash_table.entry(key_vec.clone()) { |
140 | 1.10k | scc::hash_map::Entry::Occupied(mut entry) => { |
141 | 1.10k | let old_record = entry.get(); |
142 | | |
143 | | // Get timestamp inside the critical section to ensure it's always newer |
144 | 1.10k | let timestamp = match timestamp { |
145 | 1.10k | Some(0) | None => self.get_timestamp(), |
146 | 0 | Some(ts) => ts, |
147 | | }; |
148 | | |
149 | | // Check if timestamp is valid |
150 | 1.10k | if timestamp < old_record.timestamp { |
151 | 0 | return Err(FeoxError::OlderTimestamp); |
152 | 1.10k | } |
153 | | |
154 | | // Load value from memory or disk |
155 | 1.10k | let value = if let Some(val1.10k ) = old_record.get_value() { |
156 | 1.10k | val.to_vec() |
157 | | } else { |
158 | | // Try loading from disk if not in memory |
159 | 1 | self.load_value_from_disk(old_record)?0 |
160 | | }; |
161 | | |
162 | 1.10k | let current_val1.10k = if value.len() == 8 { |
163 | 1.10k | let bytes = value |
164 | 1.10k | .get(..8) |
165 | 1.10k | .and_then(|slice| slice.try_into().ok()) |
166 | 1.10k | .ok_or(FeoxError::InvalidNumericValue)?0 ; |
167 | 1.10k | i64::from_le_bytes(bytes) |
168 | | } else { |
169 | 1 | return Err(FeoxError::InvalidOperation); |
170 | | }; |
171 | | |
172 | 1.10k | let new_val = current_val.saturating_add(delta); |
173 | 1.10k | let new_value = new_val.to_le_bytes().to_vec(); |
174 | | |
175 | | // Create new record with TTL if specified |
176 | 1.10k | let new_record = if ttl_seconds > 0 { |
177 | 0 | let ttl_expiry = timestamp + (ttl_seconds * 1_000_000_000); // Convert to nanoseconds |
178 | 0 | Arc::new(Record::new_with_timestamp_ttl( |
179 | 0 | old_record.key.clone(), |
180 | 0 | new_value, |
181 | 0 | timestamp, |
182 | 0 | ttl_expiry, |
183 | | )) |
184 | | } else { |
185 | 1.10k | Arc::new(Record::new(old_record.key.clone(), new_value, timestamp)) |
186 | | }; |
187 | | |
188 | 1.10k | let old_value_len = old_record.value_len; |
189 | 1.10k | let old_size = old_record.calculate_size(); |
190 | 1.10k | let new_size = self.calculate_record_size(old_record.key.len(), 8); |
191 | 1.10k | let old_record_arc = Arc::clone(old_record); |
192 | | |
193 | | // Atomically update the entry |
194 | 1.10k | entry.insert(Arc::clone(&new_record)); |
195 | | |
196 | | // Update skip list as well |
197 | 1.10k | self.tree.insert(key_vec.clone(), Arc::clone(&new_record)); |
198 | | |
199 | | // Update memory usage |
200 | 1.10k | if new_size > old_size { |
201 | 0 | self.stats |
202 | 0 | .memory_usage |
203 | 0 | .fetch_add(new_size - old_size, Ordering::AcqRel); |
204 | 1.10k | } else { |
205 | 1.10k | self.stats |
206 | 1.10k | .memory_usage |
207 | 1.10k | .fetch_sub(old_size - new_size, Ordering::AcqRel); |
208 | 1.10k | } |
209 | | |
210 | | // Only do cache and persistence operations if not in memory-only mode |
211 | 1.10k | if !self.memory_only { |
212 | 101 | if self.enable_caching { |
213 | 101 | if let Some(ref cache) = self.cache { |
214 | 101 | cache.remove(&key_vec); |
215 | 101 | }0 |
216 | 0 | } |
217 | | |
218 | 101 | if let Some(ref wb) = self.write_buffer { |
219 | 0 | if let Err(e) = |
220 | 101 | wb.add_write(Operation::Update, Arc::clone(&new_record), old_value_len) |
221 | 0 | { |
222 | 0 | // Atomic operation succeeded in memory |
223 | 0 | let _ = e; |
224 | 101 | } |
225 | | |
226 | 0 | if let Err(e) = |
227 | 101 | wb.add_write(Operation::Delete, old_record_arc, old_value_len) |
228 | 0 | { |
229 | 0 | // Atomic operation succeeded in memory |
230 | 0 | let _ = e; |
231 | 101 | } |
232 | 0 | } |
233 | 1.00k | } |
234 | | |
235 | 1.10k | Ok(new_val) |
236 | | } |
237 | 1 | scc::hash_map::Entry::Vacant(entry) => { |
238 | | // Key doesn't exist, create it with initial value |
239 | | // Get timestamp inside the critical section |
240 | 1 | let timestamp = match timestamp { |
241 | 1 | Some(0) | None => self.get_timestamp(), |
242 | 0 | Some(ts) => ts, |
243 | | }; |
244 | | |
245 | 1 | let initial_val = delta; |
246 | 1 | let value = initial_val.to_le_bytes().to_vec(); |
247 | | |
248 | | // Create new record with TTL if specified |
249 | 1 | let new_record = if ttl_seconds > 0 { |
250 | 0 | let ttl_expiry = timestamp + (ttl_seconds * 1_000_000_000); // Convert to nanoseconds |
251 | 0 | Arc::new(Record::new_with_timestamp_ttl( |
252 | 0 | key_vec.clone(), |
253 | 0 | value, |
254 | 0 | timestamp, |
255 | 0 | ttl_expiry, |
256 | | )) |
257 | | } else { |
258 | 1 | Arc::new(Record::new(key_vec.clone(), value, timestamp)) |
259 | | }; |
260 | | |
261 | 1 | let _ = entry.insert_entry(Arc::clone(&new_record)); |
262 | | |
263 | | // Update skip list |
264 | 1 | self.tree.insert(key_vec.clone(), Arc::clone(&new_record)); |
265 | | |
266 | | // Update statistics |
267 | 1 | self.stats.record_count.fetch_add(1, Ordering::AcqRel); |
268 | 1 | let record_size = self.calculate_record_size(key.len(), 8); |
269 | 1 | self.stats |
270 | 1 | .memory_usage |
271 | 1 | .fetch_add(record_size, Ordering::AcqRel); |
272 | | |
273 | | // Handle persistence if needed |
274 | 1 | if !self.memory_only { |
275 | 0 | if let Some(ref wb) = self.write_buffer { |
276 | 0 | if let Err(e) = wb.add_write(Operation::Insert, Arc::clone(&new_record), 0) |
277 | 0 | { |
278 | 0 | // Operation succeeded in memory |
279 | 0 | let _ = e; |
280 | 0 | } |
281 | 0 | } |
282 | 1 | } |
283 | | |
284 | 1 | Ok(initial_val) |
285 | | } |
286 | | }; |
287 | | |
288 | 1.10k | result |
289 | 1.10k | } |
290 | | |
291 | | /// Atomically compare and swap a value. |
292 | | /// |
293 | | /// Compares the current value of a key with an expected value, and if they match, |
294 | | /// atomically replaces it with a new value. This operation is atomic within the |
295 | | /// HashMap shard, preventing race conditions. |
296 | | /// |
297 | | /// # Arguments |
298 | | /// |
299 | | /// * `key` - The key to check and potentially update |
300 | | /// * `expected` - The expected current value |
301 | | /// * `new_value` - The new value to set if comparison succeeds |
302 | | /// |
303 | | /// # Returns |
304 | | /// |
305 | | /// Returns `Ok(true)` if the swap succeeded (current value matched expected). |
306 | | /// Returns `Ok(false)` if the current value didn't match or key doesn't exist. |
307 | | /// |
308 | | /// # Errors |
309 | | /// |
310 | | /// * `InvalidKeySize` - Key is invalid |
311 | | /// * `InvalidValueSize` - New value is too large |
312 | | /// * `OutOfMemory` - Memory limit exceeded |
313 | | /// * `IoError` - Failed to read value from disk |
314 | | /// |
315 | | /// # Example |
316 | | /// |
317 | | /// ```rust |
318 | | /// # use feoxdb::FeoxStore; |
319 | | /// # fn main() -> feoxdb::Result<()> { |
320 | | /// # let store = FeoxStore::new(None)?; |
321 | | /// store.insert(b"config", b"v1")?; |
322 | | /// |
323 | | /// // Successful CAS - value matches |
324 | | /// let swapped = store.compare_and_swap(b"config", b"v1", b"v2")?; |
325 | | /// assert_eq!(swapped, true); |
326 | | /// |
327 | | /// // Failed CAS - value doesn't match |
328 | | /// let swapped = store.compare_and_swap(b"config", b"v1", b"v3")?; |
329 | | /// assert_eq!(swapped, false); // Value is now "v2", not "v1" |
330 | | /// |
331 | | /// // CAS on non-existent key |
332 | | /// let swapped = store.compare_and_swap(b"missing", b"any", b"new")?; |
333 | | /// assert_eq!(swapped, false); |
334 | | /// # Ok(()) |
335 | | /// # } |
336 | | /// ``` |
337 | 3.28k | pub fn compare_and_swap(&self, key: &[u8], expected: &[u8], new_value: &[u8]) -> Result<bool> { |
338 | 3.28k | self.compare_and_swap_with_timestamp_and_ttl(key, expected, new_value, None, 0) |
339 | 3.28k | } |
340 | | |
341 | | /// Compare and swap with explicit timestamp. |
342 | | /// |
343 | | /// This is the advanced version that allows manual timestamp control for |
344 | | /// conflict resolution. Most users should use `compare_and_swap()` instead. |
345 | | /// |
346 | | /// # Arguments |
347 | | /// |
348 | | /// * `key` - The key to check and potentially update |
349 | | /// * `expected` - The expected current value |
350 | | /// * `new_value` - The new value to set if comparison succeeds |
351 | | /// * `timestamp` - Optional timestamp. If `None`, uses current time. |
352 | | /// |
353 | | /// # Errors |
354 | | /// |
355 | | /// * `OlderTimestamp` - Timestamp is not newer than existing record |
356 | 1 | pub fn compare_and_swap_with_timestamp( |
357 | 1 | &self, |
358 | 1 | key: &[u8], |
359 | 1 | expected: &[u8], |
360 | 1 | new_value: &[u8], |
361 | 1 | timestamp: Option<u64>, |
362 | 1 | ) -> Result<bool> { |
363 | 1 | self.compare_and_swap_with_timestamp_and_ttl(key, expected, new_value, timestamp, 0) |
364 | 1 | } |
365 | | |
366 | | /// Compare and swap with TTL support. |
367 | | /// |
368 | | /// # Arguments |
369 | | /// |
370 | | /// * `key` - The key to check and potentially update |
371 | | /// * `expected` - The expected current value |
372 | | /// * `new_value` - The new value to set if comparison succeeds |
373 | | /// * `ttl_seconds` - Time-to-live in seconds (0 for no expiry) |
374 | | /// |
375 | | /// # Errors |
376 | | /// |
377 | | /// * `InvalidKeySize` - Key is invalid |
378 | | /// * `InvalidValueSize` - New value is too large |
379 | 0 | pub fn compare_and_swap_with_ttl( |
380 | 0 | &self, |
381 | 0 | key: &[u8], |
382 | 0 | expected: &[u8], |
383 | 0 | new_value: &[u8], |
384 | 0 | ttl_seconds: u64, |
385 | 0 | ) -> Result<bool> { |
386 | 0 | self.compare_and_swap_with_timestamp_and_ttl(key, expected, new_value, None, ttl_seconds) |
387 | 0 | } |
388 | | |
389 | | /// Compare and swap with explicit timestamp and TTL. |
390 | | /// |
391 | | /// # Arguments |
392 | | /// |
393 | | /// * `key` - The key to check and potentially update |
394 | | /// * `expected` - The expected current value |
395 | | /// * `new_value` - The new value to set if comparison succeeds |
396 | | /// * `timestamp` - Optional timestamp. If `None`, uses current time. |
397 | | /// * `ttl_seconds` - Time-to-live in seconds (0 for no expiry) |
398 | | /// |
399 | | /// # Errors |
400 | | /// |
401 | | /// * `OlderTimestamp` - Timestamp is not newer than existing record |
402 | 3.28k | pub fn compare_and_swap_with_timestamp_and_ttl( |
403 | 3.28k | &self, |
404 | 3.28k | key: &[u8], |
405 | 3.28k | expected: &[u8], |
406 | 3.28k | new_value: &[u8], |
407 | 3.28k | timestamp: Option<u64>, |
408 | 3.28k | ttl_seconds: u64, |
409 | 3.28k | ) -> Result<bool> { |
410 | 3.28k | let start = std::time::Instant::now(); |
411 | 3.28k | self.validate_key_value(key, new_value)?0 ; |
412 | 3.28k | let key_vec = key.to_vec(); |
413 | | |
414 | | // Phase 1: Check value and save record reference for version tracking |
415 | 2.21k | let initial_record = { |
416 | 3.28k | let entry3.28k = match self.hash_table.read(&key_vec, |_, v| v3.28k .clone3.28k ()) { |
417 | 3.28k | Some(e) => e, |
418 | 1 | None => return Ok(false), // Key doesn't exist |
419 | | }; |
420 | | |
421 | 3.28k | let record_arc = entry; |
422 | | |
423 | | // Check if value matches expected |
424 | 3.28k | let value_matches = if let Some(val) = record_arc.get_value() { |
425 | | // Fast path: value in memory |
426 | 3.28k | val.as_ref() == expected |
427 | | } else { |
428 | | // Need disk I/O |
429 | | |
430 | 0 | let disk_value = self.load_value_from_disk(&record_arc)?; |
431 | 0 | disk_value == expected |
432 | | }; |
433 | | |
434 | 3.28k | if !value_matches { |
435 | 1.06k | return Ok(false); // Value doesn't match expected |
436 | 2.21k | } |
437 | | |
438 | | // Return the Arc pointer itself as our version identifier |
439 | | // NOTE: We can't use the stored timestamp for verification here because |
440 | | // SystemTime::now() resolution is 1us, which is too coarse for CAS operations. |
441 | 2.21k | record_arc |
442 | | }; |
443 | | |
444 | | // Phase 2: Acquire write lock and verify record hasn't changed |
445 | 2.21k | match self.hash_table.entry(key_vec.clone()) { |
446 | 2.21k | scc::hash_map::Entry::Occupied(mut entry) => { |
447 | 2.21k | let old_record = entry.get(); |
448 | | |
449 | | // Check if the record is still the same one we read earlier |
450 | 2.21k | if !Arc::ptr_eq(old_record, &initial_record) { |
451 | | // Record was modified between our check and acquiring lock |
452 | 1.10k | return Ok(false); |
453 | 1.10k | } |
454 | | |
455 | 1.10k | let timestamp = match timestamp { |
456 | 1.10k | Some(0) | None => self.get_timestamp(), |
457 | 1 | Some(ts) => ts, |
458 | | }; |
459 | | |
460 | 1.10k | if timestamp < old_record.timestamp { |
461 | 1 | return Err(FeoxError::OlderTimestamp); |
462 | 1.10k | } |
463 | | |
464 | 1.10k | let old_size = old_record.calculate_size(); |
465 | 1.10k | let new_size = self.calculate_record_size(key.len(), new_value.len()); |
466 | 1.10k | let old_value_len = old_record.value_len; |
467 | 1.10k | let old_record_arc = Arc::clone(old_record); |
468 | | |
469 | | // Pre-check memory limit |
470 | 1.10k | if new_size > old_size && !5 self5 .check_memory_limit5 (new_size - old_size) { |
471 | 0 | return Err(FeoxError::OutOfMemory); |
472 | 1.10k | } |
473 | | |
474 | | // Create new record with TTL if specified |
475 | 1.10k | let new_record = if ttl_seconds > 0 { |
476 | 0 | let ttl_expiry = timestamp + (ttl_seconds * 1_000_000_000); // Convert to nanoseconds |
477 | 0 | Arc::new(Record::new_with_timestamp_ttl( |
478 | 0 | key.to_vec(), |
479 | 0 | new_value.to_vec(), |
480 | 0 | timestamp, |
481 | 0 | ttl_expiry, |
482 | | )) |
483 | | } else { |
484 | 1.10k | Arc::new(Record::new(key.to_vec(), new_value.to_vec(), timestamp)) |
485 | | }; |
486 | | |
487 | 1.10k | entry.insert(Arc::clone(&new_record)); |
488 | | |
489 | 1.10k | self.tree.insert(key_vec.clone(), Arc::clone(&new_record)); |
490 | | |
491 | 1.10k | if new_size > old_size { |
492 | 5 | self.stats |
493 | 5 | .memory_usage |
494 | 5 | .fetch_add(new_size - old_size, Ordering::AcqRel); |
495 | 1.10k | } else { |
496 | 1.10k | self.stats |
497 | 1.10k | .memory_usage |
498 | 1.10k | .fetch_sub(old_size - new_size, Ordering::AcqRel); |
499 | 1.10k | } |
500 | | |
501 | 1.10k | self.stats |
502 | 1.10k | .record_insert(start.elapsed().as_nanos() as u64, true); |
503 | | |
504 | 1.10k | if !self.memory_only { |
505 | 0 | if self.enable_caching { |
506 | 0 | if let Some(ref cache) = self.cache { |
507 | 0 | cache.remove(&key_vec); |
508 | 0 | } |
509 | 0 | } |
510 | | |
511 | 0 | if let Some(ref wb) = self.write_buffer { |
512 | 0 | let _ = wb.add_write(Operation::Update, new_record, old_value_len); |
513 | 0 | let _ = wb.add_write(Operation::Delete, old_record_arc, old_value_len); |
514 | 0 | } |
515 | 1.10k | } |
516 | | |
517 | 1.10k | Ok(true) |
518 | | } |
519 | 0 | scc::hash_map::Entry::Vacant(_) => Ok(false), |
520 | | } |
521 | 3.28k | } |
522 | | } |