feoxdb/core/store.rs
1use bytes::Bytes;
2use crossbeam_skiplist::SkipMap;
3use dashmap::DashMap;
4use parking_lot::RwLock;
5use std::io;
6use std::sync::atomic::Ordering;
7use std::sync::Arc;
8use std::time::{SystemTime, UNIX_EPOCH};
9
10pub use crate::constants::Operation;
11use crate::constants::*;
12use crate::core::record::Record;
13use crate::error::{FeoxError, Result};
14use crate::stats::Statistics;
15use crate::storage::free_space::FreeSpaceManager;
16use crate::storage::metadata::Metadata;
17use crate::storage::write_buffer::WriteBuffer;
18
/// High-performance embedded key-value store.
///
/// `FeoxStore` provides ultra-fast key-value storage with optional persistence.
/// It uses lock-free data structures for concurrent access and achieves
/// sub-microsecond latencies for most operations.
///
/// # Thread Safety
///
/// All methods are thread-safe and can be called concurrently from multiple threads.
pub struct FeoxStore {
    // Primary index: key -> record. DashMap handles locking internally.
    hash_table: DashMap<Vec<u8>, Arc<Record>>,

    // Secondary ordered index over the same records; backs `range_query`.
    tree: Arc<SkipMap<Vec<u8>, Arc<Record>>>,

    // Central statistics hub (record/memory counters, operation latencies)
    stats: Arc<Statistics>,

    // Write buffering (optional for memory-only mode); queues Insert/Update/
    // Delete operations for asynchronous persistence.
    write_buffer: Option<Arc<WriteBuffer>>,

    // Free space management for the on-disk layout
    free_space: Arc<RwLock<FreeSpaceManager>>,

    // On-disk metadata header; refreshed and written back by `flush_all`
    _metadata: Arc<RwLock<Metadata>>,

    // Configuration captured at construction time
    memory_only: bool,
    enable_caching: bool,
    // Soft memory cap in bytes, checked on insert; `None` = unbounded
    max_memory: Option<usize>,

    // CLOCK-eviction value cache; only present when `enable_caching` is true
    cache: Option<Arc<super::cache::ClockCache>>,
    // Raw device descriptor (Unix only) — presumably for direct I/O; set by
    // `open_device` (not visible in this file chunk)
    #[cfg(unix)]
    device_fd: Option<i32>,
    device_size: u64,
    device_file: Option<std::fs::File>,

