feoxdb/core/store/operations.rs
1use bytes::Bytes;
2use std::sync::atomic::Ordering;
3use std::sync::Arc;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use crate::constants::*;
7use crate::core::record::Record;
8use crate::error::{FeoxError, Result};
9
10use super::FeoxStore;
11
12impl FeoxStore {
13 /// Insert or update a key-value pair.
14 ///
15 /// If the key already exists with a TTL, the TTL is removed (key becomes permanent).
16 /// To preserve or set TTL, use `insert_with_ttl()` instead.
17 ///
18 /// # Arguments
19 ///
20 /// * `key` - The key to insert
21 /// * `value` - The value to store
22 /// * `timestamp` - Optional timestamp for conflict resolution. If `None`, uses current time.
23 ///
24 /// # Returns
25 ///
26 /// Returns `Ok(true)` if a new key was inserted, `Ok(false)` if an existing key was updated.
27 ///
28 /// # Errors
29 ///
30 /// * `InvalidKey` - Key is empty or too large
31 /// * `InvalidValue` - Value is too large
32 /// * `OlderTimestamp` - Timestamp is not newer than existing record
33 /// * `OutOfMemory` - Memory limit exceeded
34 ///
35 /// # Example
36 ///
37 /// ```rust
38 /// # use feoxdb::FeoxStore;
39 /// # fn main() -> feoxdb::Result<()> {
40 /// # let store = FeoxStore::new(None)?;
41 /// store.insert(b"user:123", b"{\"name\":\"Mehran\"}")?;
42 /// # Ok(())
43 /// # }
44 /// ```
45 ///
46 /// # Performance
47 ///
48 /// * Memory mode: ~600ns
49 /// * Persistent mode: ~800ns (buffered write)
50 pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<bool> {
51 self.insert_with_timestamp(key, value, None)
52 }
53
54 /// Insert or update a key-value pair with explicit timestamp.
55 ///
56 /// This is the advanced version that allows manual timestamp control for
57 /// conflict resolution. Most users should use `insert()` instead.
58 ///
59 /// # Arguments
60 ///
61 /// * `key` - The key to insert
62 /// * `value` - The value to store
63 /// * `timestamp` - Optional timestamp for conflict resolution. If `None`, uses current time.
64 ///
65 /// # Errors
66 ///
67 /// * `OlderTimestamp` - Timestamp is not newer than existing record
68 pub fn insert_with_timestamp(
69 &self,
70 key: &[u8],
71 value: &[u8],
72 timestamp: Option<u64>,
73 ) -> Result<bool> {
74 self.insert_with_timestamp_and_ttl_internal(key, value, timestamp, 0)
75 }
76
77 /// Insert or update a key-value pair using zero-copy Bytes.
78 ///
79 /// This method avoids copying the value data by directly using the Bytes type,
80 /// which provides reference-counted zero-copy semantics. Useful when inserting
81 /// data that was already read from network or disk as Bytes.
82 ///
83 /// If the key already exists with a TTL, the TTL is removed (key becomes permanent).
84 /// To preserve or set TTL, use `insert_bytes_with_ttl()` instead.
85 ///
86 /// # Arguments
87 ///
88 /// * `key` - The key to insert
89 /// * `value` - The value to store as Bytes
90 ///
91 /// # Returns
92 ///
93 /// Returns `Ok(true)` if a new key was inserted, `Ok(false)` if an existing key was updated.
94 ///
95 /// # Errors
96 ///
97 /// * `InvalidKey` - Key is empty or too large
98 /// * `InvalidValue` - Value is too large
99 /// * `OlderTimestamp` - Timestamp is not newer than existing record
100 /// * `OutOfMemory` - Memory limit exceeded
101 ///
102 /// # Example
103 ///
104 /// ```rust
105 /// # use feoxdb::FeoxStore;
106 /// # use bytes::Bytes;
107 /// # fn main() -> feoxdb::Result<()> {
108 /// # let store = FeoxStore::new(None)?;
109 /// let data = Bytes::from_static(b"{\"name\":\"Mehran\"}");
110 /// store.insert_bytes(b"user:123", data)?;
111 /// # Ok(())
112 /// # }
113 /// ```
114 ///
115 /// # Performance
116 ///
117 /// * Memory mode: ~600ns (avoids value copy)
118 /// * Persistent mode: ~800ns (buffered write, avoids value copy)
119 pub fn insert_bytes(&self, key: &[u8], value: Bytes) -> Result<bool> {
120 self.insert_bytes_with_timestamp(key, value, None)
121 }
122
123 /// Insert or update a key-value pair using zero-copy Bytes with explicit timestamp.
124 ///
125 /// This is the advanced version that allows manual timestamp control for
126 /// conflict resolution. Most users should use `insert_bytes()` instead.
127 ///
128 /// # Arguments
129 ///
130 /// * `key` - The key to insert
131 /// * `value` - The value to store as Bytes
132 /// * `timestamp` - Optional timestamp for conflict resolution. If `None`, uses current time.
133 ///
134 /// # Errors
135 ///
136 /// * `OlderTimestamp` - Timestamp is not newer than existing record
137 pub fn insert_bytes_with_timestamp(
138 &self,
139 key: &[u8],
140 value: Bytes,
141 timestamp: Option<u64>,
142 ) -> Result<bool> {
143 self.insert_bytes_with_timestamp_and_ttl_internal(key, value, timestamp, 0)
144 }
145
146 pub(super) fn insert_with_timestamp_and_ttl_internal(
147 &self,
148 key: &[u8],
149 value: &[u8],
150 timestamp: Option<u64>,
151 ttl_expiry: u64,
152 ) -> Result<bool> {
153 let start = std::time::Instant::now();
154 let timestamp = match timestamp {
155 Some(0) | None => self.get_timestamp(),
156 Some(ts) => ts,
157 };
158 self.validate_key_value(key, value)?;
159
160 // Check for existing record
161 let is_update = self.hash_table.contains(key);
162 let existing_record = self.hash_table.read(key, |_, v| v.clone());
163 if let Some(existing_record) = existing_record {
164 let existing_ts = existing_record.timestamp;
165 let existing_clone = existing_record;
166
167 if timestamp < existing_ts {
168 return Err(FeoxError::OlderTimestamp);
169 }
170
171 // Update existing record
172 return self.update_record_with_ttl(&existing_clone, value, timestamp, ttl_expiry);
173 }
174
175 let record_size = self.calculate_record_size(key.len(), value.len());
176 if !self.check_memory_limit(record_size) {
177 return Err(FeoxError::OutOfMemory);
178 }
179
180 // Create new record with TTL if specified and TTL is enabled
181 let record = if ttl_expiry > 0 && self.enable_ttl {
182 self.stats.keys_with_ttl.fetch_add(1, Ordering::Relaxed);
183 Arc::new(Record::new_with_timestamp_ttl(
184 key.to_vec(),
185 value.to_vec(),
186 timestamp,
187 ttl_expiry,
188 ))
189 } else {
190 Arc::new(Record::new(key.to_vec(), value.to_vec(), timestamp))
191 };
192
193 let key_vec = record.key.clone();
194
195 // Insert into hash table
196 self.hash_table.upsert(key_vec.clone(), Arc::clone(&record));
197
198 // Insert into lock-free skip list for ordered access
199 self.tree.insert(key_vec, Arc::clone(&record));
200
201 // Update statistics
202 self.stats.record_count.fetch_add(1, Ordering::AcqRel);
203 self.stats
204 .memory_usage
205 .fetch_add(record_size, Ordering::AcqRel);
206 self.stats
207 .record_insert(start.elapsed().as_nanos() as u64, is_update);
208
209 // Only do persistence if not in memory-only mode
210 if !self.memory_only {
211 // Queue for persistence if write buffer exists
212 if let Some(ref wb) = self.write_buffer {
213 if let Err(_e) = wb.add_write(Operation::Insert, record, 0) {
214 // Don't fail the insert - data is still in memory
215 // Return code already indicates success since data is in memory
216 }
217 }
218 }
219
220 Ok(!is_update)
221 }
222
223 /// Internal method to insert a Bytes value with timestamp and TTL (zero-copy)
224 pub(super) fn insert_bytes_with_timestamp_and_ttl_internal(
225 &self,
226 key: &[u8],
227 value: Bytes,
228 timestamp: Option<u64>,
229 ttl_seconds: u64,
230 ) -> Result<bool> {
231 let start = std::time::Instant::now();
232 // Get timestamp before any operations
233 let timestamp = match timestamp {
234 Some(0) | None => self.get_timestamp(),
235 Some(ts) => ts,
236 };
237
238 self.validate_key(key)?;
239 let value_len = value.len();
240 if value_len == 0 || value_len > MAX_VALUE_SIZE {
241 return Err(FeoxError::InvalidValueSize);
242 }
243
244 // Check for existing record
245 let is_update = self.hash_table.contains(key);
246 let existing_record = self.hash_table.read(key, |_, v| v.clone());
247 if let Some(existing_record) = existing_record {
248 let existing_ts = existing_record.timestamp;
249
250 if timestamp < existing_ts {
251 return Err(FeoxError::OlderTimestamp);
252 }
253
254 // Calculate TTL expiry
255 let ttl_expiry = if ttl_seconds > 0 && self.enable_ttl {
256 timestamp + (ttl_seconds * 1_000_000_000)
257 } else {
258 0
259 };
260
261 // Update existing record using the Bytes version
262 return self.update_record_with_ttl_bytes(
263 &existing_record,
264 value,
265 timestamp,
266 ttl_expiry,
267 );
268 }
269
270 // This point is only reached for new inserts (not updates)
271 let new_size = self.calculate_record_size(key.len(), value_len);
272 if !self.check_memory_limit(new_size) {
273 return Err(FeoxError::OutOfMemory);
274 }
275
276 // Create new record with Bytes value
277 let record = if ttl_seconds > 0 && self.enable_ttl {
278 let ttl_expiry = timestamp + (ttl_seconds * 1_000_000_000);
279 self.stats.keys_with_ttl.fetch_add(1, Ordering::Relaxed);
280 Arc::new(Record::new_from_bytes_with_ttl(
281 key.to_vec(),
282 value,
283 timestamp,
284 ttl_expiry,
285 ))
286 } else {
287 Arc::new(Record::new_from_bytes(key.to_vec(), value, timestamp))
288 };
289
290 let key_vec = record.key.clone();
291
292 // Insert into hash table
293 self.hash_table.upsert(key_vec.clone(), Arc::clone(&record));
294
295 // Insert into skip list for ordered access
296 self.tree.insert(key_vec, Arc::clone(&record));
297
298 // Update statistics
299 self.stats.record_count.fetch_add(1, Ordering::AcqRel);
300 self.stats
301 .memory_usage
302 .fetch_add(new_size, Ordering::AcqRel);
303 self.stats
304 .record_insert(start.elapsed().as_nanos() as u64, is_update);
305
306 // Only do persistence if not in memory-only mode
307 if !self.memory_only {
308 // Queue for persistence if write buffer exists
309 if let Some(ref wb) = self.write_buffer {
310 if let Err(_e) = wb.add_write(Operation::Insert, record, 0) {
311 // Don't fail the insert - data is still in memory
312 }
313 }
314 }
315
316 Ok(!is_update)
317 }
318
319 /// Retrieve a value by key.
320 ///
321 /// # Arguments
322 ///
323 /// * `key` - The key to look up
324 /// * `expected_size` - Optional expected value size for validation
325 ///
326 /// # Returns
327 ///
328 /// Returns the value as a `Vec<u8>` if found.
329 ///
330 /// # Errors
331 ///
332 /// * `KeyNotFound` - Key does not exist
333 /// * `InvalidKey` - Key is invalid
334 /// * `SizeMismatch` - Value size doesn't match expected size
335 /// * `IoError` - Failed to read from disk (persistent mode)
336 ///
337 /// # Example
338 ///
339 /// ```rust
340 /// # use feoxdb::FeoxStore;
341 /// # fn main() -> feoxdb::Result<()> {
342 /// # let store = FeoxStore::new(None)?;
343 /// # store.insert(b"key", b"value")?;
344 /// let value = store.get(b"key")?;
345 /// assert_eq!(value, b"value");
346 /// # Ok(())
347 /// # }
348 /// ```
349 ///
350 /// # Performance
351 ///
352 /// * Memory mode: ~100ns
353 /// * Persistent mode (cached): ~150ns
354 /// * Persistent mode (disk read): ~500ns
355 pub fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
356 let start = std::time::Instant::now();
357 self.validate_key(key)?;
358
359 let mut cache_hit = false;
360 if self.enable_caching {
361 if let Some(ref cache) = self.cache {
362 if let Some(value) = cache.get(key) {
363 self.stats
364 .record_get(start.elapsed().as_nanos() as u64, true);
365 return Ok(value.to_vec());
366 }
367 }
368 }
369
370 let record = self
371 .hash_table
372 .read(key, |_, v| v.clone())
373 .ok_or(FeoxError::KeyNotFound)?;
374
375 // Check TTL expiry if TTL is enabled
376 if self.enable_ttl {
377 let ttl_expiry = record.ttl_expiry.load(Ordering::Relaxed);
378 if ttl_expiry > 0 {
379 let now = SystemTime::now()
380 .duration_since(UNIX_EPOCH)
381 .unwrap_or_default()
382 .as_nanos() as u64;
383 if now > ttl_expiry {
384 self.stats.ttl_expired_lazy.fetch_add(1, Ordering::Relaxed);
385 return Err(FeoxError::KeyNotFound);
386 }
387 }
388 }
389
390 let value = if let Some(val) = record.get_value() {
391 val.to_vec()
392 } else {
393 cache_hit = false; // Reading from disk
394 self.load_value_from_disk(&record)?
395 };
396
397 if self.enable_caching {
398 if let Some(ref cache) = self.cache {
399 cache.insert(key.to_vec(), Bytes::from(value.clone()));
400 }
401 }
402
403 self.stats
404 .record_get(start.elapsed().as_nanos() as u64, cache_hit);
405 Ok(value)
406 }
407
408 /// Get a value by key without copying (zero-copy).
409 ///
410 /// Returns `Bytes` which avoids the memory copy that `get()` performs
411 /// when converting to `Vec<u8>`.
412 ///
413 /// # Arguments
414 ///
415 /// * `key` - The key to look up
416 ///
417 /// # Returns
418 ///
419 /// Returns the value as `Bytes` if found.
420 ///
421 /// # Example
422 ///
423 /// ```rust
424 /// # use feoxdb::FeoxStore;
425 /// # fn main() -> feoxdb::Result<()> {
426 /// # let store = FeoxStore::new(None)?;
427 /// # store.insert(b"key", b"value")?;
428 /// let bytes = store.get_bytes(b"key")?;
429 /// // Use bytes directly without copying
430 /// assert_eq!(&bytes[..], b"value");
431 /// # Ok(())
432 /// # }
433 /// ```
434 ///
435 /// # Performance
436 ///
437 /// Significantly faster than `get()` for large values:
438 /// * 100 bytes: ~15% faster
439 /// * 1KB: ~50% faster
440 /// * 10KB: ~90% faster
441 /// * 100KB: ~95% faster
442 pub fn get_bytes(&self, key: &[u8]) -> Result<Bytes> {
443 let start = std::time::Instant::now();
444 self.validate_key(key)?;
445
446 if self.enable_caching {
447 if let Some(ref cache) = self.cache {
448 if let Some(value) = cache.get(key) {
449 self.stats
450 .record_get(start.elapsed().as_nanos() as u64, true);
451 return Ok(value);
452 }
453 }
454 }
455
456 let record = self
457 .hash_table
458 .read(key, |_, v| v.clone())
459 .ok_or(FeoxError::KeyNotFound)?;
460
461 // Check TTL expiry if TTL is enabled
462 if self.enable_ttl {
463 let ttl_expiry = record.ttl_expiry.load(Ordering::Relaxed);
464 if ttl_expiry > 0 {
465 let now = SystemTime::now()
466 .duration_since(UNIX_EPOCH)
467 .unwrap_or_default()
468 .as_nanos() as u64;
469 if now > ttl_expiry {
470 self.stats.ttl_expired_lazy.fetch_add(1, Ordering::Relaxed);
471 return Err(FeoxError::KeyNotFound);
472 }
473 }
474 }
475
476 let (value, cache_hit) = if let Some(val) = record.get_value() {
477 (val, true)
478 } else {
479 (Bytes::from(self.load_value_from_disk(&record)?), false)
480 };
481
482 if self.enable_caching {
483 if let Some(ref cache) = self.cache {
484 cache.insert(key.to_vec(), value.clone());
485 }
486 }
487
488 self.stats
489 .record_get(start.elapsed().as_nanos() as u64, cache_hit);
490 Ok(value)
491 }
492
493 /// Delete a key-value pair.
494 ///
495 /// # Arguments
496 ///
497 /// * `key` - The key to delete
498 /// * `timestamp` - Optional timestamp for conflict resolution
499 ///
500 /// # Returns
501 ///
502 /// Returns `Ok(())` if the key was deleted.
503 ///
504 /// # Errors
505 ///
506 /// * `KeyNotFound` - Key does not exist
507 /// * `OlderTimestamp` - Timestamp is not newer than existing record
508 ///
509 /// # Example
510 ///
511 /// ```rust
512 /// # use feoxdb::FeoxStore;
513 /// # fn main() -> feoxdb::Result<()> {
514 /// # let store = FeoxStore::new(None)?;
515 /// # store.insert(b"temp", b"data")?;
516 /// store.delete(b"temp")?;
517 /// # Ok(())
518 /// # }
519 /// ```
520 ///
521 /// # Performance
522 ///
523 /// * Memory mode: ~300ns
524 /// * Persistent mode: ~400ns
525 pub fn delete(&self, key: &[u8]) -> Result<()> {
526 self.delete_with_timestamp(key, None)
527 }
528
529 /// Delete a key-value pair with explicit timestamp.
530 ///
531 /// This is the advanced version that allows manual timestamp control.
532 /// Most users should use `delete()` instead.
533 ///
534 /// # Arguments
535 ///
536 /// * `key` - The key to delete
537 /// * `timestamp` - Optional timestamp. If `None`, uses current time.
538 ///
539 /// # Errors
540 ///
541 /// * `OlderTimestamp` - Timestamp is not newer than existing record
542 pub fn delete_with_timestamp(&self, key: &[u8], timestamp: Option<u64>) -> Result<()> {
543 let start = std::time::Instant::now();
544 let timestamp = match timestamp {
545 Some(0) | None => self.get_timestamp(),
546 Some(ts) => ts,
547 };
548 self.validate_key(key)?;
549
550 // Remove from hash table and get the record
551 let record_pair = self.hash_table.remove(key).ok_or(FeoxError::KeyNotFound)?;
552 let record = record_pair.1;
553
554 if timestamp < record.timestamp {
555 // Put it back if timestamp is older
556 self.hash_table.upsert(key.to_vec(), record);
557 return Err(FeoxError::OlderTimestamp);
558 }
559
560 let record_size = record.calculate_size();
561 let old_value_len = record.value_len;
562
563 // Mark record as deleted by setting refcount to 0
564 record.refcount.store(0, Ordering::Release);
565
566 // Remove from lock-free skip list
567 self.tree.remove(key);
568
569 // Update statistics
570 self.stats.record_count.fetch_sub(1, Ordering::AcqRel);
571 self.stats
572 .memory_usage
573 .fetch_sub(record_size, Ordering::AcqRel);
574
575 // Clear from cache
576 if self.enable_caching {
577 if let Some(ref cache) = self.cache {
578 cache.remove(key);
579 }
580 }
581
582 // Queue deletion for persistence if write buffer exists and not memory-only
583 if !self.memory_only {
584 if let Some(ref wb) = self.write_buffer {
585 if let Err(_e) = wb.add_write(Operation::Delete, record, old_value_len) {
586 // Silent failure - data operation succeeded in memory
587 }
588 }
589 }
590
591 self.stats.record_delete(start.elapsed().as_nanos() as u64);
592 Ok(())
593 }
594
595 /// Get the size of a value without loading it.
596 ///
597 /// Useful for checking value size before loading large values from disk.
598 ///
599 /// # Arguments
600 ///
601 /// * `key` - The key to check
602 ///
603 /// # Returns
604 ///
605 /// Returns the size in bytes of the value.
606 ///
607 /// # Errors
608 ///
609 /// * `KeyNotFound` - Key does not exist
610 ///
611 /// # Example
612 ///
613 /// ```rust
614 /// # use feoxdb::FeoxStore;
615 /// # fn main() -> feoxdb::Result<()> {
616 /// # let store = FeoxStore::new(None)?;
617 /// store.insert(b"large_file", &vec![0u8; 1_000_000])?;
618 ///
619 /// // Check size before loading
620 /// let size = store.get_size(b"large_file")?;
621 /// assert_eq!(size, 1_000_000);
622 /// # Ok(())
623 /// # }
624 /// ```
625 pub fn get_size(&self, key: &[u8]) -> Result<usize> {
626 self.validate_key(key)?;
627
628 let record = self
629 .hash_table
630 .read(key, |_, v| v.clone())
631 .ok_or(FeoxError::KeyNotFound)?;
632
633 Ok(record.value_len)
634 }
635
636 // Internal helper methods
637
638 pub(super) fn validate_key_value(&self, key: &[u8], value: &[u8]) -> Result<()> {
639 if key.is_empty() || key.len() > MAX_KEY_SIZE {
640 return Err(FeoxError::InvalidKeySize);
641 }
642
643 if value.is_empty() || value.len() > MAX_VALUE_SIZE {
644 return Err(FeoxError::InvalidValueSize);
645 }
646
647 Ok(())
648 }
649
650 pub(super) fn validate_key(&self, key: &[u8]) -> Result<()> {
651 if key.is_empty() || key.len() > MAX_KEY_SIZE {
652 return Err(FeoxError::InvalidKeySize);
653 }
654
655 Ok(())
656 }
657
658 pub(super) fn check_memory_limit(&self, size: usize) -> bool {
659 match self.max_memory {
660 Some(limit) => {
661 let current = self.stats.memory_usage.load(Ordering::Acquire);
662 current + size <= limit
663 }
664 None => true,
665 }
666 }
667
668 pub(super) fn calculate_record_size(&self, key_len: usize, value_len: usize) -> usize {
669 std::mem::size_of::<Record>() + key_len + value_len
670 }
671
672 pub(super) fn get_timestamp(&self) -> u64 {
673 self.get_timestamp_pub()
674 }
675}