mdcs_compaction/
compactor.rs

1//! High-level compaction orchestrator.
2//!
3//! The Compactor coordinates snapshotting, stability monitoring, and
4//! pruning to manage metadata growth over time.
5
6use crate::pruning::{PrunableStore, Pruner, PruningPolicy, PruningResult};
7use crate::snapshot::{Snapshot, SnapshotConfig, SnapshotManager};
8use crate::stability::{FrontierUpdate, StabilityConfig, StabilityMonitor};
9use crate::version_vector::VersionVector;
10use mdcs_merkle::{DAGStore, Hash};
11use serde::{Deserialize, Serialize};
12use thiserror::Error;
13
14/// Errors that can occur during compaction.
15#[derive(Error, Debug)]
16pub enum CompactionError {
17    #[error("No stable snapshot available for compaction")]
18    NoStableSnapshot,
19
20    #[error("Stability requirements not met: {0}")]
21    StabilityNotMet(String),
22
23    #[error("Pruning failed: {0}")]
24    PruningFailed(String),
25
26    #[error("Snapshot creation failed: {0}")]
27    SnapshotFailed(String),
28
29    #[error("State serialization failed: {0}")]
30    SerializationFailed(String),
31
32    #[error("Verification failed: {0}")]
33    VerificationFailed(String),
34}
35
36/// Configuration for the compactor.
37#[derive(Clone, Debug, Serialize, Deserialize)]
38pub struct CompactionConfig {
39    /// Snapshot configuration.
40    #[serde(default)]
41    pub snapshot: SnapshotConfigSerializable,
42
43    /// Pruning policy.
44    #[serde(default)]
45    pub pruning: PruningPolicy,
46
47    /// Stability configuration.
48    #[serde(default)]
49    pub stability: StabilityConfigSerializable,
50
51    /// Whether to automatically compact when thresholds are met.
52    pub auto_compact: bool,
53
54    /// Minimum operations before considering compaction.
55    pub min_ops_for_compaction: u64,
56
57    /// Whether to verify after compaction.
58    pub verify_after_compaction: bool,
59}
60
61/// Serializable version of SnapshotConfig.
62#[derive(Clone, Debug, Serialize, Deserialize)]
63pub struct SnapshotConfigSerializable {
64    pub min_operations_between: u64,
65    pub max_time_between: u64,
66    pub max_snapshots: usize,
67    pub auto_snapshot: bool,
68}
69
70impl Default for SnapshotConfigSerializable {
71    fn default() -> Self {
72        SnapshotConfigSerializable {
73            min_operations_between: 1000,
74            max_time_between: 10000,
75            max_snapshots: 10,
76            auto_snapshot: true,
77        }
78    }
79}
80
81impl From<SnapshotConfigSerializable> for SnapshotConfig {
82    fn from(s: SnapshotConfigSerializable) -> Self {
83        SnapshotConfig {
84            min_operations_between: s.min_operations_between,
85            max_time_between: s.max_time_between,
86            max_snapshots: s.max_snapshots,
87            auto_snapshot: s.auto_snapshot,
88        }
89    }
90}
91
92/// Serializable version of StabilityConfig.
93#[derive(Clone, Debug, Serialize, Deserialize)]
94pub struct StabilityConfigSerializable {
95    pub min_peers_for_stability: usize,
96    pub max_frontier_age: u64,
97    pub require_all_peers: bool,
98    pub quorum_fraction: f64,
99}
100
101impl Default for StabilityConfigSerializable {
102    fn default() -> Self {
103        StabilityConfigSerializable {
104            min_peers_for_stability: 1,
105            max_frontier_age: 10000,
106            require_all_peers: true,
107            quorum_fraction: 0.67,
108        }
109    }
110}
111
112impl From<StabilityConfigSerializable> for StabilityConfig {
113    fn from(s: StabilityConfigSerializable) -> Self {
114        StabilityConfig {
115            min_peers_for_stability: s.min_peers_for_stability,
116            max_frontier_age: s.max_frontier_age,
117            require_all_peers: s.require_all_peers,
118            quorum_fraction: s.quorum_fraction,
119        }
120    }
121}
122
123impl Default for CompactionConfig {
124    fn default() -> Self {
125        CompactionConfig {
126            snapshot: SnapshotConfigSerializable::default(),
127            pruning: PruningPolicy::default(),
128            stability: StabilityConfigSerializable::default(),
129            auto_compact: true,
130            min_ops_for_compaction: 500,
131            verify_after_compaction: true,
132        }
133    }
134}
135
/// Running counters describing what the compactor has done so far.
///
/// A fresh value (via `Default`) has every counter at zero and no recorded
/// compaction time.
#[derive(Debug, Clone, Default)]
pub struct CompactionStats {
    /// Number of snapshots created through `Compactor::create_snapshot`.
    pub snapshots_created: u64,