    // Disk I/O backend; `None` in memory-only mode
    disk_io: Option<Arc<RwLock<crate::storage::io::DiskIO>>>,
}
62
/// Configuration options for FeoxStore.
///
/// Use `StoreBuilder` for a more ergonomic way to configure the store.
pub struct StoreConfig {
    /// log2 of the initial hash-table capacity (capacity = `1 << hash_bits`).
    pub hash_bits: u32,
    /// When true, nothing is persisted and no device is opened.
    pub memory_only: bool,
    /// Enables the CLOCK value cache (normally only useful with persistence).
    pub enable_caching: bool,
    /// Backing device/file path for persistence; ignored in memory-only mode.
    pub device_path: Option<String>,
    /// Soft memory cap in bytes; `None` means unbounded.
    pub max_memory: Option<usize>,
}
73
/// Builder for creating FeoxStore with custom configuration.
///
/// Provides a fluent interface for configuring store parameters.
///
/// # Example
///
/// ```rust
/// use feoxdb::FeoxStore;
///
/// # fn main() -> feoxdb::Result<()> {
/// let store = FeoxStore::builder()
///     .max_memory(1_000_000_000)
///     .hash_bits(20)
///     .build()?;
/// # Ok(())
/// # }
/// ```
pub struct StoreBuilder {
    // log2 of the hash-table capacity; defaults to DEFAULT_HASH_BITS
    hash_bits: u32,
    // None = memory-only mode; Some(path) = persistent mode
    device_path: Option<String>,
    // None = no memory limit (set via `no_memory_limit`)
    max_memory: Option<usize>,
    // None = auto-detect at build(): enabled iff persistent mode
    enable_caching: Option<bool>,
}
97
98impl StoreBuilder {
99 pub fn new() -> Self {
100 Self {
101 hash_bits: DEFAULT_HASH_BITS,
102 device_path: None,
103 max_memory: Some(DEFAULT_MAX_MEMORY),
104 enable_caching: None, // Auto-detect based on storage mode
105 }
106 }
107
108 /// Set the device path for persistent storage.
109 ///
110 /// When set, data will be persisted to disk asynchronously.
111 /// If not set, the store operates in memory-only mode.
112 pub fn device_path(mut self, path: impl Into<String>) -> Self {
113 self.device_path = Some(path.into());
114 self
115 }
116
117 /// Set the maximum memory limit (in bytes).
118 ///
119 /// The store will start evicting entries when this limit is approached.
120 /// Default: 1GB
121 pub fn max_memory(mut self, limit: usize) -> Self {
122 self.max_memory = Some(limit);
123 self
124 }
125
126 /// Remove memory limit.
127 ///
128 /// Use with caution as the store can grow unbounded.
129 pub fn no_memory_limit(mut self) -> Self {
130 self.max_memory = None;
131 self
132 }
133
134 /// Set number of hash bits (determines hash table size).
135 ///
136 /// More bits = larger hash table = better performance for large datasets.
137 /// Default: 18 (256K buckets)
138 pub fn hash_bits(mut self, bits: u32) -> Self {
139 self.hash_bits = bits;
140 self
141 }
142
143 /// Enable or disable caching.
144 ///
145 /// When enabled, frequently accessed values are kept in memory
146 /// even after being written to disk. Uses CLOCK eviction algorithm.
147 pub fn enable_caching(mut self, enable: bool) -> Self {
148 self.enable_caching = Some(enable);
149 self
150 }
151
152 /// Build the FeoxStore
153 pub fn build(self) -> Result<FeoxStore> {
154 let memory_only = self.device_path.is_none();
155 let enable_caching = self.enable_caching.unwrap_or(!memory_only);
156
157 let config = StoreConfig {
158 hash_bits: self.hash_bits,
159 memory_only,
160 enable_caching,
161 device_path: self.device_path,
162 max_memory: self.max_memory,
163 };
164
165 FeoxStore::with_config(config)
166 }
167}
168
169impl Default for StoreBuilder {
170 fn default() -> Self {
171 Self::new()
172 }
173}
174
175impl FeoxStore {
176 /// Create a new FeoxStore with default configuration
177 pub fn new(device_path: Option<String>) -> Result<Self> {
178 let memory_only = device_path.is_none();
179 let config = StoreConfig {
180 hash_bits: DEFAULT_HASH_BITS,
181 memory_only,
182 enable_caching: !memory_only, // Disable caching for memory-only mode
183 device_path,
184 max_memory: Some(DEFAULT_MAX_MEMORY),
185 };
186 let hash_table = DashMap::with_capacity(1 << config.hash_bits);
187
188 let free_space = Arc::new(RwLock::new(FreeSpaceManager::new()));
189 let metadata = Arc::new(RwLock::new(Metadata::new()));
190 let stats = Arc::new(Statistics::new());
191
192 let cache = if config.enable_caching {
193 Some(Arc::new(super::cache::ClockCache::new(stats.clone())))
194 } else {
195 None
196 };
197
198 let mut store = Self {
199 hash_table,
200 tree: Arc::new(SkipMap::new()),
201 stats: stats.clone(),
202 write_buffer: None,
203 free_space: free_space.clone(),
204 _metadata: metadata,
205 memory_only: config.memory_only,
206 enable_caching: config.enable_caching,
207 max_memory: config.max_memory,
208 cache,
209 #[cfg(unix)]
210 device_fd: None,
211 device_size: 0,
212 device_file: None,
213 disk_io: None,
214 };
215
216 if !config.memory_only {
217 store.open_device(&config.device_path)?;
218 store.load_indexes()?;
219
220 // Initialize write buffer for persistent mode
221 if let Some(ref disk_io) = store.disk_io {
222 let mut write_buffer = WriteBuffer::new(disk_io.clone(), free_space, stats.clone());
223 let num_workers = (num_cpus::get() / 2).max(1);
224 write_buffer.start_workers(num_workers);
225 store.write_buffer = Some(Arc::new(write_buffer));
226 }
227 }
228
229 Ok(store)
230 }
231
    /// Create a new FeoxStore with custom configuration.
    ///
    /// In persistent mode (`config.memory_only == false`) this opens the
    /// device, loads the on-disk indexes, and starts the write-buffer
    /// workers; any of those steps may fail and aborts construction.
    pub fn with_config(config: StoreConfig) -> Result<Self> {
        // Initialize hash table with configured capacity (1 << hash_bits buckets)
        let hash_table = DashMap::with_capacity(1 << config.hash_bits);

        let free_space = Arc::new(RwLock::new(FreeSpaceManager::new()));
        let metadata = Arc::new(RwLock::new(Metadata::new()));
        let stats = Arc::new(Statistics::new());

        // The CLOCK cache is only constructed when caching is enabled.
        let cache = if config.enable_caching {
            Some(Arc::new(super::cache::ClockCache::new(stats.clone())))
        } else {
            None
        };

        let mut store = Self {
            hash_table,
            tree: Arc::new(SkipMap::new()),
            stats: stats.clone(),
            write_buffer: None, // set below once disk I/O exists
            free_space: free_space.clone(),
            _metadata: metadata,
            memory_only: config.memory_only,
            enable_caching: config.enable_caching,
            max_memory: config.max_memory,
            cache,
            #[cfg(unix)]
            device_fd: None,
            device_size: 0,
            device_file: None,
            disk_io: None,
        };

        if !config.memory_only {
            // Order matters: the device must be open before indexes can be
            // loaded, and the write buffer needs the resulting disk_io handle.
            store.open_device(&config.device_path)?;
            store.load_indexes()?;

            // Initialize write buffer for persistent mode
            if let Some(ref disk_io) = store.disk_io {
                let mut write_buffer = WriteBuffer::new(disk_io.clone(), free_space, stats.clone());
                // Half the CPUs, at least one — presumably background flush
                // workers; confirm semantics in WriteBuffer::start_workers.
                let num_workers = (num_cpus::get() / 2).max(1);
                write_buffer.start_workers(num_workers);
                store.write_buffer = Some(Arc::new(write_buffer));
            }
        }

        Ok(store)
    }
280
281 /// Create a builder for configuring FeoxStore.
282 ///
283 /// # Example
284 ///
285 /// ```rust
286 /// use feoxdb::FeoxStore;
287 ///
288 /// # fn main() -> feoxdb::Result<()> {
289 /// let store = FeoxStore::builder()
290 /// .max_memory(2_000_000_000)
291 /// .build()?;
292 /// # Ok(())
293 /// # }
294 /// ```
295 pub fn builder() -> StoreBuilder {
296 StoreBuilder::new()
297 }
298
299 /// Insert or update a key-value pair.
300 ///
301 /// # Arguments
302 ///
303 /// * `key` - The key to insert (max 65KB)
304 /// * `value` - The value to store (max 4GB)
305 /// * `timestamp` - Optional timestamp for conflict resolution. If `None`, uses current time.
306 ///
307 /// # Returns
308 ///
309 /// Returns `Ok(())` if successful.
310 ///
311 /// # Errors
312 ///
313 /// * `InvalidKey` - Key is empty or too large
314 /// * `InvalidValue` - Value is too large
315 /// * `OlderTimestamp` - Timestamp is not newer than existing record
316 /// * `OutOfMemory` - Memory limit exceeded
317 ///
318 /// # Example
319 ///
320 /// ```rust
321 /// # use feoxdb::FeoxStore;
322 /// # fn main() -> feoxdb::Result<()> {
323 /// # let store = FeoxStore::new(None)?;
324 /// store.insert(b"user:123", b"{\"name\":\"Mehran\"}")?;
325 /// # Ok(())
326 /// # }
327 /// ```
328 ///
329 /// # Performance
330 ///
331 /// * Memory mode: ~800ns
332 /// * Persistent mode: ~1µs (buffered write)
333 pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
334 self.insert_with_timestamp(key, value, None)
335 }
336
337 /// Insert or update a key-value pair with explicit timestamp.
338 ///
339 /// This is the advanced version that allows manual timestamp control for
340 /// conflict resolution. Most users should use `insert()` instead.
341 ///
342 /// # Arguments
343 ///
344 /// * `key` - The key to insert (max 65KB)
345 /// * `value` - The value to store (max 4GB)
346 /// * `timestamp` - Optional timestamp for conflict resolution. If `None`, uses current time.
347 ///
348 /// # Errors
349 ///
350 /// * `OlderTimestamp` - Timestamp is not newer than existing record
351 pub fn insert_with_timestamp(
352 &self,
353 key: &[u8],
354 value: &[u8],
355 timestamp: Option<u64>,
356 ) -> Result<()> {
357 let start = std::time::Instant::now();
358 let timestamp = match timestamp {
359 Some(0) | None => self.get_timestamp(),
360 Some(ts) => ts,
361 };
362 self.validate_key_value(key, value)?;
363
364 // Check for existing record
365 let is_update = self.hash_table.contains_key(key);
366 if let Some(existing_record) = self.hash_table.get(key) {
367 let existing_ts = existing_record.timestamp;
368 let existing_clone = Arc::clone(&existing_record);
369 drop(existing_record); // Release the reference before updating
370
371 if timestamp < existing_ts {
372 return Err(FeoxError::OlderTimestamp);
373 }
374
375 // Update existing record
376 return self.update_record(&existing_clone, value, timestamp);
377 }
378
379 let record_size = self.calculate_record_size(key.len(), value.len());
380 if !self.check_memory_limit(record_size) {
381 return Err(FeoxError::OutOfMemory);
382 }
383
384 // Create new record
385 let record = Arc::new(Record::new(key.to_vec(), value.to_vec(), timestamp));
386
387 let key_vec = record.key.clone();
388
389 // Insert into hash table - DashMap handles locking internally
390 self.hash_table.insert(key_vec.clone(), Arc::clone(&record));
391
392 // Insert into lock-free skip list for ordered access
393 self.tree.insert(key_vec, Arc::clone(&record));
394
395 // Update statistics
396 self.stats.record_count.fetch_add(1, Ordering::AcqRel);
397 self.stats
398 .memory_usage
399 .fetch_add(record_size, Ordering::AcqRel);
400 self.stats
401 .record_insert(start.elapsed().as_nanos() as u64, is_update);
402
403 // Only do persistence and cache checks if not in memory-only mode
404 if !self.memory_only {
405 // Queue for persistence if write buffer exists
406 if let Some(ref wb) = self.write_buffer {
407 if let Err(_e) = wb.add_write(Operation::Insert, record, 0) {
408 // Don't fail the insert - data is still in memory
409 // Return code already indicates success since data is in memory
410 }
411 }
412
413 // Check memory pressure and trigger cache eviction if needed
414 if self.enable_caching {
415 if let Some(ref cache) = self.cache {
416 let stats = cache.stats();
417 if stats.memory_usage > stats.high_watermark {
418 cache.evict_entries();
419 }
420 }
421 }
422 }
423
424 Ok(())
425 }
426
427 /// Retrieve a value by key.
428 ///
429 /// # Arguments
430 ///
431 /// * `key` - The key to look up
432 /// * `expected_size` - Optional expected value size for validation
433 ///
434 /// # Returns
435 ///
436 /// Returns the value as a `Vec<u8>` if found.
437 ///
438 /// # Errors
439 ///
440 /// * `KeyNotFound` - Key does not exist
441 /// * `InvalidKey` - Key is invalid
442 /// * `SizeMismatch` - Value size doesn't match expected size
443 /// * `IoError` - Failed to read from disk (persistent mode)
444 ///
445 /// # Example
446 ///
447 /// ```rust
448 /// # use feoxdb::FeoxStore;
449 /// # fn main() -> feoxdb::Result<()> {
450 /// # let store = FeoxStore::new(None)?;
451 /// # store.insert(b"key", b"value")?;
452 /// let value = store.get(b"key")?;
453 /// assert_eq!(value, b"value");
454 /// # Ok(())
455 /// # }
456 /// ```
457 ///
458 /// # Performance
459 ///
460 /// * Memory mode: ~100ns
461 /// * Persistent mode (cached): ~150ns
462 /// * Persistent mode (disk read): ~500ns
463 pub fn get(&self, key: &[u8]) -> Result<Vec<u8>> {
464 let start = std::time::Instant::now();
465 self.validate_key(key)?;
466
467 let mut cache_hit = false;
468 if self.enable_caching {
469 if let Some(ref cache) = self.cache {
470 if let Some(value) = cache.get(key) {
471 self.stats
472 .record_get(start.elapsed().as_nanos() as u64, true);
473 return Ok(value.to_vec());
474 }
475 }
476 }
477
478 let record = self.hash_table.get(key).ok_or(FeoxError::KeyNotFound)?;
479
480 let value = if let Some(val) = record.get_value() {
481 val.to_vec()
482 } else {
483 cache_hit = false; // Reading from disk
484 self.load_value_from_disk(&record)?
485 };
486
487 if self.enable_caching {
488 if let Some(ref cache) = self.cache {
489 cache.insert(key.to_vec(), Bytes::from(value.clone()));
490 }
491 }
492
493 self.stats
494 .record_get(start.elapsed().as_nanos() as u64, cache_hit);
495 Ok(value)
496 }
497
498 /// Delete a key-value pair.
499 ///
500 /// # Arguments
501 ///
502 /// * `key` - The key to delete
503 /// * `timestamp` - Optional timestamp for conflict resolution
504 ///
505 /// # Returns
506 ///
507 /// Returns `Ok(())` if the key was deleted.
508 ///
509 /// # Errors
510 ///
511 /// * `KeyNotFound` - Key does not exist
512 /// * `OlderTimestamp` - Timestamp is not newer than existing record
513 ///
514 /// # Example
515 ///
516 /// ```rust
517 /// # use feoxdb::FeoxStore;
518 /// # fn main() -> feoxdb::Result<()> {
519 /// # let store = FeoxStore::new(None)?;
520 /// # store.insert(b"temp", b"data")?;
521 /// store.delete(b"temp")?;
522 /// # Ok(())
523 /// # }
524 /// ```
525 ///
526 /// # Performance
527 ///
528 /// * Memory mode: ~300ns
529 /// * Persistent mode: ~400ns
530 pub fn delete(&self, key: &[u8]) -> Result<()> {
531 self.delete_with_timestamp(key, None)
532 }
533
    /// Delete a key-value pair with explicit timestamp.
    ///
    /// This is the advanced version that allows manual timestamp control.
    /// Most users should use `delete()` instead.
    ///
    /// # Arguments
    ///
    /// * `key` - The key to delete
    /// * `timestamp` - Optional timestamp. If `None`, uses current time.
    ///
    /// # Errors
    ///
    /// * `KeyNotFound` - Key does not exist
    /// * `OlderTimestamp` - Timestamp is not newer than existing record
    pub fn delete_with_timestamp(&self, key: &[u8], timestamp: Option<u64>) -> Result<()> {
        let start = std::time::Instant::now();
        // Both `None` and an explicit 0 mean "use the current time".
        let timestamp = match timestamp {
            Some(0) | None => self.get_timestamp(),
            Some(ts) => ts,
        };
        self.validate_key(key)?;

        // Remove from hash table and get the record
        let (_key, record) = self.hash_table.remove(key).ok_or(FeoxError::KeyNotFound)?;

        if timestamp < record.timestamp {
            // Put it back if timestamp is older.
            // NOTE(review): between remove() and this re-insert the key is
            // briefly absent; a concurrent reader can observe KeyNotFound.
            self.hash_table.insert(key.to_vec(), record);
            return Err(FeoxError::OlderTimestamp);
        }

        let record_size = record.calculate_size();
        let old_value_len = record.value_len;

        // Mark record as deleted by setting refcount to 0
        record.refcount.store(0, Ordering::Release);

        // Remove from lock-free skip list
        self.tree.remove(key);

        // Update statistics
        self.stats.record_count.fetch_sub(1, Ordering::AcqRel);
        self.stats
            .memory_usage
            .fetch_sub(record_size, Ordering::AcqRel);

        // Clear from cache so stale values cannot be served
        if self.enable_caching {
            if let Some(ref cache) = self.cache {
                cache.remove(key);
            }
        }

        // Queue deletion for persistence if write buffer exists and not memory-only
        if !self.memory_only {
            if let Some(ref wb) = self.write_buffer {
                if let Err(_e) = wb.add_write(Operation::Delete, record, old_value_len) {
                    // Silent failure - the delete already succeeded in memory
                }
            }
        }

        self.stats.record_delete(start.elapsed().as_nanos() as u64);
        Ok(())
    }
