feoxdb/core/store.rs

use bytes::Bytes;
use crossbeam_skiplist::SkipMap;
use dashmap::DashMap;
use parking_lot::RwLock;
use std::io;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

pub use crate::constants::Operation;
use crate::constants::*;
use crate::core::record::Record;
use crate::error::{FeoxError, Result};
use crate::stats::Statistics;
use crate::storage::free_space::FreeSpaceManager;
use crate::storage::metadata::Metadata;
use crate::storage::write_buffer::WriteBuffer;

/// High-performance embedded key-value store.
///
/// `FeoxStore` provides ultra-fast key-value storage with optional persistence.
/// It combines a sharded concurrent hash map with a lock-free skip list and
/// achieves sub-microsecond latencies for most operations.
///
/// # Thread Safety
///
/// All methods are thread-safe and can be called concurrently from multiple threads.
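///
/// # Example
///
/// A minimal usage sketch with an in-memory store (no device path, so nothing
/// is persisted):
///
/// ```rust
/// use feoxdb::FeoxStore;
///
/// # fn main() -> feoxdb::Result<()> {
/// let store = FeoxStore::new(None)?;
/// store.insert(b"greeting", b"hello")?;
/// assert_eq!(store.get(b"greeting")?, b"hello");
/// store.delete(b"greeting")?;
/// # Ok(())
/// # }
/// ```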
pub struct FeoxStore {
    // Main sharded hash table for fast point lookups
    hash_table: DashMap<Vec<u8>, Arc<Record>>,

    // Lock-free skip list for ordered access
    tree: Arc<SkipMap<Vec<u8>, Arc<Record>>>,

    // Central statistics hub
    stats: Arc<Statistics>,

    // Write buffering (optional for memory-only mode)
    write_buffer: Option<Arc<WriteBuffer>>,

    // Free space management
    free_space: Arc<RwLock<FreeSpaceManager>>,

    // Metadata
    _metadata: Arc<RwLock<Metadata>>,

    // Configuration
    memory_only: bool,
    enable_caching: bool,
    max_memory: Option<usize>,

    // Cache (if enabled)
    cache: Option<Arc<super::cache::ClockCache>>,
    #[cfg(unix)]
    device_fd: Option<i32>,
    device_size: u64,
    device_file: Option<std::fs::File>,

    // Disk I/O
    disk_io: Option<Arc<RwLock<crate::storage::io::DiskIO>>>,
}

/// Configuration options for FeoxStore.
///
/// Use `StoreBuilder` for a more ergonomic way to configure the store.
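///
/// A minimal sketch of direct configuration (assuming `StoreConfig` is
/// re-exported at the crate root alongside `FeoxStore`; otherwise adjust the
/// import path):
///
/// ```rust,ignore
/// use feoxdb::{FeoxStore, StoreConfig};
///
/// let config = StoreConfig {
///     hash_bits: 18,
///     memory_only: true,
///     enable_caching: false,
///     device_path: None,
///     max_memory: Some(64 * 1024 * 1024),
/// };
/// let store = FeoxStore::with_config(config)?;
/// ```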
pub struct StoreConfig {
    pub hash_bits: u32,
    pub memory_only: bool,
    pub enable_caching: bool,
    pub device_path: Option<String>,
    pub max_memory: Option<usize>,
}

/// Builder for creating FeoxStore with custom configuration.
///
/// Provides a fluent interface for configuring store parameters.
///
/// # Example
///
/// ```rust
/// use feoxdb::FeoxStore;
///
/// # fn main() -> feoxdb::Result<()> {
/// let store = FeoxStore::builder()
///     .max_memory(1_000_000_000)
///     .hash_bits(20)
///     .build()?;
/// # Ok(())
/// # }
/// ```
pub struct StoreBuilder {
    hash_bits: u32,
    device_path: Option<String>,
    max_memory: Option<usize>,
    enable_caching: Option<bool>,
}

impl StoreBuilder {
    pub fn new() -> Self {
        Self {
            hash_bits: DEFAULT_HASH_BITS,
            device_path: None,
            max_memory: Some(DEFAULT_MAX_MEMORY),
            enable_caching: None, // Auto-detect based on storage mode
        }
    }

    /// Set the device path for persistent storage.
    ///
    /// When set, data will be persisted to disk asynchronously.
    /// If not set, the store operates in memory-only mode.
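    ///
    /// A brief illustration (the path below is hypothetical; any writable
    /// file location works):
    ///
    /// ```no_run
    /// # use feoxdb::FeoxStore;
    /// # fn main() -> feoxdb::Result<()> {
    /// let store = FeoxStore::builder()
    ///     .device_path("/tmp/feox.data")
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```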
    pub fn device_path(mut self, path: impl Into<String>) -> Self {
        self.device_path = Some(path.into());
        self
    }

    /// Set the maximum memory limit (in bytes).
    ///
    /// The store will start evicting entries when this limit is approached.
    /// Default: 1GB
    pub fn max_memory(mut self, limit: usize) -> Self {
        self.max_memory = Some(limit);
        self
    }

    /// Remove memory limit.
    ///
    /// Use with caution as the store can grow unbounded.
    pub fn no_memory_limit(mut self) -> Self {
        self.max_memory = None;
        self
    }

    /// Set number of hash bits (determines hash table size).
    ///
    /// More bits = larger hash table = better performance for large datasets.
    /// Default: 18 (256K buckets)
    pub fn hash_bits(mut self, bits: u32) -> Self {
        self.hash_bits = bits;
        self
    }

    /// Enable or disable caching.
    ///
    /// When enabled, frequently accessed values are kept in memory
    /// even after being written to disk. Uses CLOCK eviction algorithm.
    pub fn enable_caching(mut self, enable: bool) -> Self {
        self.enable_caching = Some(enable);
        self
    }

    /// Build the FeoxStore
    pub fn build(self) -> Result<FeoxStore> {
        let memory_only = self.device_path.is_none();
        let enable_caching = self.enable_caching.unwrap_or(!memory_only);

        let config = StoreConfig {
            hash_bits: self.hash_bits,
            memory_only,
            enable_caching,
            device_path: self.device_path,
            max_memory: self.max_memory,
        };

        FeoxStore::with_config(config)
    }
}

impl Default for StoreBuilder {
    fn default() -> Self {
        Self::new()
    }
}

impl FeoxStore {
    /// Create a new FeoxStore with default configuration
    pub fn new(device_path: Option<String>) -> Result<Self> {
        let memory_only = device_path.is_none();
        let config = StoreConfig {
            hash_bits: DEFAULT_HASH_BITS,
            memory_only,
            enable_caching: !memory_only, // Disable caching for memory-only mode
            device_path,
            max_memory: Some(DEFAULT_MAX_MEMORY),
        };

        // Delegate to `with_config` to avoid duplicating the construction logic
        Self::with_config(config)
    }

    /// Create a new FeoxStore with custom configuration
    pub fn with_config(config: StoreConfig) -> Result<Self> {
        // Initialize hash table with configured capacity
        let hash_table = DashMap::with_capacity(1 << config.hash_bits);

        let free_space = Arc::new(RwLock::new(FreeSpaceManager::new()));
        let metadata = Arc::new(RwLock::new(Metadata::new()));
        let stats = Arc::new(Statistics::new());

        let cache = if config.enable_caching {
            Some(Arc::new(super::cache::ClockCache::new(stats.clone())))
        } else {
            None
        };

        let mut store = Self {
            hash_table,
            tree: Arc::new(SkipMap::new()),
            stats: stats.clone(),
            write_buffer: None,
            free_space: free_space.clone(),
            _metadata: metadata,
            memory_only: config.memory_only,
            enable_caching: config.enable_caching,
            max_memory: config.max_memory,
            cache,
            #[cfg(unix)]
            device_fd: None,
            device_size: 0,
            device_file: None,
            disk_io: None,
        };

        if !config.memory_only {
            store.open_device(&config.device_path)?;
            store.load_indexes()?;

            // Initialize write buffer for persistent mode
            if let Some(ref disk_io) = store.disk_io {
                let mut write_buffer = WriteBuffer::new(disk_io.clone(), free_space, stats.clone());
                let num_workers = (num_cpus::get() / 2).max(1);
                write_buffer.start_workers(num_workers);
                store.write_buffer = Some(Arc::new(write_buffer));
            }
        }

        Ok(store)
    }

    /// Create a builder for configuring FeoxStore.
    ///
    /// # Example
    ///
    /// ```rust
    /// use feoxdb::FeoxStore;
    ///
    /// # fn main() -> feoxdb::Result<()> {
    /// let store = FeoxStore::builder()
    ///     .max_memory(2_000_000_000)
    ///     .build()?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn builder() -> StoreBuilder {
        StoreBuilder::new()
    }

    /// Insert or update a key-value pair.
    ///
    /// The write is timestamped with the current time; use
    /// `insert_with_timestamp` for explicit conflict-resolution timestamps.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to insert (max 65KB)
    /// * `value` - The value to store (max 4GB)
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` if successful.
    ///
    /// # Errors
    ///
    /// * `InvalidKeySize` - Key is empty or too large
    /// * `InvalidValueSize` - Value is empty or too large
    /// * `OlderTimestamp` - Timestamp is not newer than existing record
    /// * `OutOfMemory` - Memory limit exceeded
    ///
    /// # Example
    ///
    /// ```rust
    /// # use feoxdb::FeoxStore;
    /// # fn main() -> feoxdb::Result<()> {
    /// # let store = FeoxStore::new(None)?;
    /// store.insert(b"user:123", b"{\"name\":\"Mehran\"}")?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Performance
    ///
    /// * Memory mode: ~800ns
    /// * Persistent mode: ~1µs (buffered write)
    pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
        self.insert_with_timestamp(key, value, None)
    }

    /// Insert or update a key-value pair with explicit timestamp.
    ///
    /// This is the advanced version that allows manual timestamp control for
    /// conflict resolution. Most users should use `insert()` instead.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to insert (max 65KB)
    /// * `value` - The value to store (max 4GB)
    /// * `timestamp` - Optional timestamp for conflict resolution. If `None`, uses current time.
    ///
    /// # Errors
    ///
    /// * `OlderTimestamp` - Timestamp is not newer than existing record
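    ///
    /// # Example
    ///
    /// An illustrative sketch using explicit logical timestamps (any monotonic
    /// `u64` works; wall-clock nanoseconds are used when `None` is passed):
    ///
    /// ```rust
    /// # use feoxdb::FeoxStore;
    /// # fn main() -> feoxdb::Result<()> {
    /// # let store = FeoxStore::new(None)?;
    /// store.insert_with_timestamp(b"key", b"v1", Some(100))?;
    /// // A write carrying an older timestamp is rejected
    /// assert!(store.insert_with_timestamp(b"key", b"v0", Some(50)).is_err());
    /// # Ok(())
    /// # }
    /// ```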
    pub fn insert_with_timestamp(
        &self,
        key: &[u8],
        value: &[u8],
        timestamp: Option<u64>,
    ) -> Result<()> {
        let start = std::time::Instant::now();
        let timestamp = match timestamp {
            Some(0) | None => self.get_timestamp(),
            Some(ts) => ts,
        };
        self.validate_key_value(key, value)?;

        // Check for existing record
        let is_update = self.hash_table.contains_key(key);
        if let Some(existing_record) = self.hash_table.get(key) {
            let existing_ts = existing_record.timestamp;
            let existing_clone = Arc::clone(&existing_record);
            drop(existing_record); // Release the reference before updating

            if timestamp < existing_ts {
                return Err(FeoxError::OlderTimestamp);
            }

            // Update existing record
            return self.update_record(&existing_clone, value, timestamp);
        }

        let record_size = self.calculate_record_size(key.len(), value.len());
        if !self.check_memory_limit(record_size) {
            return Err(FeoxError::OutOfMemory);
        }

        // Create new record
        let record = Arc::new(Record::new(key.to_vec(), value.to_vec(), timestamp));

        let key_vec = record.key.clone();

        // Insert into hash table - DashMap handles locking internally
        self.hash_table.insert(key_vec.clone(), Arc::clone(&record));

        // Insert into lock-free skip list for ordered access
        self.tree.insert(key_vec, Arc::clone(&record));

        // Update statistics
        self.stats.record_count.fetch_add(1, Ordering::AcqRel);
        self.stats
            .memory_usage
            .fetch_add(record_size, Ordering::AcqRel);
        self.stats
            .record_insert(start.elapsed().as_nanos() as u64, is_update);

        // Only do persistence and cache checks if not in memory-only mode
        if !self.memory_only {
            // Queue for persistence if write buffer exists
            if let Some(ref wb) = self.write_buffer {
                if let Err(_e) = wb.add_write(Operation::Insert, record, 0) {
                    // Don't fail the insert - data is still in memory
                    // Return code already indicates success since data is in memory
                }
            }

            // Check memory pressure and trigger cache eviction if needed
            if self.enable_caching {
                if let Some(ref cache) = self.cache {
                    let stats = cache.stats();
                    if stats.memory_usage > stats.high_watermark {
                        cache.evict_entries();
                    }
                }
            }
        }

        Ok(())
    }

    /// Retrieve a value by key.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to look up
    ///
    /// # Returns
    ///
    /// Returns the value as a `Vec<u8>` if found.
    ///
    /// # Errors
    ///
    /// * `KeyNotFound` - Key does not exist
    /// * `InvalidKeySize` - Key is empty or too large
    /// * `IoError` - Failed to read from disk (persistent mode)
    ///
    /// # Example
    ///
    /// ```rust
    /// # use feoxdb::FeoxStore;
    /// # fn main() -> feoxdb::Result<()> {
    /// # let store = FeoxStore::new(None)?;
    /// # store.insert(b"key", b"value")?;
    /// let value = store.get(b"key")?;
    /// assert_eq!(value, b"value");
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Performance
    ///
    /// * Memory mode: ~100ns
    /// * Persistent mode (cached): ~150ns
    /// * Persistent mode (disk read): ~500ns
    pub fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
        let start = std::time::Instant::now();
        self.validate_key(key)?;

        let mut cache_hit = false;
        if self.enable_caching {
            if let Some(ref cache) = self.cache {
                if let Some(value) = cache.get(key) {
                    self.stats
                        .record_get(start.elapsed().as_nanos() as u64, true);
                    return Ok(value.to_vec());
                }
            }
        }

        let record = self.hash_table.get(key).ok_or(FeoxError::KeyNotFound)?;

        let value = if let Some(val) = record.get_value() {
            val.to_vec()
        } else {
            cache_hit = false; // Reading from disk
            self.load_value_from_disk(&record)?
        };

        if self.enable_caching {
            if let Some(ref cache) = self.cache {
                cache.insert(key.to_vec(), Bytes::from(value.clone()));
            }
        }

        self.stats
            .record_get(start.elapsed().as_nanos() as u64, cache_hit);
        Ok(value)
    }

    /// Delete a key-value pair.
    ///
    /// The deletion is timestamped with the current time; use
    /// `delete_with_timestamp` for explicit conflict-resolution timestamps.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to delete
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` if the key was deleted.
    ///
    /// # Errors
    ///
    /// * `KeyNotFound` - Key does not exist
    /// * `OlderTimestamp` - Timestamp is not newer than existing record
    ///
    /// # Example
    ///
    /// ```rust
    /// # use feoxdb::FeoxStore;
    /// # fn main() -> feoxdb::Result<()> {
    /// # let store = FeoxStore::new(None)?;
    /// # store.insert(b"temp", b"data")?;
    /// store.delete(b"temp")?;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// # Performance
    ///
    /// * Memory mode: ~300ns
    /// * Persistent mode: ~400ns
    pub fn delete(&self, key: &[u8]) -> Result<()> {
        self.delete_with_timestamp(key, None)
    }

    /// Delete a key-value pair with explicit timestamp.
    ///
    /// This is the advanced version that allows manual timestamp control.
    /// Most users should use `delete()` instead.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to delete
    /// * `timestamp` - Optional timestamp. If `None`, uses current time.
    ///
    /// # Errors
    ///
    /// * `OlderTimestamp` - Timestamp is not newer than existing record
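    ///
    /// # Example
    ///
    /// A small sketch with explicit logical timestamps:
    ///
    /// ```rust
    /// # use feoxdb::FeoxStore;
    /// # fn main() -> feoxdb::Result<()> {
    /// # let store = FeoxStore::new(None)?;
    /// store.insert_with_timestamp(b"key", b"v1", Some(200))?;
    /// // A delete carrying an older timestamp is rejected and the record stays
    /// assert!(store.delete_with_timestamp(b"key", Some(100)).is_err());
    /// store.delete_with_timestamp(b"key", Some(300))?;
    /// # Ok(())
    /// # }
    /// ```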
    pub fn delete_with_timestamp(&self, key: &[u8], timestamp: Option<u64>) -> Result<()> {
        let start = std::time::Instant::now();
        let timestamp = match timestamp {
            Some(0) | None => self.get_timestamp(),
            Some(ts) => ts,
        };
        self.validate_key(key)?;

        // Remove from hash table and get the record
        let (_key, record) = self.hash_table.remove(key).ok_or(FeoxError::KeyNotFound)?;

        if timestamp < record.timestamp {
            // Put it back if timestamp is older
            self.hash_table.insert(key.to_vec(), record);
            return Err(FeoxError::OlderTimestamp);
        }

        let record_size = record.calculate_size();
        let old_value_len = record.value_len;

        // Mark record as deleted by setting refcount to 0
        record.refcount.store(0, Ordering::Release);

        // Remove from lock-free skip list
        self.tree.remove(key);

        // Update statistics
        self.stats.record_count.fetch_sub(1, Ordering::AcqRel);
        self.stats
            .memory_usage
            .fetch_sub(record_size, Ordering::AcqRel);

        // Clear from cache
        if self.enable_caching {
            if let Some(ref cache) = self.cache {
                cache.remove(key);
            }
        }

        // Queue deletion for persistence if write buffer exists and not memory-only
        if !self.memory_only {
            if let Some(ref wb) = self.write_buffer {
                if let Err(_e) = wb.add_write(Operation::Delete, record, old_value_len) {
                    // Silent failure - data operation succeeded in memory
                }
            }
        }

        self.stats.record_delete(start.elapsed().as_nanos() as u64);
        Ok(())
    }

    /// Apply a JSON patch to a value.
    ///
    /// Uses RFC 6902 JSON Patch format to modify specific fields in a JSON document.
    /// Both the existing value and the patch must be valid JSON.
    ///
    /// The patch is timestamped with the current time; use
    /// `json_patch_with_timestamp` for explicit conflict-resolution timestamps.
    ///
    /// # Arguments
    ///
    /// * `key` - The key containing the JSON document to patch
    /// * `patch` - JSON Patch operations in RFC 6902 format
    ///
    /// # Returns
    ///
    /// Returns `Ok(())` if the update was applied.
    ///
    /// # Errors
    ///
    /// * `KeyNotFound` - Key does not exist
    /// * `OlderTimestamp` - Timestamp is not newer than existing record
    /// * `JsonPatchError` - Invalid JSON document or patch format
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use feoxdb::FeoxStore;
    /// # fn main() -> feoxdb::Result<()> {
    /// # let store = FeoxStore::new(None)?;
    /// // Insert initial JSON value
    /// let initial = br#"{"name":"Alice","age":30}"#;
    /// store.insert(b"user:1", initial)?;
    ///
    /// // Apply JSON patch to update age
    /// let patch = br#"[{"op":"replace","path":"/age","value":31}]"#;
    /// store.json_patch(b"user:1", patch)?;
    ///
    /// // The stored document now has age updated to 31
    /// let updated = store.get(b"user:1")?;
    /// assert_ne!(updated, initial.to_vec());
    /// # Ok(())
    /// # }
    /// ```
    pub fn json_patch(&self, key: &[u8], patch: &[u8]) -> Result<()> {
        self.json_patch_with_timestamp(key, patch, None)
    }

    /// Apply JSON patch with explicit timestamp.
    ///
    /// This is the advanced version that allows manual timestamp control.
    /// Most users should use `json_patch()` instead.
    ///
    /// # Arguments
    ///
    /// * `key` - The key whose value to patch
    /// * `patch` - JSON Patch array (RFC 6902)
    /// * `timestamp` - Optional timestamp. If `None`, uses current time.
    ///
    /// # Errors
    ///
    /// * `OlderTimestamp` - Timestamp is not newer than existing record
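    ///
    /// # Example
    ///
    /// An illustrative sketch with explicit logical timestamps:
    ///
    /// ```no_run
    /// # use feoxdb::FeoxStore;
    /// # fn main() -> feoxdb::Result<()> {
    /// # let store = FeoxStore::new(None)?;
    /// store.insert_with_timestamp(b"cfg", br#"{"level":1}"#, Some(10))?;
    /// let patch = br#"[{"op":"replace","path":"/level","value":2}]"#;
    /// // A patch carrying an older timestamp is rejected
    /// assert!(store.json_patch_with_timestamp(b"cfg", patch, Some(5)).is_err());
    /// store.json_patch_with_timestamp(b"cfg", patch, Some(20))?;
    /// # Ok(())
    /// # }
    /// ```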
    pub fn json_patch_with_timestamp(
        &self,
        key: &[u8],
        patch: &[u8],
        timestamp: Option<u64>,
    ) -> Result<()> {
        let timestamp = match timestamp {
            Some(0) | None => self.get_timestamp(),
            Some(ts) => ts,
        };
        self.validate_key(key)?;

        // Get the value and release the lock immediately
        let current_value = {
            let record = self.hash_table.get(key).ok_or(FeoxError::KeyNotFound)?;

            if timestamp < record.timestamp {
                return Err(FeoxError::OlderTimestamp);
            }

            if let Some(val) = record.get_value() {
                val.to_vec()
            } else {
                self.load_value_from_disk(&record)?
            }
        };

        let new_value = crate::utils::json_patch::apply_json_patch(&current_value, patch)?;

        // Now update without holding any references
        self.insert_with_timestamp(key, &new_value, Some(timestamp))
    }

    fn update_record(&self, old_record: &Record, value: &[u8], timestamp: u64) -> Result<()> {
        let new_record = Arc::new(Record::new(
            old_record.key.clone(),
            value.to_vec(),
            timestamp,
        ));

        let old_value_len = old_record.value_len;
        let old_size = old_record.calculate_size();
        let new_size = self.calculate_record_size(old_record.key.len(), value.len());

        let old_record_arc = if let Some(entry) = self.hash_table.get(&old_record.key) {
            Arc::clone(&*entry)
        } else {
            return Err(FeoxError::KeyNotFound);
        };

        let key_vec = new_record.key.clone();

        self.hash_table
            .insert(key_vec.clone(), Arc::clone(&new_record));
        self.tree.insert(key_vec.clone(), Arc::clone(&new_record));

        if new_size > old_size {
            self.stats
                .memory_usage
                .fetch_add(new_size - old_size, Ordering::AcqRel);
        } else {
            self.stats
                .memory_usage
                .fetch_sub(old_size - new_size, Ordering::AcqRel);
        }

        // Only do cache and persistence operations if not in memory-only mode
        if !self.memory_only {
            if self.enable_caching {
                if let Some(ref cache) = self.cache {
                    cache.remove(&key_vec);
                }
            }

            if let Some(ref wb) = self.write_buffer {
                if let Err(e) =
                    wb.add_write(Operation::Update, Arc::clone(&new_record), old_value_len)
                {
                    // Data operation succeeded in memory
                    let _ = e;
                }

                if let Err(e) = wb.add_write(Operation::Delete, old_record_arc, old_value_len) {
                    // Data operation succeeded in memory
                    let _ = e;
                }
            }
        }

        Ok(())
    }

    fn validate_key_value(&self, key: &[u8], value: &[u8]) -> Result<()> {
        if key.is_empty() || key.len() > MAX_KEY_SIZE {
            return Err(FeoxError::InvalidKeySize);
        }

        if value.is_empty() || value.len() > MAX_VALUE_SIZE {
            return Err(FeoxError::InvalidValueSize);
        }

        Ok(())
    }

    fn validate_key(&self, key: &[u8]) -> Result<()> {
        if key.is_empty() || key.len() > MAX_KEY_SIZE {
            return Err(FeoxError::InvalidKeySize);
        }

        Ok(())
    }

    fn check_memory_limit(&self, size: usize) -> bool {
        match self.max_memory {
            Some(limit) => {
                let current = self.stats.memory_usage.load(Ordering::Acquire);
                current + size <= limit
            }
            None => true,
        }
    }

    fn calculate_record_size(&self, key_len: usize, value_len: usize) -> usize {
        std::mem::size_of::<Record>() + key_len + value_len
    }

    /// Perform a range query on the store.
    ///
    /// Returns all key-value pairs where the key is >= `start_key` and <= `end_key`.
    /// Both bounds are inclusive.
    ///
    /// # Arguments
    ///
    /// * `start_key` - Inclusive lower bound
    /// * `end_key` - Inclusive upper bound
    /// * `limit` - Maximum number of results to return
    ///
    /// # Returns
    ///
    /// Returns a vector of (key, value) pairs in sorted order.
    ///
    /// # Example
    ///
    /// ```rust
    /// # use feoxdb::FeoxStore;
    /// # fn main() -> feoxdb::Result<()> {
    /// # let store = FeoxStore::new(None)?;
    /// store.insert(b"user:001", b"Alice")?;
    /// store.insert(b"user:002", b"Bob")?;
    /// store.insert(b"user:003", b"Charlie")?;
    /// store.insert(b"user:004", b"David")?;
    ///
    /// // Get users 001 through 003 (inclusive)
    /// let results = store.range_query(b"user:001", b"user:003", 10)?;
    /// assert_eq!(results.len(), 3);
    /// # Ok(())
    /// # }
    /// ```
    pub fn range_query(
        &self,
        start_key: &[u8],
        end_key: &[u8],
        limit: usize,
    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
        if start_key.len() > MAX_KEY_SIZE || end_key.len() > MAX_KEY_SIZE {
            return Err(FeoxError::InvalidKeySize);
        }

        let mut results = Vec::new();

        for entry in self.tree.range(start_key.to_vec()..=end_key.to_vec()) {
            if results.len() >= limit {
                break;
            }

            let record = entry.value();
            let value = if let Some(val) = record.get_value() {
                val.to_vec()
            } else {
                self.load_value_from_disk(record)?
            };

            results.push((entry.key().clone(), value));
        }

        Ok(results)
    }

    /// Atomically increment a numeric counter.
    ///
    /// The value must be stored as an 8-byte little-endian i64. If the key doesn't exist,
    /// it will be created with the given delta value. If it exists, the value will be
    /// incremented atomically.
    ///
    /// # Value Format
    ///
    /// The value MUST be exactly 8 bytes representing a little-endian i64.
    /// Use `i64::to_le_bytes()` to create the initial value:
    /// ```rust,ignore
    /// let zero: i64 = 0;
    /// store.insert(b"counter", &zero.to_le_bytes())?;
    /// ```
    ///
    /// # Arguments
    ///
    /// * `key` - The key of the counter
    /// * `delta` - The amount to increment by (can be negative for decrement)
    ///
    /// # Returns
    ///
    /// Returns the new value after incrementing.
    ///
    /// # Errors
    ///
    /// * `InvalidOperation` - Existing value is not exactly 8 bytes (not a valid i64)
    /// * `OlderTimestamp` - Timestamp is not newer than existing record
    ///
    /// # Example
    ///
    /// ```rust
    /// # use feoxdb::FeoxStore;
    /// # fn main() -> feoxdb::Result<()> {
    /// # let store = FeoxStore::new(None)?;
    /// // Initialize counter with proper binary format
    /// let initial: i64 = 0;
    /// store.insert(b"visits", &initial.to_le_bytes())?;
    ///
    /// // Increment atomically
    /// let val = store.atomic_increment(b"visits", 1)?;
    /// assert_eq!(val, 1);
    ///
    /// // Increment by 5
    /// let val = store.atomic_increment(b"visits", 5)?;
    /// assert_eq!(val, 6);
    ///
    /// // Decrement by 2
    /// let val = store.atomic_increment(b"visits", -2)?;
    /// assert_eq!(val, 4);
    ///
    /// // Or create new counter directly (starts at delta value)
    /// let downloads = store.atomic_increment(b"downloads", 100)?;
    /// assert_eq!(downloads, 100);
    /// # Ok(())
    /// # }
    /// ```
    pub fn atomic_increment(&self, key: &[u8], delta: i64) -> Result<i64> {
        self.atomic_increment_with_timestamp(key, delta, None)
    }

    /// Atomically increment/decrement with explicit timestamp.
    ///
    /// This is the advanced version that allows manual timestamp control.
    /// Most users should use `atomic_increment()` instead.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to increment/decrement
    /// * `delta` - Amount to add (negative to decrement)
    /// * `timestamp` - Optional timestamp. If `None`, uses current time.
    ///
    /// # Errors
    ///
    /// * `OlderTimestamp` - Timestamp is not newer than existing record
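    ///
    /// # Example
    ///
    /// A brief sketch with explicit logical timestamps:
    ///
    /// ```rust
    /// # use feoxdb::FeoxStore;
    /// # fn main() -> feoxdb::Result<()> {
    /// # let store = FeoxStore::new(None)?;
    /// // The counter is created on first increment
    /// assert_eq!(store.atomic_increment_with_timestamp(b"hits", 1, Some(100))?, 1);
    /// // An increment carrying an older timestamp is rejected
    /// assert!(store.atomic_increment_with_timestamp(b"hits", 1, Some(50)).is_err());
    /// # Ok(())
    /// # }
    /// ```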
    pub fn atomic_increment_with_timestamp(
        &self,
        key: &[u8],
        delta: i64,
        timestamp: Option<u64>,
    ) -> Result<i64> {
        self.validate_key(key)?;

        let key_vec = key.to_vec();

        // Use DashMap's entry API for atomic operations
        let result = match self.hash_table.entry(key_vec.clone()) {
            dashmap::mapref::entry::Entry::Occupied(mut entry) => {
                let old_record = entry.get();

                // Get timestamp inside the critical section to ensure it's always newer
                let timestamp = match timestamp {
                    Some(0) | None => self.get_timestamp(),
                    Some(ts) => ts,
                };

                // Check if timestamp is valid
                if timestamp < old_record.timestamp {
                    return Err(FeoxError::OlderTimestamp);
                }

                // Load value from memory or disk
                let value = if let Some(val) = old_record.get_value() {
                    val.to_vec()
                } else {
                    // Try loading from disk if not in memory
                    self.load_value_from_disk(old_record)?
                };

                let current_val = if value.len() == 8 {
                    let bytes = value
                        .get(..8)
                        .and_then(|slice| slice.try_into().ok())
                        .ok_or(FeoxError::InvalidNumericValue)?;
                    i64::from_le_bytes(bytes)
                } else {
                    return Err(FeoxError::InvalidOperation);
                };

                let new_val = current_val.saturating_add(delta);
                let new_value = new_val.to_le_bytes().to_vec();

                // Create new record
                let new_record =
                    Arc::new(Record::new(old_record.key.clone(), new_value, timestamp));

                let old_value_len = old_record.value_len;
                let old_size = old_record.calculate_size();
                let new_size = self.calculate_record_size(old_record.key.len(), 8);
                let old_record_arc = Arc::clone(old_record);

                // Atomically update the entry
                entry.insert(Arc::clone(&new_record));

                // Update skip list as well
                self.tree.insert(key_vec.clone(), Arc::clone(&new_record));

                // Update memory usage
                if new_size > old_size {
                    self.stats
                        .memory_usage
                        .fetch_add(new_size - old_size, Ordering::AcqRel);
                } else {
                    self.stats
                        .memory_usage
                        .fetch_sub(old_size - new_size, Ordering::AcqRel);
                }

                // Only do cache and persistence operations if not in memory-only mode
                if !self.memory_only {
                    if self.enable_caching {
                        if let Some(ref cache) = self.cache {
                            cache.remove(&key_vec);
                        }
                    }

                    if let Some(ref wb) = self.write_buffer {
                        if let Err(e) =
                            wb.add_write(Operation::Update, Arc::clone(&new_record), old_value_len)
                        {
                            // Atomic operation succeeded in memory
                            let _ = e;
                        }

                        if let Err(e) =
                            wb.add_write(Operation::Delete, old_record_arc, old_value_len)
                        {
                            // Atomic operation succeeded in memory
                            let _ = e;
                        }
                    }
                }

                Ok(new_val)
            }
            dashmap::mapref::entry::Entry::Vacant(entry) => {
                // Key doesn't exist, create it with initial value
                // Get timestamp inside the critical section
                let timestamp = match timestamp {
                    Some(0) | None => self.get_timestamp(),
                    Some(ts) => ts,
                };

                let initial_val = delta;
                let value = initial_val.to_le_bytes().to_vec();

                let new_record = Arc::new(Record::new(key_vec.clone(), value, timestamp));

                entry.insert(Arc::clone(&new_record));

                // Update skip list
                self.tree.insert(key_vec.clone(), Arc::clone(&new_record));

                // Update statistics
                self.stats.record_count.fetch_add(1, Ordering::AcqRel);
                let record_size = self.calculate_record_size(key.len(), 8);
                self.stats
                    .memory_usage
                    .fetch_add(record_size, Ordering::AcqRel);

                // Handle persistence if needed
                if !self.memory_only {
                    if let Some(ref wb) = self.write_buffer {
                        if let Err(e) = wb.add_write(Operation::Insert, Arc::clone(&new_record), 0)
                        {
                            // Operation succeeded in memory
                            let _ = e;
                        }
                    }
                }

                Ok(initial_val)
            }
        };

        result
    }

    /// Get the size of a value without loading it.
    ///
    /// Useful for checking value size before loading large values from disk.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to check
    ///
    /// # Returns
    ///
    /// Returns the size in bytes of the value.
    ///
    /// # Errors
    ///
    /// * `KeyNotFound` - Key does not exist
    ///
    /// # Example
    ///
    /// ```rust
    /// # use feoxdb::FeoxStore;
    /// # fn main() -> feoxdb::Result<()> {
    /// # let store = FeoxStore::new(None)?;
    /// store.insert(b"large_file", &vec![0u8; 1_000_000])?;
    ///
    /// // Check size before loading
    /// let size = store.get_size(b"large_file")?;
    /// assert_eq!(size, 1_000_000);
    /// # Ok(())
    /// # }
    /// ```
    pub fn get_size(&self, key: &[u8]) -> Result<usize> {
        self.validate_key(key)?;

        let record = self.hash_table.get(key).ok_or(FeoxError::KeyNotFound)?;

        Ok(record.value_len)
    }

    /// Force flush all pending writes to disk.
    ///
    /// In persistent mode, ensures all buffered writes are flushed to disk.
    /// In memory-only mode, this is a no-op.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use feoxdb::FeoxStore;
    /// # fn main() -> feoxdb::Result<()> {
    /// let store = FeoxStore::new(Some("/path/to/data.feox".to_string()))?;
    /// store.insert(b"important", b"data")?;
    /// store.flush_all();  // Ensure data is persisted
    /// # Ok(())
    /// # }
    /// ```
    pub fn flush_all(&self) {
        if !self.memory_only {
            // First flush the write buffer to ensure all data is written
            if let Some(ref wb) = self.write_buffer {
                let _ = wb.force_flush();
            }

            if let Some(ref disk_io) = self.disk_io {
                // Update metadata with current stats
                let mut metadata = self._metadata.write();
                metadata.total_records = self.stats.record_count.load(Ordering::Relaxed) as u64;
                metadata.total_size = self.stats.disk_usage.load(Ordering::Relaxed);
                metadata.fragmentation = self.free_space.read().get_fragmentation();
                metadata.update();

                // Write metadata
                let _ = disk_io.write().write_metadata(metadata.as_bytes());
                let _ = disk_io.write().flush();
            }
        }
    }

    fn get_timestamp(&self) -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos() as u64
    }

    fn load_value_from_disk(&self, record: &Record) -> Result<Vec<u8>> {
        let sector = record.sector.load(Ordering::Acquire);
        if self.memory_only || sector == 0 {
            return Err(FeoxError::InvalidRecord);
        }

        // Calculate how many sectors we need to read
        let total_size = SECTOR_HEADER_SIZE + 2 + record.key.len() + 8 + 8 + record.value_len;
        let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE);

        // Read the sectors
        let disk_io = self
            .disk_io
            .as_ref()
            .ok_or_else(|| {
                FeoxError::IoError(io::Error::new(
                    io::ErrorKind::NotFound,
                    "No disk IO available",
                ))
            })?
            .read();

        let data = disk_io.read_sectors_sync(sector, sectors_needed as u64)?;

        // Skip header and record metadata to get to the value
        let offset = SECTOR_HEADER_SIZE + 2 + record.key.len() + 8 + 8;
        if offset + record.value_len > data.len() {
            return Err(FeoxError::InvalidRecord);
        }

        Ok(data[offset..offset + record.value_len].to_vec())
    }

    fn open_device(&mut self, device_path: &Option<String>) -> Result<()> {
        if let Some(path) = device_path {
            // Open the device/file
            use std::fs::OpenOptions;
            #[cfg(target_os = "linux")]
            use std::os::unix::fs::OpenOptionsExt;

            #[cfg(unix)]
            let (file, use_direct_io) = if std::path::Path::new("/.dockerenv").exists() {
                let file = OpenOptions::new()
                    .read(true)
                    .write(true)
                    .create(true)
                    .truncate(false)
                    .open(path)
                    .map_err(FeoxError::IoError)?;
                (file, false) // Don't use O_DIRECT in Docker
            } else {
                // Try with O_DIRECT on Linux, fall back without it on other Unix systems
                #[cfg(target_os = "linux")]
                {
                    // Try to open with O_DIRECT first
                    match OpenOptions::new()
                        .read(true)
                        .write(true)
                        .create(true)
                        .truncate(false)
                        .custom_flags(libc::O_DIRECT)
                        .open(path)
                    {
                        Ok(file) => (file, true), // Successfully opened with O_DIRECT
                        Err(_) => {
                            // Fallback to regular open
                            let file = OpenOptions::new()
                                .read(true)
                                .write(true)
                                .create(true)
                                .truncate(false)
                                .open(path)
                                .map_err(FeoxError::IoError)?;
                            (file, false)
                        }
                    }
                }
                #[cfg(not(target_os = "linux"))]
                {
                    let file = OpenOptions::new()
                        .read(true)
                        .write(true)
                        .create(true)
                        .truncate(false)
                        .open(path)
                        .map_err(FeoxError::IoError)?;
                    (file, false) // O_DIRECT not supported on this platform
                }
            };

            #[cfg(not(unix))]
            let file = OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(false)
                .open(path)
                .map_err(FeoxError::IoError)?;

            // Get file size
            let metadata = file.metadata().map_err(FeoxError::IoError)?;
            self.device_size = metadata.len();

            // Track whether this is a newly created file
            let was_newly_created = self.device_size == 0;

            if was_newly_created {
                // New empty file - set default size and initialize free space
                file.set_len(DEFAULT_DEVICE_SIZE)
                    .map_err(FeoxError::IoError)?;
                self.device_size = DEFAULT_DEVICE_SIZE;

                // Initialize free space manager with all space free
                self.free_space.write().initialize(self.device_size)?;

                let mut metadata = self._metadata.write();
                metadata.device_size = self.device_size;
                metadata.update();
            } else {
                // Existing file - just set device size, free space will be rebuilt during scan
                self.free_space.write().set_device_size(self.device_size);
            }

            #[cfg(unix)]
            {
                use std::os::unix::io::AsRawFd;
                let file_arc = Arc::new(file);
                let fd = file_arc.as_raw_fd();
                self.device_fd = Some(fd);
                // Store a clone of the file to keep it alive
                self.device_file = Some(file_arc.as_ref().try_clone().map_err(FeoxError::IoError)?);
                let disk_io = crate::storage::io::DiskIO::new(file_arc, use_direct_io)?;
                self.disk_io = Some(Arc::new(RwLock::new(disk_io)));
            }

            #[cfg(not(unix))]
            {
                // Store a clone of the file to keep it alive
                self.device_file = Some(file.try_clone().map_err(FeoxError::IoError)?);
                let disk_io = crate::storage::io::DiskIO::new_from_file(file)?;
                self.disk_io = Some(Arc::new(RwLock::new(disk_io)));
            }

            let disk_io = self.disk_io.as_ref().unwrap().read();

            // Read metadata from existing files (not newly created ones)
            if !was_newly_created {
                if let Ok(metadata_bytes) = disk_io.read_metadata() {
                    if let Some(loaded_metadata) =
                        crate::storage::metadata::Metadata::from_bytes(&metadata_bytes)
                    {
                        // Initialize stats from metadata
                        self.stats
                            .disk_usage
                            .store(loaded_metadata.total_size, Ordering::Relaxed);
                        *self._metadata.write() = loaded_metadata;
                    }
                }
            }
        }
        Ok(())
    }

    fn load_indexes(&mut self) -> Result<()> {
        if self.memory_only {
            return Ok(());
        }

        // Try to read metadata from sector 0
        if let Some(ref disk_io) = self.disk_io {
            let metadata_data = disk_io.read().read_metadata()?;

            // Check if metadata is valid (has our signature)
            if metadata_data.len() >= FEOX_SIGNATURE_SIZE {
                let signature = &metadata_data[..FEOX_SIGNATURE_SIZE];

                if signature == FEOX_SIGNATURE {
                    // Valid metadata found, scan the disk to rebuild indexes
                    self.scan_and_rebuild_indexes()?;
                }
            }
        }

        Ok(())
    }

    fn scan_and_rebuild_indexes(&mut self) -> Result<()> {
        if self.memory_only || self.device_size == 0 {
            return Ok(());
        }

        let disk_io = self.disk_io.as_ref().ok_or(FeoxError::NoDevice)?;

        let total_sectors = self.device_size / FEOX_BLOCK_SIZE as u64;
        let mut sector: u64 = 1;
        let mut _records_loaded = 0;
        let mut occupied_sectors = Vec::new();

        while sector < total_sectors {
            let data = match disk_io.read().read_sectors_sync(sector, 1) {
                Ok(d) => d,
                Err(_) => {
                    sector += 1;
                    continue;
                }
            };

            if data.len() < SECTOR_HEADER_SIZE {
                sector += 1;
                continue;
            }

            // Check for deletion marker first
            if data.len() >= 8 && &data[..8] == b"\0DELETED" {
                // This sector has been deleted, skip it
                sector += 1;
                continue;
            }

            let marker = u16::from_le_bytes([data[0], data[1]]);
            let seq_num = u16::from_le_bytes([data[2], data[3]]);

            if marker != SECTOR_MARKER || seq_num != 0 {
                sector += 1;
                continue;
            }

            if data.len() < SECTOR_HEADER_SIZE + 2 {
                sector += 1;
                continue;
            }

            let key_len =
                u16::from_le_bytes([data[SECTOR_HEADER_SIZE], data[SECTOR_HEADER_SIZE + 1]])
                    as usize;

            if key_len == 0 || key_len > MAX_KEY_SIZE {
                sector += 1;
                continue;
            }

            if data.len() < SECTOR_HEADER_SIZE + 2 + key_len + 8 + 8 {
                sector += 1;
                continue;
            }

            let mut offset = SECTOR_HEADER_SIZE + 2;
            let key = data[offset..offset + key_len].to_vec();
            offset += key_len;

            let value_len = u64::from_le_bytes([
                data[offset],
                data[offset + 1],
                data[offset + 2],
                data[offset + 3],
                data[offset + 4],
                data[offset + 5],
                data[offset + 6],
                data[offset + 7],
            ]) as usize;
            offset += 8;

            let timestamp = u64::from_le_bytes([
                data[offset],
                data[offset + 1],
                data[offset + 2],
                data[offset + 3],
                data[offset + 4],
                data[offset + 5],
                data[offset + 6],
                data[offset + 7],
            ]);

            let total_size = SECTOR_HEADER_SIZE + 2 + key_len + 8 + 8 + value_len;
            let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE);

            let mut record = Record::new(key.clone(), Vec::new(), timestamp);
            record.sector.store(sector, Ordering::Release);
            record.value_len = value_len;
            record.clear_value();

            let record_arc = Arc::new(record);
            self.hash_table.insert(key.clone(), Arc::clone(&record_arc));
            self.tree.insert(key, Arc::clone(&record_arc));

            self.stats.record_count.fetch_add(1, Ordering::AcqRel);
            let record_size = self.calculate_record_size(key_len, value_len);
            self.stats
                .memory_usage
                .fetch_add(record_size, Ordering::AcqRel);

            // Track disk usage
            self.stats
                .disk_usage
                .fetch_add((sectors_needed * FEOX_BLOCK_SIZE) as u64, Ordering::AcqRel);

            for i in 0..sectors_needed {
                occupied_sectors.push(sector + i as u64);
            }

            _records_loaded += 1;
            sector += sectors_needed as u64;
        }

        // Now rebuild free space from gaps between occupied sectors
        occupied_sectors.sort_unstable();

        // Start after metadata sectors (sectors 0-15 are reserved)
        let mut last_end = FEOX_DATA_START_BLOCK;

        for &occupied_start in &occupied_sectors {
            if occupied_start > last_end {
                self.free_space
                    .write()
                    .release_sectors(last_end, occupied_start - last_end)?;
            }
            last_end = occupied_start + 1;
        }

        if last_end < total_sectors {
            self.free_space
                .write()
                .release_sectors(last_end, total_sectors - last_end)?;
        }

        Ok(())
    }

    // ============ Utility Methods ============

    /// Check if a key exists
    pub fn contains_key(&self, key: &[u8]) -> bool {
        self.hash_table.contains_key(key)
    }

    /// Get the number of records in the store
    pub fn len(&self) -> usize {
        self.stats.record_count.load(Ordering::Acquire) as usize
    }

    /// Check if the store is empty
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Get memory usage statistics
    pub fn memory_usage(&self) -> usize {
        self.stats.memory_usage.load(Ordering::Acquire)
    }

    /// Get statistics snapshot
    pub fn stats(&self) -> crate::stats::StatsSnapshot {
        self.stats.snapshot()
    }

    /// Flush all pending writes to disk (for persistent mode)
    pub fn flush(&self) {
        self.flush_all()
    }
}

impl Drop for FeoxStore {
    fn drop(&mut self) {
        // Signal shutdown to write buffer workers
        if let Some(ref wb) = self.write_buffer {
            wb.initiate_shutdown();
        }

        // Write metadata directly without using the write buffer
        if !self.memory_only {
            if let Some(ref disk_io) = self.disk_io {
                // Update metadata with current stats
                let mut metadata = self._metadata.write();
                metadata.total_records = self.stats.record_count.load(Ordering::Relaxed) as u64;
                metadata.total_size = self.stats.disk_usage.load(Ordering::Relaxed);
                metadata.fragmentation = self.free_space.read().get_fragmentation();
                metadata.update();

                // Write metadata
                let _ = disk_io.write().write_metadata(metadata.as_bytes());
                let _ = disk_io.write().flush();
            }
        }

        // Take ownership of write_buffer to properly shut it down
        if let Some(wb_arc) = self.write_buffer.take() {
            // Try to get mutable access if we're the only owner
            if let Ok(wb) = Arc::try_unwrap(wb_arc) {
                // We own it exclusively, can call complete_shutdown
                let mut wb_mut = wb;
                wb_mut.complete_shutdown();
            }
            // If we can't get exclusive access, workers are already shutting down via initiate_shutdown
        }

        // Now it's safe to shutdown disk I/O since workers have exited
        if let Some(ref disk_io) = self.disk_io {
            disk_io.write().shutdown();
        }
    }
}