    /// Cumulative number of nodes removed by pruning runs.
    pub nodes_pruned: u64,

    /// Logical time at which `Compactor::compact` last completed
    /// successfully, or `None` if it never has.
    pub last_compaction: Option<u64>,

    /// Operations since last compaction.
    ///
    /// NOTE(review): no method of `Compactor` visible in this module writes
    /// this field, so it appears to stay at its default of 0.
    pub ops_since_compaction: u64,

    /// Node count of the DAG store as observed at the end of the last
    /// successful `Compactor::compact`.
    pub current_dag_size: usize,

    /// Number of snapshots held by the snapshot manager as of the last
    /// refresh.
    pub snapshot_count: usize,
}
157
158/// High-level compactor that orchestrates all compaction operations.
159pub struct Compactor {
160    /// Our replica ID.
161    replica_id: String,
162
163    /// Configuration.
164    config: CompactionConfig,
165
166    /// Snapshot manager.
167    snapshots: SnapshotManager,
168
169    /// Stability monitor.
170    stability: StabilityMonitor,
171
172    /// Pruner.
173    pruner: Pruner,
174
175    /// Statistics.
176    stats: CompactionStats,
177
178    /// Current logical time.
179    current_time: u64,
180}
181
182impl Compactor {
183    /// Create a new compactor.
184    pub fn new(replica_id: impl Into<String>) -> Self {
185        let replica_id = replica_id.into();
186
187        Compactor {
188            snapshots: SnapshotManager::new(),
189            stability: StabilityMonitor::new(&replica_id),
190            pruner: Pruner::new(),
191            config: CompactionConfig::default(),
192            stats: CompactionStats::default(),
193            current_time: 0,
194            replica_id,
195        }
196    }
197
198    /// Create a compactor with custom configuration.
199    pub fn with_config(replica_id: impl Into<String>, config: CompactionConfig) -> Self {
200        let replica_id = replica_id.into();
201
202        let snapshot_config: SnapshotConfig = config.snapshot.clone().into();
203        let stability_config: StabilityConfig = config.stability.clone().into();
204
205        Compactor {
206            snapshots: SnapshotManager::with_config(snapshot_config),
207            stability: StabilityMonitor::with_config(&replica_id, stability_config),
208            pruner: Pruner::with_policy(config.pruning.clone()),
209            config,
210            stats: CompactionStats::default(),
211            current_time: 0,
212            replica_id,
213        }
214    }
215
216    /// Get the replica ID.
217    pub fn replica_id(&self) -> &str {
218        &self.replica_id
219    }
220
221    /// Get the configuration.
222    pub fn config(&self) -> &CompactionConfig {
223        &self.config
224    }
225
226    /// Get the snapshot manager.
227    pub fn snapshots(&self) -> &SnapshotManager {
228        &self.snapshots
229    }
230
231    /// Get mutable snapshot manager.
232    pub fn snapshots_mut(&mut self) -> &mut SnapshotManager {
233        &mut self.snapshots
234    }
235
236    /// Get the stability monitor.
237    pub fn stability(&self) -> &StabilityMonitor {
238        &self.stability
239    }
240
241    /// Get mutable stability monitor.
242    pub fn stability_mut(&mut self) -> &mut StabilityMonitor {
243        &mut self.stability
244    }
245
246    /// Get the pruner.
247    pub fn pruner(&self) -> &Pruner {
248        &self.pruner
249    }
250
251    /// Get mutable pruner.
252    pub fn pruner_mut(&mut self) -> &mut Pruner {
253        &mut self.pruner
254    }
255
256    /// Get statistics.
257    pub fn stats(&self) -> &CompactionStats {
258        &self.stats
259    }
260
261    /// Update the current time.
262    pub fn set_time(&mut self, time: u64) {
263        self.current_time = time;
264    }
265
266    /// Update local frontier (call after state changes).
267    pub fn update_local_frontier(&mut self, vv: VersionVector, heads: Vec<Hash>) {
268        self.stability.update_local_frontier(vv, heads);
269    }
270
271    /// Process a frontier update from a peer.
272    pub fn process_peer_update(&mut self, update: FrontierUpdate) {
273        self.stability.update_peer_frontier(update);
274    }
275
276    /// Create a frontier update for broadcasting.
277    pub fn create_frontier_update(&self) -> FrontierUpdate {
278        self.stability.create_frontier_update(self.current_time)
279    }
280
281    /// Check if a snapshot should be created.
282    pub fn should_snapshot(&self) -> bool {
283        self.snapshots
284            .should_snapshot(self.stability.local_frontier(), self.current_time)
285    }
286
287    /// Create a snapshot from the current state.
288    ///
289    /// The `state_serializer` function should serialize the current CRDT state.
290    pub fn create_snapshot<F>(
291        &mut self,
292        superseded_roots: Vec<Hash>,
293        state_serializer: F,
294    ) -> Result<Hash, CompactionError>
295    where
296        F: FnOnce() -> Result<Vec<u8>, String>,
297    {
298        let state_data = state_serializer().map_err(CompactionError::SerializationFailed)?;
299
300        let snapshot = Snapshot::new(
301            self.stability.local_frontier().clone(),
302            superseded_roots,
303            state_data,
304            &self.replica_id,
305            self.current_time,
306        );
307
308        let id = self.snapshots.store(snapshot);
309        self.stats.snapshots_created += 1;
310        self.stats.snapshot_count = self.snapshots.stats().count;
311
312        Ok(id)
313    }
314
315    /// Check if compaction should be performed.
316    pub fn should_compact<S: DAGStore>(&self, _store: &S) -> bool {
317        if !self.config.auto_compact {
318            return false;
319        }
320
321        // Need at least min_ops_for_compaction operations
322        if self.stability.local_frontier().total_operations() < self.config.min_ops_for_compaction {
323            return false;
324        }
325
326        // Need at least min_snapshots_before_prune snapshots
327        if self.snapshots.stats().count < self.config.pruning.min_snapshots_before_prune {
328            return false;
329        }
330
331        // Need a stable snapshot
332        if let Some(snapshot) = self.snapshots.latest() {
333            self.stability.is_stable(&snapshot.version_vector)
334        } else {
335            false
336        }
337    }
338
339    /// Perform compaction (snapshot + prune if needed).
340    pub fn compact<S, F>(
341        &mut self,
342        store: &mut S,
343        state_serializer: F,
344    ) -> Result<CompactionResult, CompactionError>
345    where
346        S: DAGStore + PrunableStore,
347        F: FnOnce() -> Result<Vec<u8>, String>,
348    {
349        let mut result = CompactionResult::default();
350
351        // Create snapshot if needed
352        if self.should_snapshot() {
353            let superseded = store.heads();
354            let snapshot_id = self.create_snapshot(superseded, state_serializer)?;
355            result.snapshot_created = Some(snapshot_id);
356        }
357
358        // Prune if we have a stable snapshot
359        if let Some(snapshot) = self.snapshots.latest() {
360            if self.stability.is_stable(&snapshot.version_vector) {
361                let prune_result = self
362                    .pruner
363                    .execute_prune(store, snapshot, self.current_time);
364                result.nodes_pruned = prune_result.nodes_pruned;
365                result.pruning_result = Some(prune_result);
366                self.stats.nodes_pruned += result.nodes_pruned as u64;
367            }
368        }
369
370        // Verify if configured
371        if self.config.verify_after_compaction {
372            crate::pruning::PruningVerifier::verify_connectivity(store)
373                .map_err(CompactionError::VerificationFailed)?;
374        }
375
376        self.stats.last_compaction = Some(self.current_time);
377        self.stats.current_dag_size = store.len();
378
379        Ok(result)
380    }
381
382    /// Perform automatic maintenance (GC stale peers, auto-compact if needed).
383    pub fn tick<S, F>(
384        &mut self,
385        store: &mut S,
386        state_serializer: F,
387        time: u64,
388    ) -> Result<Option<CompactionResult>, CompactionError>
389    where
390        S: DAGStore + PrunableStore,
391        F: FnOnce() -> Result<Vec<u8>, String>,
392    {
393        self.current_time = time;
394
395        // GC stale peers
396        self.stability.gc_stale_peers(time);
397
398        // Auto-compact if needed
399        if self.should_compact(store) {
400            let result = self.compact(store, state_serializer)?;
401            Ok(Some(result))
402        } else {
403            Ok(None)
404        }
405    }
406
407    /// Bootstrap from a snapshot.
408    ///
409    /// Returns the deserialized state data and the version vector.
410    pub fn bootstrap_from_snapshot(
411        &mut self,
412        snapshot: Snapshot,
413    ) -> Result<(Vec<u8>, VersionVector), CompactionError> {
414        let state_data = snapshot.state_data.clone();
415        let vv = snapshot.version_vector.clone();
416
417        // Store the snapshot
418        self.snapshots.store(snapshot);
419
420        Ok((state_data, vv))
421    }
422
423    /// Get the best snapshot for bootstrapping a new replica.
424    pub fn get_bootstrap_snapshot(&self) -> Option<&Snapshot> {
425        self.snapshots.latest()
426    }
427}
428
429/// Result of a compaction operation.
430#[derive(Clone, Debug, Default)]
431pub struct CompactionResult {
432    /// ID of snapshot created, if any.
433    pub snapshot_created: Option<Hash>,
434
435    /// Number of nodes pruned.
436    pub nodes_pruned: usize,
437
438    /// Detailed pruning result.
439    pub pruning_result: Option<PruningResult>,
440}
441
#[cfg(test)]
mod tests {
    use super::*;
    use mdcs_merkle::MemoryDAGStore;