598
599 /// Apply a JSON patch to a value.
600 ///
601 /// Uses RFC 6902 JSON Patch format to modify specific fields in a JSON document.
602 /// Both the existing value and the patch must be valid JSON.
603 ///
604 /// # Arguments
605 ///
606 /// * `key` - The key containing the JSON document to patch
607 /// * `patch` - JSON Patch operations in RFC 6902 format
608 /// * `timestamp` - Optional timestamp for conflict resolution
609 ///
610 /// # Returns
611 ///
612 /// Returns `Ok(())` if the update was applied.
613 ///
614 /// # Errors
615 ///
616 /// * `KeyNotFound` - Key does not exist
617 /// * `OlderTimestamp` - Timestamp is not newer than existing record
618 /// * `JsonPatchError` - Invalid JSON document or patch format
619 ///
620 /// # Example
621 ///
622 /// ```no_run
623 /// # use feoxdb::FeoxStore;
624 /// # fn main() -> feoxdb::Result<()> {
625 /// # let store = FeoxStore::new(None)?;
626 /// // Insert initial JSON value
627 /// let initial = br#"{"name":"Alice","age":30}"#;
628 /// store.insert(b"user:1", initial)?;
629 ///
630 /// // Apply JSON patch to update age
631 /// let patch = br#"[{"op":"replace","path":"/age","value":31}]"#;
632 /// store.json_patch(b"user:1", patch)?;
633 ///
634 /// // Value now has age updated to 31
635 /// let updated = store.get(b"user:1")?;
636 /// assert_eq!(updated.len(), initial.len()); // Same length, just age changed
637 /// # Ok(())
638 /// # }
639 /// ```
640 pub fn json_patch(&self, key: &[u8], patch: &[u8]) -> Result<()> {
641 self.json_patch_with_timestamp(key, patch, None)
642 }
643
644 /// Apply JSON patch with explicit timestamp.
645 ///
646 /// This is the advanced version that allows manual timestamp control.
647 /// Most users should use `json_patch()` instead.
648 ///
649 /// # Arguments
650 ///
651 /// * `key` - The key whose value to patch
652 /// * `patch` - JSON Patch array (RFC 6902)
653 /// * `timestamp` - Optional timestamp. If `None`, uses current time.
654 ///
655 /// # Errors
656 ///
657 /// * `OlderTimestamp` - Timestamp is not newer than existing record
658 pub fn json_patch_with_timestamp(
659 &self,
660 key: &[u8],
661 patch: &[u8],
662 timestamp: Option<u64>,
663 ) -> Result<()> {
664 let timestamp = match timestamp {
665 Some(0) | None => self.get_timestamp(),
666 Some(ts) => ts,
667 };
668 self.validate_key(key)?;
669
670 // Get the value and release the lock immediately
671 let current_value = {
672 let record = self.hash_table.get(key).ok_or(FeoxError::KeyNotFound)?;
673
674 if timestamp < record.timestamp {
675 return Err(FeoxError::OlderTimestamp);
676 }
677
678 if let Some(val) = record.get_value() {
679 val.to_vec()
680 } else {
681 self.load_value_from_disk(&record)?
682 }
683 };
684
685 let new_value = crate::utils::json_patch::apply_json_patch(¤t_value, patch)?;
686
687 // Now update without holding any references
688 self.insert_with_timestamp(key, &new_value, Some(timestamp))
689 }
690
    /// Replace the entry for `old_record.key` with a new record holding `value`.
    ///
    /// Called from the insert path when the key already exists. Swaps the new
    /// record into both indexes, adjusts memory accounting by the size delta,
    /// invalidates any cached value, and queues an Update (new record)
    /// followed by a Delete (old record) on the write buffer.
    fn update_record(&self, old_record: &Record, value: &[u8], timestamp: u64) -> Result<()> {
        let new_record = Arc::new(Record::new(
            old_record.key.clone(),
            value.to_vec(),
            timestamp,
        ));

        let old_value_len = old_record.value_len;
        let old_size = old_record.calculate_size();
        let new_size = self.calculate_record_size(old_record.key.len(), value.len());

        // Re-fetch the live Arc for the old record so it can be handed to the
        // write buffer. NOTE(review): under a concurrent update this may be a
        // different record than `old_record` — confirm that is acceptable.
        let old_record_arc = if let Some(entry) = self.hash_table.get(&old_record.key) {
            Arc::clone(&*entry)
        } else {
            return Err(FeoxError::KeyNotFound);
        };

        let key_vec = new_record.key.clone();

        // Publish the new record in both indexes.
        self.hash_table
            .insert(key_vec.clone(), Arc::clone(&new_record));
        self.tree.insert(key_vec.clone(), Arc::clone(&new_record));

        // Apply the signed size difference to the memory-usage counter.
        if new_size > old_size {
            self.stats
                .memory_usage
                .fetch_add(new_size - old_size, Ordering::AcqRel);
        } else {
            self.stats
                .memory_usage
                .fetch_sub(old_size - new_size, Ordering::AcqRel);
        }

        // Only do cache and persistence operations if not in memory-only mode
        if !self.memory_only {
            // Drop any stale cached copy of the previous value.
            if self.enable_caching {
                if let Some(ref cache) = self.cache {
                    cache.remove(&key_vec);
                }
            }

            if let Some(ref wb) = self.write_buffer {
                if let Err(e) =
                    wb.add_write(Operation::Update, Arc::clone(&new_record), old_value_len)
                {
                    // Data operation succeeded in memory
                    let _ = e;
                }

                // Queue a Delete for the old record — presumably so its
                // on-disk space can be reclaimed; confirm in WriteBuffer.
                if let Err(e) = wb.add_write(Operation::Delete, old_record_arc, old_value_len) {
                    // Data operation succeeded in memory
                    let _ = e;
                }
            }
        }

        Ok(())
    }
