feoxdb/core/store/
operations.rs

1use bytes::Bytes;
2use std::sync::atomic::Ordering;
3use std::sync::Arc;
4use std::time::{SystemTime, UNIX_EPOCH};
5
6use crate::constants::*;
7use crate::core::record::Record;
8use crate::error::{FeoxError, Result};
9
10use super::FeoxStore;
11
12impl FeoxStore {
13    /// Insert or update a key-value pair.
14    ///
15    /// If the key already exists with a TTL, the TTL is removed (key becomes permanent).
16    /// To preserve or set TTL, use `insert_with_ttl()` instead.
17    ///
18    /// # Arguments
19    ///
20    /// * `key` - The key to insert
21    /// * `value` - The value to store
22    /// * `timestamp` - Optional timestamp for conflict resolution. If `None`, uses current time.
23    ///
24    /// # Returns
25    ///
26    /// Returns `Ok(true)` if a new key was inserted, `Ok(false)` if an existing key was updated.
27    ///
28    /// # Errors
29    ///
30    /// * `InvalidKey` - Key is empty or too large
31    /// * `InvalidValue` - Value is too large
32    /// * `OlderTimestamp` - Timestamp is not newer than existing record
33    /// * `OutOfMemory` - Memory limit exceeded
34    ///
35    /// # Example
36    ///
37    /// ```rust
38    /// # use feoxdb::FeoxStore;
39    /// # fn main() -> feoxdb::Result<()> {
40    /// # let store = FeoxStore::new(None)?;
41    /// store.insert(b"user:123", b"{\"name\":\"Mehran\"}")?;
42    /// # Ok(())
43    /// # }
44    /// ```
45    ///
46    /// # Performance
47    ///
48    /// * Memory mode: ~600ns
49    /// * Persistent mode: ~800ns (buffered write)
50    pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<bool> {
51        self.insert_with_timestamp(key, value, None)
52    }
53
54    /// Insert or update a key-value pair with explicit timestamp.
55    ///
56    /// This is the advanced version that allows manual timestamp control for
57    /// conflict resolution. Most users should use `insert()` instead.
58    ///
59    /// # Arguments
60    ///
61    /// * `key` - The key to insert
62    /// * `value` - The value to store
63    /// * `timestamp` - Optional timestamp for conflict resolution. If `None`, uses current time.
64    ///
65    /// # Errors
66    ///
67    /// * `OlderTimestamp` - Timestamp is not newer than existing record
68    pub fn insert_with_timestamp(
69        &self,
70        key: &[u8],
71        value: &[u8],
72        timestamp: Option<u64>,
73    ) -> Result<bool> {
74        self.insert_with_timestamp_and_ttl_internal(key, value, timestamp, 0)
75    }
76
77    /// Insert or update a key-value pair using zero-copy Bytes.
78    ///
79    /// This method avoids copying the value data by directly using the Bytes type,
80    /// which provides reference-counted zero-copy semantics. Useful when inserting
81    /// data that was already read from network or disk as Bytes.
82    ///
83    /// If the key already exists with a TTL, the TTL is removed (key becomes permanent).
84    /// To preserve or set TTL, use `insert_bytes_with_ttl()` instead.
85    ///
86    /// # Arguments
87    ///
88    /// * `key` - The key to insert
89    /// * `value` - The value to store as Bytes
90    ///
91    /// # Returns
92    ///
93    /// Returns `Ok(true)` if a new key was inserted, `Ok(false)` if an existing key was updated.
94    ///
95    /// # Errors
96    ///
97    /// * `InvalidKey` - Key is empty or too large
98    /// * `InvalidValue` - Value is too large
99    /// * `OlderTimestamp` - Timestamp is not newer than existing record
100    /// * `OutOfMemory` - Memory limit exceeded
101    ///
102    /// # Example
103    ///
104    /// ```rust
105    /// # use feoxdb::FeoxStore;
106    /// # use bytes::Bytes;
107    /// # fn main() -> feoxdb::Result<()> {
108    /// # let store = FeoxStore::new(None)?;
109    /// let data = Bytes::from_static(b"{\"name\":\"Mehran\"}");
110    /// store.insert_bytes(b"user:123", data)?;
111    /// # Ok(())
112    /// # }
113    /// ```
114    ///
115    /// # Performance
116    ///
117    /// * Memory mode: ~600ns (avoids value copy)
118    /// * Persistent mode: ~800ns (buffered write, avoids value copy)
119    pub fn insert_bytes(&self, key: &[u8], value: Bytes) -> Result<bool> {
120        self.insert_bytes_with_timestamp(key, value, None)
121    }
122
123    /// Insert or update a key-value pair using zero-copy Bytes with explicit timestamp.
124    ///
125    /// This is the advanced version that allows manual timestamp control for
126    /// conflict resolution. Most users should use `insert_bytes()` instead.
127    ///
128    /// # Arguments
129    ///
130    /// * `key` - The key to insert
131    /// * `value` - The value to store as Bytes
132    /// * `timestamp` - Optional timestamp for conflict resolution. If `None`, uses current time.
133    ///
134    /// # Errors
135    ///
136    /// * `OlderTimestamp` - Timestamp is not newer than existing record
137    pub fn insert_bytes_with_timestamp(
138        &self,
139        key: &[u8],
140        value: Bytes,
141        timestamp: Option<u64>,
142    ) -> Result<bool> {
143        self.insert_bytes_with_timestamp_and_ttl_internal(key, value, timestamp, 0)
144    }
145
146    pub(super) fn insert_with_timestamp_and_ttl_internal(
147        &self,
148        key: &[u8],
149        value: &[u8],
150        timestamp: Option<u64>,
151        ttl_expiry: u64,
152    ) -> Result<bool> {
153        let start = std::time::Instant::now();
154        let timestamp = match timestamp {
155            Some(0) | None => self.get_timestamp(),
156            Some(ts) => ts,
157        };
158        self.validate_key_value(key, value)?;
159
160        // Check for existing record
161        let is_update = self.hash_table.contains(key);
162        let existing_record = self.hash_table.read(key, |_, v| v.clone());
163        if let Some(existing_record) = existing_record {
164            let existing_ts = existing_record.timestamp;
165            let existing_clone = existing_record;
166
167            if timestamp < existing_ts {
168                return Err(FeoxError::OlderTimestamp);
169            }
170
171            // Update existing record
172            return self.update_record_with_ttl(&existing_clone, value, timestamp, ttl_expiry);
173        }
174
175        let record_size = self.calculate_record_size(key.len(), value.len());
176        if !self.check_memory_limit(record_size) {
177            return Err(FeoxError::OutOfMemory);
178        }
179
180        // Create new record with TTL if specified and TTL is enabled
181        let record = if ttl_expiry > 0 && self.enable_ttl {
182            self.stats.keys_with_ttl.fetch_add(1, Ordering::Relaxed);
183            Arc::new(Record::new_with_timestamp_ttl(
184                key.to_vec(),
185                value.to_vec(),
186                timestamp,
187                ttl_expiry,
188            ))
189        } else {
190            Arc::new(Record::new(key.to_vec(), value.to_vec(), timestamp))
191        };
192
193        let key_vec = record.key.clone();
194
195        // Insert into hash table
196        self.hash_table.upsert(key_vec.clone(), Arc::clone(&record));
197
198        // Insert into lock-free skip list for ordered access
199        self.tree.insert(key_vec, Arc::clone(&record));
200
201        // Update statistics
202        self.stats.record_count.fetch_add(1, Ordering::AcqRel);
203        self.stats
204            .memory_usage
205            .fetch_add(record_size, Ordering::AcqRel);
206        self.stats
207            .record_insert(start.elapsed().as_nanos() as u64, is_update);
208
209        // Only do persistence if not in memory-only mode
210        if !self.memory_only {
211            // Queue for persistence if write buffer exists
212            if let Some(ref wb) = self.write_buffer {
213                if let Err(_e) = wb.add_write(Operation::Insert, record, 0) {
214                    // Don't fail the insert - data is still in memory
215                    // Return code already indicates success since data is in memory
216                }
217            }
218        }
219
220        Ok(!is_update)
221    }
222
223    /// Internal method to insert a Bytes value with timestamp and TTL (zero-copy)
224    pub(super) fn insert_bytes_with_timestamp_and_ttl_internal(
225        &self,
226        key: &[u8],
227        value: Bytes,
228        timestamp: Option<u64>,
229        ttl_seconds: u64,
230    ) -> Result<bool> {
231        let start = std::time::Instant::now();
232        // Get timestamp before any operations
233        let timestamp = match timestamp {
234            Some(0) | None => self.get_timestamp(),
235            Some(ts) => ts,
236        };
237
238        self.validate_key(key)?;
239        let value_len = value.len();
240        if value_len == 0 || value_len > MAX_VALUE_SIZE {
241            return Err(FeoxError::InvalidValueSize);
242        }
243
244        // Check for existing record
245        let is_update = self.hash_table.contains(key);
246        let existing_record = self.hash_table.read(key, |_, v| v.clone());
247        if let Some(existing_record) = existing_record {
248            let existing_ts = existing_record.timestamp;
249
250            if timestamp < existing_ts {
251                return Err(FeoxError::OlderTimestamp);
252            }
253
254            // Calculate TTL expiry
255            let ttl_expiry = if ttl_seconds > 0 && self.enable_ttl {
256                timestamp + (ttl_seconds * 1_000_000_000)
257            } else {
258                0
259            };
260
261            // Update existing record using the Bytes version
262            return self.update_record_with_ttl_bytes(
263                &existing_record,
264                value,
265                timestamp,
266                ttl_expiry,
267            );
268        }
269
270        // This point is only reached for new inserts (not updates)
271        let new_size = self.calculate_record_size(key.len(), value_len);
272        if !self.check_memory_limit(new_size) {
273            return Err(FeoxError::OutOfMemory);
274        }
275
276        // Create new record with Bytes value
277        let record = if ttl_seconds > 0 && self.enable_ttl {
278            let ttl_expiry = timestamp + (ttl_seconds * 1_000_000_000);
279            self.stats.keys_with_ttl.fetch_add(1, Ordering::Relaxed);
280            Arc::new(Record::new_from_bytes_with_ttl(
281                key.to_vec(),
282                value,
283                timestamp,
284                ttl_expiry,
285            ))
286        } else {
287            Arc::new(Record::new_from_bytes(key.to_vec(), value, timestamp))
288        };
289
290        let key_vec = record.key.clone();
291
292        // Insert into hash table
293        self.hash_table.upsert(key_vec.clone(), Arc::clone(&record));
294
295        // Insert into skip list for ordered access
296        self.tree.insert(key_vec, Arc::clone(&record));
297
298        // Update statistics
299        self.stats.record_count.fetch_add(1, Ordering::AcqRel);
300        self.stats
301            .memory_usage
302            .fetch_add(new_size, Ordering::AcqRel);
303        self.stats
304            .record_insert(start.elapsed().as_nanos() as u64, is_update);
305
306        // Only do persistence if not in memory-only mode
307        if !self.memory_only {
308            // Queue for persistence if write buffer exists
309            if let Some(ref wb) = self.write_buffer {
310                if let Err(_e) = wb.add_write(Operation::Insert, record, 0) {
311                    // Don't fail the insert - data is still in memory
312                }
313            }
314        }
315
316        Ok(!is_update)
317    }
318
319    /// Retrieve a value by key.
320    ///
321    /// # Arguments
322    ///
323    /// * `key` - The key to look up
324    /// * `expected_size` - Optional expected value size for validation
325    ///
326    /// # Returns
327    ///
328    /// Returns the value as a `Vec<u8>` if found.
329    ///
330    /// # Errors
331    ///
332    /// * `KeyNotFound` - Key does not exist
333    /// * `InvalidKey` - Key is invalid
334    /// * `SizeMismatch` - Value size doesn't match expected size
335    /// * `IoError` - Failed to read from disk (persistent mode)
336    ///
337    /// # Example
338    ///
339    /// ```rust
340    /// # use feoxdb::FeoxStore;
341    /// # fn main() -> feoxdb::Result<()> {
342    /// # let store = FeoxStore::new(None)?;
343    /// # store.insert(b"key", b"value")?;
344    /// let value = store.get(b"key")?;
345    /// assert_eq!(value, b"value");
346    /// # Ok(())
347    /// # }
348    /// ```
349    ///
350    /// # Performance
351    ///
352    /// * Memory mode: ~100ns
353    /// * Persistent mode (cached): ~150ns
354    /// * Persistent mode (disk read): ~500ns
355    pub fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
356        let start = std::time::Instant::now();
357        self.validate_key(key)?;
358
359        let mut cache_hit = false;
360        if self.enable_caching {
361            if let Some(ref cache) = self.cache {
362                if let Some(value) = cache.get(key) {
363                    self.stats
364                        .record_get(start.elapsed().as_nanos() as u64, true);
365                    return Ok(value.to_vec());
366                }
367            }
368        }
369
370        let record = self
371            .hash_table
372            .read(key, |_, v| v.clone())
373            .ok_or(FeoxError::KeyNotFound)?;
374
375        // Check TTL expiry if TTL is enabled
376        if self.enable_ttl {
377            let ttl_expiry = record.ttl_expiry.load(Ordering::Relaxed);
378            if ttl_expiry > 0 {
379                let now = SystemTime::now()
380                    .duration_since(UNIX_EPOCH)
381                    .unwrap_or_default()
382                    .as_nanos() as u64;
383                if now > ttl_expiry {
384                    self.stats.ttl_expired_lazy.fetch_add(1, Ordering::Relaxed);
385                    return Err(FeoxError::KeyNotFound);
386                }
387            }
388        }
389
390        let value = if let Some(val) = record.get_value() {
391            val.to_vec()
392        } else {
393            cache_hit = false; // Reading from disk
394            self.load_value_from_disk(&record)?
395        };
396
397        if self.enable_caching {
398            if let Some(ref cache) = self.cache {
399                cache.insert(key.to_vec(), Bytes::from(value.clone()));
400            }
401        }
402
403        self.stats
404            .record_get(start.elapsed().as_nanos() as u64, cache_hit);
405        Ok(value)
406    }
407
408    /// Get a value by key without copying (zero-copy).
409    ///
410    /// Returns `Bytes` which avoids the memory copy that `get()` performs
411    /// when converting to `Vec<u8>`.
412    ///
413    /// # Arguments
414    ///
415    /// * `key` - The key to look up
416    ///
417    /// # Returns
418    ///
419    /// Returns the value as `Bytes` if found.
420    ///
421    /// # Example
422    ///
423    /// ```rust
424    /// # use feoxdb::FeoxStore;
425    /// # fn main() -> feoxdb::Result<()> {
426    /// # let store = FeoxStore::new(None)?;
427    /// # store.insert(b"key", b"value")?;
428    /// let bytes = store.get_bytes(b"key")?;
429    /// // Use bytes directly without copying
430    /// assert_eq!(&bytes[..], b"value");
431    /// # Ok(())
432    /// # }
433    /// ```
434    ///
435    /// # Performance
436    ///
437    /// Significantly faster than `get()` for large values:
438    /// * 100 bytes: ~15% faster
439    /// * 1KB: ~50% faster  
440    /// * 10KB: ~90% faster
441    /// * 100KB: ~95% faster
442    pub fn get_bytes(&self, key: &[u8]) -> Result<Bytes> {
443        let start = std::time::Instant::now();
444        self.validate_key(key)?;
445
446        if self.enable_caching {
447            if let Some(ref cache) = self.cache {
448                if let Some(value) = cache.get(key) {
449                    self.stats
450                        .record_get(start.elapsed().as_nanos() as u64, true);
451                    return Ok(value);
452                }
453            }
454        }
455
456        let record = self
457            .hash_table
458            .read(key, |_, v| v.clone())
459            .ok_or(FeoxError::KeyNotFound)?;
460
461        // Check TTL expiry if TTL is enabled
462        if self.enable_ttl {
463            let ttl_expiry = record.ttl_expiry.load(Ordering::Relaxed);
464            if ttl_expiry > 0 {
465                let now = SystemTime::now()
466                    .duration_since(UNIX_EPOCH)
467                    .unwrap_or_default()
468                    .as_nanos() as u64;
469                if now > ttl_expiry {
470                    self.stats.ttl_expired_lazy.fetch_add(1, Ordering::Relaxed);
471                    return Err(FeoxError::KeyNotFound);
472                }
473            }
474        }
475
476        let (value, cache_hit) = if let Some(val) = record.get_value() {
477            (val, true)
478        } else {
479            (Bytes::from(self.load_value_from_disk(&record)?), false)
480        };
481
482        if self.enable_caching {
483            if let Some(ref cache) = self.cache {
484                cache.insert(key.to_vec(), value.clone());
485            }
486        }
487
488        self.stats
489            .record_get(start.elapsed().as_nanos() as u64, cache_hit);
490        Ok(value)
491    }
492
493    /// Delete a key-value pair.
494    ///
495    /// # Arguments
496    ///
497    /// * `key` - The key to delete
498    /// * `timestamp` - Optional timestamp for conflict resolution
499    ///
500    /// # Returns
501    ///
502    /// Returns `Ok(())` if the key was deleted.
503    ///
504    /// # Errors
505    ///
506    /// * `KeyNotFound` - Key does not exist
507    /// * `OlderTimestamp` - Timestamp is not newer than existing record
508    ///
509    /// # Example
510    ///
511    /// ```rust
512    /// # use feoxdb::FeoxStore;
513    /// # fn main() -> feoxdb::Result<()> {
514    /// # let store = FeoxStore::new(None)?;
515    /// # store.insert(b"temp", b"data")?;
516    /// store.delete(b"temp")?;
517    /// # Ok(())
518    /// # }
519    /// ```
520    ///
521    /// # Performance
522    ///
523    /// * Memory mode: ~300ns
524    /// * Persistent mode: ~400ns
525    pub fn delete(&self, key: &[u8]) -> Result<()> {
526        self.delete_with_timestamp(key, None)
527    }
528
529    /// Delete a key-value pair with explicit timestamp.
530    ///
531    /// This is the advanced version that allows manual timestamp control.
532    /// Most users should use `delete()` instead.
533    ///
534    /// # Arguments
535    ///
536    /// * `key` - The key to delete
537    /// * `timestamp` - Optional timestamp. If `None`, uses current time.
538    ///
539    /// # Errors
540    ///
541    /// * `OlderTimestamp` - Timestamp is not newer than existing record
542    pub fn delete_with_timestamp(&self, key: &[u8], timestamp: Option<u64>) -> Result<()> {
543        let start = std::time::Instant::now();
544        let timestamp = match timestamp {
545            Some(0) | None => self.get_timestamp(),
546            Some(ts) => ts,
547        };
548        self.validate_key(key)?;
549
550        // Remove from hash table and get the record
551        let record_pair = self.hash_table.remove(key).ok_or(FeoxError::KeyNotFound)?;
552        let record = record_pair.1;
553
554        if timestamp < record.timestamp {
555            // Put it back if timestamp is older
556            self.hash_table.upsert(key.to_vec(), record);
557            return Err(FeoxError::OlderTimestamp);
558        }
559
560        let record_size = record.calculate_size();
561        let old_value_len = record.value_len;
562
563        // Mark record as deleted by setting refcount to 0
564        record.refcount.store(0, Ordering::Release);
565
566        // Remove from lock-free skip list
567        self.tree.remove(key);
568
569        // Update statistics
570        self.stats.record_count.fetch_sub(1, Ordering::AcqRel);
571        self.stats
572            .memory_usage
573            .fetch_sub(record_size, Ordering::AcqRel);
574
575        // Clear from cache
576        if self.enable_caching {
577            if let Some(ref cache) = self.cache {
578                cache.remove(key);
579            }
580        }
581
582        // Queue deletion for persistence if write buffer exists and not memory-only
583        if !self.memory_only {
584            if let Some(ref wb) = self.write_buffer {
585                if let Err(_e) = wb.add_write(Operation::Delete, record, old_value_len) {
586                    // Silent failure - data operation succeeded in memory
587                }
588            }
589        }
590
591        self.stats.record_delete(start.elapsed().as_nanos() as u64);
592        Ok(())
593    }
594
595    /// Get the size of a value without loading it.
596    ///
597    /// Useful for checking value size before loading large values from disk.
598    ///
599    /// # Arguments
600    ///
601    /// * `key` - The key to check
602    ///
603    /// # Returns
604    ///
605    /// Returns the size in bytes of the value.
606    ///
607    /// # Errors
608    ///
609    /// * `KeyNotFound` - Key does not exist
610    ///
611    /// # Example
612    ///
613    /// ```rust
614    /// # use feoxdb::FeoxStore;
615    /// # fn main() -> feoxdb::Result<()> {
616    /// # let store = FeoxStore::new(None)?;
617    /// store.insert(b"large_file", &vec![0u8; 1_000_000])?;
618    ///
619    /// // Check size before loading
620    /// let size = store.get_size(b"large_file")?;
621    /// assert_eq!(size, 1_000_000);
622    /// # Ok(())
623    /// # }
624    /// ```
625    pub fn get_size(&self, key: &[u8]) -> Result<usize> {
626        self.validate_key(key)?;
627
628        let record = self
629            .hash_table
630            .read(key, |_, v| v.clone())
631            .ok_or(FeoxError::KeyNotFound)?;
632
633        Ok(record.value_len)
634    }
635
636    // Internal helper methods
637
638    pub(super) fn validate_key_value(&self, key: &[u8], value: &[u8]) -> Result<()> {
639        if key.is_empty() || key.len() > MAX_KEY_SIZE {
640            return Err(FeoxError::InvalidKeySize);
641        }
642
643        if value.is_empty() || value.len() > MAX_VALUE_SIZE {
644            return Err(FeoxError::InvalidValueSize);
645        }
646
647        Ok(())
648    }
649
650    pub(super) fn validate_key(&self, key: &[u8]) -> Result<()> {
651        if key.is_empty() || key.len() > MAX_KEY_SIZE {
652            return Err(FeoxError::InvalidKeySize);
653        }
654
655        Ok(())
656    }
657
658    pub(super) fn check_memory_limit(&self, size: usize) -> bool {
659        match self.max_memory {
660            Some(limit) => {
661                let current = self.stats.memory_usage.load(Ordering::Acquire);
662                current + size <= limit
663            }
664            None => true,
665        }
666    }
667
668    pub(super) fn calculate_record_size(&self, key_len: usize, value_len: usize) -> usize {
669        std::mem::size_of::<Record>() + key_len + value_len
670    }
671
672    pub(super) fn get_timestamp(&self) -> u64 {
673        self.get_timestamp_pub()
674    }
675}