    /// A freshly built compactor reports its replica id and has created no
    /// snapshots.
    #[test]
    fn test_compactor_creation() {
        let fresh = Compactor::new("test_replica");

        assert_eq!(fresh.replica_id(), "test_replica");
        assert_eq!(fresh.stats().snapshots_created, 0);
    }

    /// Values passed through `with_config` are visible via `config()`.
    #[test]
    fn test_compactor_with_config() {
        let custom = CompactionConfig {
            auto_compact: false,
            min_ops_for_compaction: 1000,
            ..Default::default()
        };

        let configured = Compactor::with_config("test", custom);
        let seen = configured.config();
        assert!(!seen.auto_compact);
        assert_eq!(seen.min_ops_for_compaction, 1000);
    }

    /// The local frontier handed in is the one the stability monitor reports.
    #[test]
    fn test_update_local_frontier() {
        let mut compactor = Compactor::new("test");

        let frontier = VersionVector::from_entries([("test".to_string(), 10)]);
        let head_hashes = vec![mdcs_merkle::Hasher::hash(b"head")];

        compactor.update_local_frontier(frontier.clone(), head_hashes);

        assert_eq!(compactor.stability().local_frontier(), &frontier);
    }

    /// Creating a snapshot succeeds and bumps the creation counter.
    #[test]
    fn test_create_snapshot() {
        let mut compactor = Compactor::new("test");

        let frontier = VersionVector::from_entries([("test".to_string(), 10)]);
        compactor.update_local_frontier(frontier, vec![]);

        let outcome = compactor.create_snapshot(vec![], || Ok(b"test state".to_vec()));

        assert!(outcome.is_ok());
        assert_eq!(compactor.stats().snapshots_created, 1);
    }