749
750 fn validate_key_value(&self, key: &[u8], value: &[u8]) -> Result<()> {
751 if key.is_empty() || key.len() > MAX_KEY_SIZE {
752 return Err(FeoxError::InvalidKeySize);
753 }
754
755 if value.is_empty() || value.len() > MAX_VALUE_SIZE {
756 return Err(FeoxError::InvalidValueSize);
757 }
758
759 Ok(())
760 }
761
762 fn validate_key(&self, key: &[u8]) -> Result<()> {
763 if key.is_empty() || key.len() > MAX_KEY_SIZE {
764 return Err(FeoxError::InvalidKeySize);
765 }
766
767 Ok(())
768 }
769
770 fn check_memory_limit(&self, size: usize) -> bool {
771 match self.max_memory {
772 Some(limit) => {
773 let current = self.stats.memory_usage.load(Ordering::Acquire);
774 current + size <= limit
775 }
776 None => true,
777 }
778 }
779
    /// Approximate in-memory footprint of one record: the fixed `Record`
    /// struct size plus the key and value byte lengths.
    fn calculate_record_size(&self, key_len: usize, value_len: usize) -> usize {
        std::mem::size_of::<Record>() + key_len + value_len
    }
783
784 /// Perform a range query on the store.
785 ///
786 /// Returns all key-value pairs where the key is >= `start_key` and <= `end_key`.
787 /// Both bounds are inclusive.
788 ///
789 /// # Arguments
790 ///
791 /// * `start_key` - Inclusive lower bound
792 /// * `end_key` - Inclusive upper bound
793 /// * `limit` - Maximum number of results to return
794 ///
795 /// # Returns
796 ///
797 /// Returns a vector of (key, value) pairs in sorted order.
798 ///
799 /// # Example
800 ///
801 /// ```rust
802 /// # use feoxdb::FeoxStore;
803 /// # fn main() -> feoxdb::Result<()> {
804 /// # let store = FeoxStore::new(None)?;
805 /// store.insert(b"user:001", b"Alice")?;
806 /// store.insert(b"user:002", b"Bob")?;
807 /// store.insert(b"user:003", b"Charlie")?;
808 /// store.insert(b"user:004", b"David")?;
809 ///
810 /// // Get users 001 through 003 (inclusive)
811 /// let results = store.range_query(b"user:001", b"user:003", 10)?;
812 /// assert_eq!(results.len(), 3);
813 /// # Ok(())
814 /// # }
815 /// ```
816 pub fn range_query(
817 &self,
818 start_key: &[u8],
819 end_key: &[u8],
820 limit: usize,
821 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
822 if start_key.len() > MAX_KEY_SIZE || end_key.len() > MAX_KEY_SIZE {
823 return Err(FeoxError::InvalidKeySize);
824 }
825
826 let mut results = Vec::new();
827
828 for entry in self.tree.range(start_key.to_vec()..=end_key.to_vec()) {
829 if results.len() >= limit {
830 break;
831 }
832
833 let record = entry.value();
834 let value = if let Some(val) = record.get_value() {
835 val.to_vec()
836 } else {
837 self.load_value_from_disk(record)?
838 };
839
840 results.push((entry.key().clone(), value));
841 }
842
843 Ok(results)
844 }
845
846 /// Atomically increment a numeric counter.
847 ///
848 /// The value must be stored as an 8-byte little-endian i64. If the key doesn't exist,
849 /// it will be created with the given delta value. If it exists, the value will be
850 /// incremented atomically.
851 ///
852 /// # Value Format
853 ///
854 /// The value MUST be exactly 8 bytes representing a little-endian i64.
855 /// Use `i64::to_le_bytes()` to create the initial value:
856 /// ```rust,ignore
857 /// let zero: i64 = 0;
858 /// store.insert(b"counter", &zero.to_le_bytes())?;
859 /// ```
860 ///
861 /// # Arguments
862 ///
863 /// * `key` - The key of the counter
864 /// * `delta` - The amount to increment by (can be negative for decrement)
865 /// * `timestamp` - Optional timestamp for conflict resolution
866 ///
867 /// # Returns
868 ///
869 /// Returns the new value after incrementing.
870 ///
871 /// # Errors
872 ///
873 /// * `InvalidOperation` - Existing value is not exactly 8 bytes (not a valid i64)
874 /// * `OlderTimestamp` - Timestamp is not newer than existing record
875 ///
876 /// # Example
877 ///
878 /// ```rust
879 /// # use feoxdb::FeoxStore;
880 /// # fn main() -> feoxdb::Result<()> {
881 /// # let store = FeoxStore::new(None)?;
882 /// // Initialize counter with proper binary format
883 /// let initial: i64 = 0;
884 /// store.insert(b"visits", &initial.to_le_bytes())?;
885 ///
886 /// // Increment atomically
887 /// let val = store.atomic_increment(b"visits", 1)?;
888 /// assert_eq!(val, 1);
889 ///
890 /// // Increment by 5
891 /// let val = store.atomic_increment(b"visits", 5)?;
892 /// assert_eq!(val, 6);
893 ///
894 /// // Decrement by 2
895 /// let val = store.atomic_increment(b"visits", -2)?;
896 /// assert_eq!(val, 4);
897 ///
898 /// // Or create new counter directly (starts at delta value)
899 /// let downloads = store.atomic_increment(b"downloads", 100)?;
900 /// assert_eq!(downloads, 100);
901 /// # Ok(())
902 /// # }
903 /// ```
904 pub fn atomic_increment(&self, key: &[u8], delta: i64) -> Result<i64> {
905 self.atomic_increment_with_timestamp(key, delta, None)
906 }
907
908 /// Atomically increment/decrement with explicit timestamp.
909 ///
910 /// This is the advanced version that allows manual timestamp control.
911 /// Most users should use `atomic_increment()` instead.
912 ///
913 /// # Arguments
914 ///
915 /// * `key` - The key to increment/decrement
916 /// * `delta` - Amount to add (negative to decrement)
917 /// * `timestamp` - Optional timestamp. If `None`, uses current time.
918 ///
919 /// # Errors
920 ///
921 /// * `OlderTimestamp` - Timestamp is not newer than existing record
922 pub fn atomic_increment_with_timestamp(
923 &self,
924 key: &[u8],
925 delta: i64,
926 timestamp: Option<u64>,
927 ) -> Result<i64> {
928 self.validate_key(key)?;
929
930 let key_vec = key.to_vec();
931
932 // Use DashMap's entry API for atomic operations
933 let result = match self.hash_table.entry(key_vec.clone()) {
934 dashmap::mapref::entry::Entry::Occupied(mut entry) => {
935 let old_record = entry.get();
936
937 // Get timestamp inside the critical section to ensure it's always newer
938 let timestamp = match timestamp {
939 Some(0) | None => self.get_timestamp(),
940 Some(ts) => ts,
941 };
942
943 // Check if timestamp is valid
944 if timestamp < old_record.timestamp {
945 return Err(FeoxError::OlderTimestamp);
946 }
947
948 // Load value from memory or disk
949 let value = if let Some(val) = old_record.get_value() {
950 val.to_vec()
951 } else {
952 // Try loading from disk if not in memory
953 self.load_value_from_disk(old_record)?
954 };
955
956 let current_val = if value.len() == 8 {
957 let bytes = value
958 .get(..8)
959 .and_then(|slice| slice.try_into().ok())
960 .ok_or(FeoxError::InvalidNumericValue)?;
961 i64::from_le_bytes(bytes)
962 } else {
963 return Err(FeoxError::InvalidOperation);
964 };
965
966 let new_val = current_val.saturating_add(delta);
967 let new_value = new_val.to_le_bytes().to_vec();
968
969 // Create new record
970 let new_record =
971 Arc::new(Record::new(old_record.key.clone(), new_value, timestamp));
972
973 let old_value_len = old_record.value_len;
974 let old_size = old_record.calculate_size();
975 let new_size = self.calculate_record_size(old_record.key.len(), 8);
976 let old_record_arc = Arc::clone(old_record);
977
978 // Atomically update the entry
979 entry.insert(Arc::clone(&new_record));
980
981 // Update skip list as well
982 self.tree.insert(key_vec.clone(), Arc::clone(&new_record));
983
984 // Update memory usage
985 if new_size > old_size {
986 self.stats
987 .memory_usage
988 .fetch_add(new_size - old_size, Ordering::AcqRel);
989 } else {
990 self.stats
991 .memory_usage
992 .fetch_sub(old_size - new_size, Ordering::AcqRel);
993 }
994
995 // Only do cache and persistence operations if not in memory-only mode
996 if !self.memory_only {
997 if self.enable_caching {
998 if let Some(ref cache) = self.cache {
999 cache.remove(&key_vec);
1000 }
1001 }
1002
1003 if let Some(ref wb) = self.write_buffer {
1004 if let Err(e) =
1005 wb.add_write(Operation::Update, Arc::clone(&new_record), old_value_len)
1006 {
1007 // Atomic operation succeeded in memory
1008 let _ = e;
1009 }
1010
1011 if let Err(e) =
1012 wb.add_write(Operation::Delete, old_record_arc, old_value_len)
1013 {
1014 // Atomic operation succeeded in memory
1015 let _ = e;
1016 }
1017 }
1018 }
1019
1020 Ok(new_val)
1021 }
1022 dashmap::mapref::entry::Entry::Vacant(entry) => {
1023 // Key doesn't exist, create it with initial value
1024 // Get timestamp inside the critical section
1025 let timestamp = match timestamp {
1026 Some(0) | None => self.get_timestamp(),
1027 Some(ts) => ts,
1028 };
1029
1030 let initial_val = delta;
1031 let value = initial_val.to_le_bytes().to_vec();
1032
1033 let new_record = Arc::new(Record::new(key_vec.clone(), value, timestamp));
1034
1035 entry.insert(Arc::clone(&new_record));
1036
1037 // Update skip list
1038 self.tree.insert(key_vec.clone(), Arc::clone(&new_record));
1039
1040 // Update statistics
1041 self.stats.record_count.fetch_add(1, Ordering::AcqRel);
1042 let record_size = self.calculate_record_size(key.len(), 8);
1043 self.stats
1044 .memory_usage
1045 .fetch_add(record_size, Ordering::AcqRel);
1046
1047 // Handle persistence if needed
1048 if !self.memory_only {
1049 if let Some(ref wb) = self.write_buffer {
1050 if let Err(e) = wb.add_write(Operation::Insert, Arc::clone(&new_record), 0)
1051 {
1052 // Operation succeeded in memory
1053 let _ = e;
1054 }
1055 }
1056 }
1057
1058 Ok(initial_val)
1059 }
1060 };
1061
1062 result
1063 }
1064
1065 /// Get the size of a value without loading it.
1066 ///
1067 /// Useful for checking value size before loading large values from disk.
1068 ///
1069 /// # Arguments
1070 ///
1071 /// * `key` - The key to check
1072 ///
1073 /// # Returns
1074 ///
1075 /// Returns the size in bytes of the value.
1076 ///
1077 /// # Errors
1078 ///
1079 /// * `KeyNotFound` - Key does not exist
1080 ///
1081 /// # Example
1082 ///
1083 /// ```rust
1084 /// # use feoxdb::FeoxStore;
1085 /// # fn main() -> feoxdb::Result<()> {
1086 /// # let store = FeoxStore::new(None)?;
1087 /// store.insert(b"large_file", &vec![0u8; 1_000_000])?;
1088 ///
1089 /// // Check size before loading
1090 /// let size = store.get_size(b"large_file")?;
1091 /// assert_eq!(size, 1_000_000);
1092 /// # Ok(())
1093 /// # }
1094 /// ```
1095 pub fn get_size(&self, key: &[u8]) -> Result<usize> {
1096 self.validate_key(key)?;
1097
1098 let record = self.hash_table.get(key).ok_or(FeoxError::KeyNotFound)?;
1099
1100 Ok(record.value_len)
1101 }
1102
1103 /// Force flush all pending writes to disk.
1104 ///
1105 /// In persistent mode, ensures all buffered writes are flushed to disk.
1106 /// In memory-only mode, this is a no-op.
1107 ///
1108 /// # Example
1109 ///
1110 /// ```no_run
1111 /// # use feoxdb::FeoxStore;
1112 /// # fn main() -> feoxdb::Result<()> {
1113 /// let store = FeoxStore::new(Some("/path/to/data.feox".to_string()))?;
1114 /// store.insert(b"important", b"data")?;
1115 /// store.flush_all(); // Ensure data is persisted
1116 /// # Ok(())
1117 /// # }
1118 /// ```
1119 pub fn flush_all(&self) {
1120 if !self.memory_only {
1121 // First flush the write buffer to ensure all data is written
1122 if let Some(ref wb) = self.write_buffer {
1123 let _ = wb.force_flush();
1124 }
1125
1126 if let Some(ref disk_io) = self.disk_io {
1127 // Update metadata with current stats
1128 let mut metadata = self._metadata.write();
1129 metadata.total_records = self.stats.record_count.load(Ordering::Relaxed) as u64;
1130 metadata.total_size = self.stats.disk_usage.load(Ordering::Relaxed);
1131 metadata.fragmentation = self.free_space.read().get_fragmentation();
1132 metadata.update();
1133
1134 // Write metadata
1135 let _ = disk_io.write().write_metadata(metadata.as_bytes());
1136 let _ = disk_io.write().flush();
1137 }
1138 }
1139 }
1140
1141 fn get_timestamp(&self) -> u64 {
1142 SystemTime::now()
1143 .duration_since(UNIX_EPOCH)
1144 .unwrap()
1145 .as_nanos() as u64
1146 }
1147
1148 fn load_value_from_disk(&self, record: &Record) -> Result<Vec<u8>> {
1149 let sector = record.sector.load(Ordering::Acquire);
1150 if self.memory_only || sector == 0 {
1151 return Err(FeoxError::InvalidRecord);
1152 }
1153
1154 // Calculate how many sectors we need to read
1155 let total_size = SECTOR_HEADER_SIZE + 2 + record.key.len() + 8 + 8 + record.value_len;
1156 let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE);
1157
1158 // Read the sectors
1159 let disk_io = self
1160 .disk_io
1161 .as_ref()
1162 .ok_or_else(|| {
1163 FeoxError::IoError(io::Error::new(
1164 io::ErrorKind::NotFound,
1165 "No disk IO available",
1166 ))
1167 })?
1168 .read();
1169
1170 let data = disk_io.read_sectors_sync(sector, sectors_needed as u64)?;
1171
1172 // Skip header and record metadata to get to the value
1173 let offset = SECTOR_HEADER_SIZE + 2 + record.key.len() + 8 + 8;
1174 if offset + record.value_len > data.len() {
1175 return Err(FeoxError::InvalidRecord);
1176 }
1177
1178 Ok(data[offset..offset + record.value_len].to_vec())
1179 }
1180
    /// Open (or create) the backing device/file and wire up disk I/O.
    ///
    /// Chooses the open strategy per platform: inside Docker, O_DIRECT is
    /// skipped; on Linux, O_DIRECT is attempted with a fallback to a regular
    /// open; on other Unix and non-Unix platforms, a regular open is used.
    /// New (zero-length) files are sized to `DEFAULT_DEVICE_SIZE` and the
    /// free-space manager is initialized; existing files have their metadata
    /// block read to seed statistics. A `None` path is a silent no-op.
    ///
    /// # Errors
    ///
    /// Returns `IoError` for open/metadata/set_len/clone failures and
    /// propagates errors from free-space initialization and `DiskIO` setup.
    fn open_device(&mut self, device_path: &Option<String>) -> Result<()> {
        if let Some(path) = device_path {
            // Open the device/file
            use std::fs::OpenOptions;
            #[cfg(target_os = "linux")]
            use std::os::unix::fs::OpenOptionsExt;

            // Detect Docker via the conventional /.dockerenv sentinel file;
            // O_DIRECT is unreliable on overlay filesystems used by containers.
            #[cfg(unix)]
            let (file, use_direct_io) = if std::path::Path::new("/.dockerenv").exists() {
                let file = OpenOptions::new()
                    .read(true)
                    .write(true)
                    .create(true)
                    .truncate(false)
                    .open(path)
                    .map_err(FeoxError::IoError)?;
                (file, false) // Don't use O_DIRECT in Docker
            } else {
                // Try with O_DIRECT on Linux, fall back without it on other Unix systems
                #[cfg(target_os = "linux")]
                {
                    // Try to open with O_DIRECT first
                    match OpenOptions::new()
                        .read(true)
                        .write(true)
                        .create(true)
                        .truncate(false)
                        .custom_flags(libc::O_DIRECT)
                        .open(path)
                    {
                        Ok(file) => (file, true), // Successfully opened with O_DIRECT
                        Err(_) => {
                            // Fallback to regular open (e.g. filesystem without
                            // O_DIRECT support); buffered I/O is used instead.
                            let file = OpenOptions::new()
                                .read(true)
                                .write(true)
                                .create(true)
                                .truncate(false)
                                .open(path)
                                .map_err(FeoxError::IoError)?;
                            (file, false)
                        }
                    }
                }
                #[cfg(not(target_os = "linux"))]
                {
                    let file = OpenOptions::new()
                        .read(true)
                        .write(true)
                        .create(true)
                        .truncate(false)
                        .open(path)
                        .map_err(FeoxError::IoError)?;
                    (file, false) // O_DIRECT not supported on this platform
                }
            };

            #[cfg(not(unix))]
            let file = OpenOptions::new()
                .read(true)
                .write(true)
                .create(true)
                .truncate(false)
                .open(path)
                .map_err(FeoxError::IoError)?;

            // Get file size
            let metadata = file.metadata().map_err(FeoxError::IoError)?;
            self.device_size = metadata.len();

            // Track whether this is a newly created file
            // (zero length == just created by the open above).
            let was_newly_created = self.device_size == 0;

            if was_newly_created {
                // New empty file - set default size and initialize free space
                file.set_len(DEFAULT_DEVICE_SIZE)
                    .map_err(FeoxError::IoError)?;
                self.device_size = DEFAULT_DEVICE_SIZE;

                // Initialize free space manager with all space free
                self.free_space.write().initialize(self.device_size)?;

                let mut metadata = self._metadata.write();
                metadata.device_size = self.device_size;
                metadata.update();
            } else {
                // Existing file - just set device size, free space will be rebuilt during scan
                self.free_space.write().set_device_size(self.device_size);
            }

            #[cfg(unix)]
            {
                use std::os::unix::io::AsRawFd;
                let file_arc = Arc::new(file);
                let fd = file_arc.as_raw_fd();
                self.device_fd = Some(fd);
                // Store a clone of the file to keep it alive
                // (so the raw fd above stays valid for the store's lifetime).
                self.device_file = Some(file_arc.as_ref().try_clone().map_err(FeoxError::IoError)?);
                let disk_io = crate::storage::io::DiskIO::new(file_arc, use_direct_io)?;
                self.disk_io = Some(Arc::new(RwLock::new(disk_io)));
            }

            #[cfg(not(unix))]
            {
                // Store a clone of the file to keep it alive
                self.device_file = Some(file.try_clone().map_err(FeoxError::IoError)?);
                let disk_io = crate::storage::io::DiskIO::new_from_file(file)?;
                self.disk_io = Some(Arc::new(RwLock::new(disk_io)));
            }

            // Safe unwrap: disk_io was assigned in one of the cfg blocks above.
            let disk_io = self.disk_io.as_ref().unwrap().read();

            // Read metadata from existing files (not newly created ones)
            if !was_newly_created {
                if let Ok(metadata_bytes) = disk_io.read_metadata() {
                    if let Some(loaded_metadata) =
                        crate::storage::metadata::Metadata::from_bytes(&metadata_bytes)
                    {
                        // Initialize stats from metadata
                        self.stats
                            .disk_usage
                            .store(loaded_metadata.total_size, Ordering::Relaxed);
                        *self._metadata.write() = loaded_metadata;
                    }
                }
            }
        }
        Ok(())
    }