    /// A frontier update produced by one replica is recorded by another.
    #[test]
    fn test_frontier_update_roundtrip() {
        let mut sender = Compactor::new("r1");
        let mut receiver = Compactor::new("r2");

        // Give r1 a frontier and a clock value to stamp the update with.
        let frontier = VersionVector::from_entries([("r1".to_string(), 10)]);
        sender.update_local_frontier(frontier.clone(), vec![]);
        sender.set_time(100);

        // Ship r1's update over to r2.
        let update = sender.create_frontier_update();
        receiver.process_peer_update(update);

        // r2 must now hold a frontier entry for r1.
        assert!(receiver.stability().peer_frontier("r1").is_some());
    }

    /// Compaction is refused below the operation threshold and while no
    /// snapshot exists.
    #[test]
    fn test_should_compact() {
        let settings = CompactionConfig {
            auto_compact: true,
            min_ops_for_compaction: 5,
            ..Default::default()
        };
        let mut compactor = Compactor::with_config("test", settings);

        let (store, _) = MemoryDAGStore::with_genesis("test");

        // Three operations: below the threshold of five.
        let too_few = VersionVector::from_entries([("test".to_string(), 3)]);
        compactor.update_local_frontier(too_few, vec![]);
        assert!(!compactor.should_compact(&store));

        // Ten operations: threshold met...
        let enough = VersionVector::from_entries([("test".to_string(), 10)]);
        compactor.update_local_frontier(enough, vec![]);

        // ...but there are still no snapshots to compact against.
        assert!(!compactor.should_compact(&store));
    }

    /// Bootstrapping hands back the snapshot's payload and version vector
    /// and stores the snapshot.
    #[test]
    fn test_bootstrap_from_snapshot() {
        let mut newcomer = Compactor::new("new_replica");

        let origin_vv = VersionVector::from_entries([("origin".to_string(), 100)]);
        let snapshot = Snapshot::new(
            origin_vv.clone(),
            vec![],
            b"state data".to_vec(),
            "origin",
            1000,
        );

        let (state_data, recovered_vv) = newcomer.bootstrap_from_snapshot(snapshot).unwrap();

        assert_eq!(state_data, b"state data");
        assert_eq!(recovered_vv, origin_vv);
        assert_eq!(newcomer.snapshots().stats().count, 1);
    }

    /// Two distinct snapshots are both counted in the statistics.
    #[test]
    fn test_compaction_stats() {
        let mut compactor = Compactor::new("test");

        let first_vv = VersionVector::from_entries([("test".to_string(), 10)]);
        compactor.update_local_frontier(first_vv.clone(), vec![]);

        compactor
            .create_snapshot(vec![], || Ok(b"state1".to_vec()))
            .unwrap();

        // Move the clock so the second snapshot gets a different ID.
        compactor.set_time(100);

        // Advance the frontier so the second snapshot differs in content too.
        let second_vv = VersionVector::from_entries([("test".to_string(), 20)]);
        compactor.update_local_frontier(second_vv, vec![]);

        compactor
            .create_snapshot(vec![], || Ok(b"state2".to_vec()))
            .unwrap();

        let stats = compactor.stats();
        assert_eq!(stats.snapshots_created, 2);
        assert_eq!(stats.snapshot_count, 2);
    }
}