1310
1311 fn load_indexes(&mut self) -> Result<()> {
1312 if self.memory_only {
1313 return Ok(());
1314 }
1315
1316 // Try to read metadata from sector 0
1317 if let Some(ref disk_io) = self.disk_io {
1318 let metadata_data = disk_io.read().read_metadata()?;
1319
1320 // Check if metadata is valid (has our signature)
1321 if metadata_data.len() >= FEOX_SIGNATURE_SIZE {
1322 let signature = &metadata_data[..FEOX_SIGNATURE_SIZE];
1323
1324 if signature == FEOX_SIGNATURE {
1325 // Valid metadata found, scan the disk to rebuild indexes
1326 self.scan_and_rebuild_indexes()?;
1327 }
1328 }
1329 }
1330
1331 Ok(())
1332 }
1333
1334 fn scan_and_rebuild_indexes(&mut self) -> Result<()> {
1335 if self.memory_only || self.device_size == 0 {
1336 return Ok(());
1337 }
1338
1339 let disk_io = self.disk_io.as_ref().ok_or(FeoxError::NoDevice)?;
1340
1341 let total_sectors = self.device_size / FEOX_BLOCK_SIZE as u64;
1342 let mut sector: u64 = 1;
1343 let mut _records_loaded = 0;
1344 let mut occupied_sectors = Vec::new();
1345
1346 while sector < total_sectors {
1347 let data = match disk_io.read().read_sectors_sync(sector, 1) {
1348 Ok(d) => d,
1349 Err(_) => {
1350 sector += 1;
1351 continue;
1352 }
1353 };
1354
1355 if data.len() < SECTOR_HEADER_SIZE {
1356 sector += 1;
1357 continue;
1358 }
1359
1360 // Check for deletion marker first
1361 if data.len() >= 8 && &data[..8] == b"\0DELETED" {
1362 // This sector has been deleted, skip it
1363 sector += 1;
1364 continue;
1365 }
1366
1367 let marker = u16::from_le_bytes([data[0], data[1]]);
1368 let seq_num = u16::from_le_bytes([data[2], data[3]]);
1369
1370 if marker != SECTOR_MARKER || seq_num != 0 {
1371 sector += 1;
1372 continue;
1373 }
1374
1375 if data.len() < SECTOR_HEADER_SIZE + 2 {
1376 sector += 1;
1377 continue;
1378 }
1379
1380 let key_len =
1381 u16::from_le_bytes([data[SECTOR_HEADER_SIZE], data[SECTOR_HEADER_SIZE + 1]])
1382 as usize;
1383
1384 if key_len == 0 || key_len > MAX_KEY_SIZE {
1385 sector += 1;
1386 continue;
1387 }
1388
1389 if data.len() < SECTOR_HEADER_SIZE + 2 + key_len + 8 + 8 {
1390 sector += 1;
1391 continue;
1392 }
1393
1394 let mut offset = SECTOR_HEADER_SIZE + 2;
1395 let key = data[offset..offset + key_len].to_vec();
1396 offset += key_len;
1397
1398 let value_len = u64::from_le_bytes([
1399 data[offset],
1400 data[offset + 1],
1401 data[offset + 2],
1402 data[offset + 3],
1403 data[offset + 4],
1404 data[offset + 5],
1405 data[offset + 6],
1406 data[offset + 7],
1407 ]) as usize;
1408 offset += 8;
1409
1410 let timestamp = u64::from_le_bytes([
1411 data[offset],
1412 data[offset + 1],
1413 data[offset + 2],
1414 data[offset + 3],
1415 data[offset + 4],
1416 data[offset + 5],
1417 data[offset + 6],
1418 data[offset + 7],
1419 ]);
1420
1421 let total_size = SECTOR_HEADER_SIZE + 2 + key_len + 8 + 8 + value_len;
1422 let sectors_needed = total_size.div_ceil(FEOX_BLOCK_SIZE);
1423
1424 let mut record = Record::new(key.clone(), Vec::new(), timestamp);
1425 record.sector.store(sector, Ordering::Release);
1426 record.value_len = value_len;
1427 record.clear_value();
1428
1429 let record_arc = Arc::new(record);
1430 self.hash_table.insert(key.clone(), Arc::clone(&record_arc));
1431 self.tree.insert(key, Arc::clone(&record_arc));
1432
1433 self.stats.record_count.fetch_add(1, Ordering::AcqRel);
1434 let record_size = self.calculate_record_size(key_len, value_len);
1435 self.stats
1436 .memory_usage
1437 .fetch_add(record_size, Ordering::AcqRel);
1438
1439 // Track disk usage
1440 self.stats
1441 .disk_usage
1442 .fetch_add((sectors_needed * FEOX_BLOCK_SIZE) as u64, Ordering::AcqRel);
1443
1444 for i in 0..sectors_needed {
1445 occupied_sectors.push(sector + i as u64);
1446 }
1447
1448 _records_loaded += 1;
1449 sector += sectors_needed as u64;
1450 }
1451
1452 // Now rebuild free space from gaps between occupied sectors
1453 occupied_sectors.sort_unstable();
1454
1455 // Start after metadata sectors (sectors 0-15 are reserved)
1456 let mut last_end = FEOX_DATA_START_BLOCK;
1457
1458 for &occupied_start in &occupied_sectors {
1459 if occupied_start > last_end {
1460 self.free_space
1461 .write()
1462 .release_sectors(last_end, occupied_start - last_end)?;
1463 }
1464 last_end = occupied_start + 1;
1465 }
1466
1467 if last_end < total_sectors {
1468 self.free_space
1469 .write()
1470 .release_sectors(last_end, total_sectors - last_end)?;
1471 }
1472
1473 Ok(())
1474 }
1475
1476 // ============ Utility Methods ============
1477
1478 /// Check if a key exists
1479 pub fn contains_key(&self, key: &[u8]) -> bool {
1480 self.hash_table.contains_key(key)
1481 }
1482
1483 /// Get the number of records in the store
1484 pub fn len(&self) -> usize {
1485 self.stats.record_count.load(Ordering::Acquire) as usize
1486 }
1487
    /// Check if the store is empty (i.e. `len() == 0`).
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
1492
    /// Get memory usage statistics: bytes tracked by the internal
    /// `memory_usage` counter (updated on insert/delete).
    pub fn memory_usage(&self) -> usize {
        self.stats.memory_usage.load(Ordering::Acquire)
    }
1497
    /// Get a point-in-time snapshot of all store statistics.
    pub fn stats(&self) -> crate::stats::StatsSnapshot {
        self.stats.snapshot()
    }
1502
    /// Flush all pending writes to disk (for persistent mode).
    ///
    /// Convenience alias for `flush_all`; no-op in memory-only mode.
    pub fn flush(&self) {
        self.flush_all()
    }
1507}
1508
impl Drop for FeoxStore {
    /// Orderly shutdown: signal write-buffer workers, persist metadata,
    /// tear down the write buffer, then shut down disk I/O last (workers
    /// must have exited before the I/O layer goes away).
    fn drop(&mut self) {
        // Signal shutdown to write buffer workers
        if let Some(ref wb) = self.write_buffer {
            wb.initiate_shutdown();
        }

        // Write metadata directly without using the write buffer
        // (the buffer is already shutting down and must not be re-entered).
        if !self.memory_only {
            if let Some(ref disk_io) = self.disk_io {
                // Update metadata with current stats
                let mut metadata = self._metadata.write();
                metadata.total_records = self.stats.record_count.load(Ordering::Relaxed) as u64;
                metadata.total_size = self.stats.disk_usage.load(Ordering::Relaxed);
                metadata.fragmentation = self.free_space.read().get_fragmentation();
                metadata.update();

                // Write metadata
                let _ = disk_io.write().write_metadata(metadata.as_bytes());
                let _ = disk_io.write().flush();
            }
        }

        // Take ownership of write_buffer to properly shut it down
        if let Some(wb_arc) = self.write_buffer.take() {
            // Try to get mutable access if we're the only owner
            if let Ok(wb) = Arc::try_unwrap(wb_arc) {
                // We own it exclusively, can call complete_shutdown
                // (joins/finishes worker threads deterministically).
                let mut wb_mut = wb;
                wb_mut.complete_shutdown();
            }
            // If we can't get exclusive access, workers are already shutting down via initiate_shutdown
        }

        // Now it's safe to shutdown disk I/O since workers have exited
        if let Some(ref disk_io) = self.disk_io {
            disk_io.write().shutdown();
        }
    }
}