mdcs_compaction/
pruning.rs

1//! DAG pruning for bounded history growth.
2//!
3//! Pruning removes nodes from the Merkle-DAG that are older than
4//! the last stable snapshot, reducing storage requirements.
5
6use crate::snapshot::Snapshot;
7use crate::stability::StabilityMonitor;
8use crate::version_vector::VersionVector;
9use mdcs_merkle::{DAGStore, Hash};
10use serde::{Deserialize, Serialize};
11use std::collections::HashSet;
12
13/// Policy for DAG pruning decisions.
14#[derive(Clone, Debug, Serialize, Deserialize)]
15pub struct PruningPolicy {
16    /// Minimum number of snapshots to retain before pruning.
17    pub min_snapshots_before_prune: usize,
18
19    /// Minimum age of nodes (in logical time) before they can be pruned.
20    pub min_node_age: u64,
21
22    /// Maximum number of nodes to prune in one operation.
23    pub max_nodes_per_prune: usize,
24
25    /// Whether to require stability before pruning.
26    pub require_stability: bool,
27
28    /// Whether to keep at least one path to genesis.
29    pub preserve_genesis_path: bool,
30
31    /// Depth of history to preserve beyond the snapshot.
32    pub preserve_depth: usize,
33}
34
35impl Default for PruningPolicy {
36    fn default() -> Self {
37        PruningPolicy {
38            min_snapshots_before_prune: 2,
39            min_node_age: 5000,
40            max_nodes_per_prune: 1000,
41            require_stability: true,
42            preserve_genesis_path: true,
43            preserve_depth: 10,
44        }
45    }
46}
47
48/// Result of a pruning operation.
49#[derive(Clone, Debug)]
50pub struct PruningResult {
51    /// Number of nodes pruned.
52    pub nodes_pruned: usize,
53
54    /// CIDs of pruned nodes.
55    pub pruned_cids: Vec<Hash>,
56
57    /// The snapshot root used as the pruning boundary.
58    pub snapshot_root: Option<Hash>,
59
60    /// Nodes that couldn't be pruned (and why).
61    pub skipped: Vec<(Hash, String)>,
62
63    /// Whether pruning completed fully or was limited.
64    pub completed: bool,
65}
66
67impl PruningResult {
68    /// Create an empty result (nothing pruned).
69    pub fn empty() -> Self {
70        PruningResult {
71            nodes_pruned: 0,
72            pruned_cids: Vec::new(),
73            snapshot_root: None,
74            skipped: Vec::new(),
75            completed: true,
76        }
77    }
78}
79
80/// Pruner for removing old DAG nodes.
81pub struct Pruner {
82    /// Pruning policy.
83    policy: PruningPolicy,
84
85    /// Set of CIDs that must be preserved (e.g., recent snapshots).
86    preserved: HashSet<Hash>,
87
88    /// The stable frontier at the time of pruning.
89    stable_frontier: Option<VersionVector>,
90}
91
92impl Pruner {
93    /// Create a new pruner with default policy.
94    pub fn new() -> Self {
95        Pruner {
96            policy: PruningPolicy::default(),
97            preserved: HashSet::new(),
98            stable_frontier: None,
99        }
100    }
101
102    /// Create a pruner with custom policy.
103    pub fn with_policy(policy: PruningPolicy) -> Self {
104        Pruner {
105            policy,
106            preserved: HashSet::new(),
107            stable_frontier: None,
108        }
109    }
110
111    /// Get the current policy.
112    pub fn policy(&self) -> &PruningPolicy {
113        &self.policy
114    }
115
116    /// Set the stable frontier for pruning decisions.
117    pub fn set_stable_frontier(&mut self, frontier: VersionVector) {
118        self.stable_frontier = Some(frontier);
119    }
120
121    /// Mark a CID as preserved (cannot be pruned).
122    pub fn preserve(&mut self, cid: Hash) {
123        self.preserved.insert(cid);
124    }
125
126    /// Clear preserved CIDs.
127    pub fn clear_preserved(&mut self) {
128        self.preserved.clear();
129    }
130
131    /// Identify nodes that can be safely pruned.
132    ///
133    /// This doesn't actually modify the store - it returns the list of
134    /// nodes that would be pruned if `execute_prune` is called.
135    pub fn identify_prunable<S: DAGStore>(
136        &self,
137        store: &S,
138        snapshot: &Snapshot,
139        current_time: u64,
140    ) -> Vec<Hash> {
141        let mut prunable = Vec::new();
142
143        // Get all nodes in topological order (oldest first)
144        let all_nodes = store.topological_order();
145
146        // Find ancestors of the snapshot's superseded roots (these are candidates for pruning)
147        // We use superseded_roots because snapshot.id is a hash of the snapshot content,
148        // not a node in the DAG. The superseded_roots are the actual DAG heads that
149        // the snapshot was created from.
150        let mut snapshot_ancestors: HashSet<_> = HashSet::new();
151        for root in &snapshot.superseded_roots {
152            // Include the root itself and all its ancestors
153            snapshot_ancestors.insert(*root);
154            snapshot_ancestors.extend(store.ancestors(root));
155        }
156
157        // Find nodes to preserve (heads and their recent ancestors)
158        let mut preserved = self.preserved.clone();
159
160        // Preserve current heads
161        for head in store.heads() {
162            preserved.insert(head);
163        }
164
165        // Preserve nodes within preserve_depth of heads
166        for head in store.heads() {
167            let ancestors = self.ancestors_within_depth(store, &head, self.policy.preserve_depth);
168            preserved.extend(ancestors);
169        }
170
171        // Preserve the snapshot's superseded roots (they are the snapshot boundary nodes)
172        for root in &snapshot.superseded_roots {
173            preserved.insert(*root);
174        }
175
176        // If preserving genesis path, mark it
177        if self.policy.preserve_genesis_path {
178            if let Some(genesis_path) = self.find_genesis_path(store) {
179                preserved.extend(genesis_path);
180            }
181        }
182
183        for cid in all_nodes {
184            // Skip if already at limit
185            if prunable.len() >= self.policy.max_nodes_per_prune {
186                break;
187            }
188
189            // Skip preserved nodes
190            if preserved.contains(&cid) {
191                continue;
192            }
193
194            // Skip if not an ancestor of the snapshot
195            if !snapshot_ancestors.contains(&cid) {
196                continue;
197            }
198
199            // Skip if not old enough
200            if let Some(node) = store.get(&cid) {
201                if current_time.saturating_sub(node.timestamp) < self.policy.min_node_age {
202                    continue;
203                }
204            }
205
206            prunable.push(cid);
207        }
208
209        prunable
210    }
211
212    /// Execute pruning on a mutable store.
213    ///
214    /// Returns the result of the pruning operation.
215    pub fn execute_prune<S: DAGStore + PrunableStore>(
216        &self,
217        store: &mut S,
218        snapshot: &Snapshot,
219        current_time: u64,
220    ) -> PruningResult {
221        let prunable = self.identify_prunable(store, snapshot, current_time);
222
223        if prunable.is_empty() {
224            return PruningResult::empty();
225        }
226
227        let mut result = PruningResult {
228            nodes_pruned: 0,
229            pruned_cids: Vec::new(),
230            snapshot_root: Some(snapshot.id),
231            skipped: Vec::new(),
232            completed: true,
233        };
234
235        for cid in prunable {
236            match store.remove(&cid) {
237                Ok(()) => {
238                    result.nodes_pruned += 1;
239                    result.pruned_cids.push(cid);
240                }
241                Err(e) => {
242                    result.skipped.push((cid, e));
243                }
244            }
245        }
246
247        if !result.skipped.is_empty() {
248            result.completed = false;
249        }
250
251        result
252    }
253
254    /// Check if pruning should be performed.
255    pub fn should_prune<S: DAGStore>(
256        &self,
257        store: &S,
258        snapshot: &Snapshot,
259        stability_monitor: Option<&StabilityMonitor>,
260    ) -> bool {
261        // Check stability requirement
262        if self.policy.require_stability {
263            if let Some(monitor) = stability_monitor {
264                if !monitor.is_stable(&snapshot.version_vector) {
265                    return false;
266                }
267            } else {
268                return false;
269            }
270        }
271
272        // Check if there are any prunable nodes
273        let node_count = store.topological_order().len();
274        node_count > self.policy.preserve_depth + 1
275    }
276
277    /// Get ancestors within a certain depth.
278    fn ancestors_within_depth<S: DAGStore>(
279        &self,
280        store: &S,
281        cid: &Hash,
282        depth: usize,
283    ) -> HashSet<Hash> {
284        let mut result = HashSet::new();
285        let mut frontier = vec![*cid];
286        let mut current_depth = 0;
287
288        while current_depth < depth && !frontier.is_empty() {
289            let mut next_frontier = Vec::new();
290
291            for node_cid in frontier {
292                if let Some(node) = store.get(&node_cid) {
293                    for parent in &node.parents {
294                        if result.insert(*parent) {
295                            next_frontier.push(*parent);
296                        }
297                    }
298                }
299            }
300
301            frontier = next_frontier;
302            current_depth += 1;
303        }
304
305        result
306    }
307
308    /// Find a path from any head to genesis.
309    fn find_genesis_path<S: DAGStore>(&self, store: &S) -> Option<Vec<Hash>> {
310        let heads = store.heads();
311        if heads.is_empty() {
312            return None;
313        }
314
315        let mut path = Vec::new();
316        let mut current = heads[0];
317
318        while let Some(node) = store.get(&current) {
319            path.push(current);
320
321            if node.parents.is_empty() {
322                // Reached genesis
323                break;
324            }
325
326            // Follow first parent
327            current = node.parents[0];
328        }
329
330        if path.is_empty() {
331            None
332        } else {
333            Some(path)
334        }
335    }
336}
337
338impl Default for Pruner {
339    fn default() -> Self {
340        Self::new()
341    }
342}
343
344/// Extension trait for stores that support node removal.
345pub trait PrunableStore: DAGStore {
346    /// Remove a node from the store.
347    fn remove(&mut self, cid: &Hash) -> Result<(), String>;
348
349    /// Remove multiple nodes.
350    fn remove_batch(&mut self, cids: &[Hash]) -> Result<usize, String> {
351        let mut removed = 0;
352        for cid in cids {
353            if self.remove(cid).is_ok() {
354                removed += 1;
355            }
356        }
357        Ok(removed)
358    }
359}
360
361// Note: MemoryDAGStore doesn't actually support removal (immutable by design).
362// For testing purposes, we use wrapper types that track "pruned" nodes.
363// In production, a proper store implementation would handle removal.
364
365/// Verification utilities for pruning safety.
366pub struct PruningVerifier;
367
368impl PruningVerifier {
369    /// Verify that no "live" items would be resurrected after pruning.
370    ///
371    /// This checks that all remove operations that reference pruned nodes
372    /// are still correctly represented in the post-prune state.
373    pub fn verify_no_resurrection<S: DAGStore>(
374        store: &S,
375        pruned: &[Hash],
376        _snapshot: &Snapshot,
377    ) -> Result<(), String> {
378        // The snapshot should contain the complete state at its point
379        // Any removes that happened before the snapshot are captured
380        // We verify that no pruned nodes are referenced by non-pruned nodes
381
382        let pruned_set: HashSet<_> = pruned.iter().copied().collect();
383
384        for cid in store.topological_order() {
385            if let Some(node) = store.get(&cid) {
386                for parent in &node.parents {
387                    if pruned_set.contains(parent) && !pruned_set.contains(&cid) {
388                        return Err(format!(
389                            "Node {} references pruned parent {}",
390                            cid.to_hex(),
391                            parent.to_hex()
392                        ));
393                    }
394                }
395            }
396        }
397
398        Ok(())
399    }
400
401    /// Verify that the DAG is still connected after pruning.
402    pub fn verify_connectivity<S: DAGStore>(store: &S) -> Result<(), String> {
403        let heads = store.heads();
404        if heads.is_empty() {
405            return Err("No heads in store".to_string());
406        }
407
408        // Check that all heads can reach some common ancestor
409        // (or at least don't have dangling references)
410        for head in &heads {
411            let ancestors = store.ancestors(head);
412            for ancestor in &ancestors {
413                if !store.contains(ancestor) {
414                    return Err(format!(
415                        "Head {} references missing ancestor {}",
416                        head.to_hex(),
417                        ancestor.to_hex()
418                    ));
419                }
420            }
421        }
422
423        Ok(())
424    }
425}
426
#[cfg(test)]
mod tests {
    use super::*;
    use mdcs_merkle::{MemoryDAGStore, NodeBuilder, Payload};

    /// Both constructors must install the policy they were given.
    #[test]
    fn test_pruner_creation() {
        let pruner = Pruner::new();
        assert_eq!(pruner.policy().min_snapshots_before_prune, 2);

        let custom_policy = PruningPolicy {
            min_snapshots_before_prune: 5,
            ..Default::default()
        };
        let custom_pruner = Pruner::with_policy(custom_policy);
        assert_eq!(custom_pruner.policy().min_snapshots_before_prune, 5);
    }

    /// Pins every default so an accidental change is noticed.
    #[test]
    fn test_pruning_policy_defaults() {
        let policy = PruningPolicy::default();

        assert_eq!(policy.min_snapshots_before_prune, 2);
        assert_eq!(policy.min_node_age, 5000);
        assert_eq!(policy.max_nodes_per_prune, 1000);
        assert!(policy.require_stability);
        assert!(policy.preserve_genesis_path);
        assert_eq!(policy.preserve_depth, 10);
    }

    /// Exercises candidate selection on a linear chain.
    #[test]
    fn test_identify_prunable() {
        let (mut store, genesis) = MemoryDAGStore::with_genesis("test");

        // Create a chain: genesis -> a -> b -> c -> d
        let node_a = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(b"a".to_vec()))
            .with_timestamp(100)
            .with_creator("test")
            .build();
        let cid_a = store.put(node_a).unwrap();

        let node_b = NodeBuilder::new()
            .with_parent(cid_a)
            .with_payload(Payload::delta(b"b".to_vec()))
            .with_timestamp(200)
            .with_creator("test")
            .build();
        let cid_b = store.put(node_b).unwrap();

        let node_c = NodeBuilder::new()
            .with_parent(cid_b)
            .with_payload(Payload::delta(b"c".to_vec()))
            .with_timestamp(300)
            .with_creator("test")
            .build();
        let cid_c = store.put(node_c).unwrap();

        let node_d = NodeBuilder::new()
            .with_parent(cid_c)
            .with_payload(Payload::delta(b"d".to_vec()))
            .with_timestamp(400)
            .with_creator("test")
            .build();
        let cid_d = store.put(node_d).unwrap();

        // Create a snapshot at node_c
        let vv = VersionVector::from_entries([("test".to_string(), 3)]);
        let snapshot = Snapshot::new(vv, vec![cid_c], b"state".to_vec(), "test", 300);

        // Create pruner with low age requirement for testing
        let policy = PruningPolicy {
            min_node_age: 50,
            preserve_depth: 1,
            preserve_genesis_path: false,
            ..Default::default()
        };
        let pruner = Pruner::with_policy(policy);

        let prunable = pruner.identify_prunable(&store, &snapshot, 500);

        // cid_a and cid_b are ancestors of the snapshot root, old enough
        // (ages 400 and 300 against a minimum of 50) and not protected.
        assert!(!prunable.is_empty());
        assert!(prunable.contains(&cid_a));
        assert!(prunable.contains(&cid_b));

        // cid_c is the superseded root (and within preserve_depth = 1 of the
        // head); cid_d is the current head. Both must be kept.
        assert!(!prunable.contains(&cid_c));
        assert!(!prunable.contains(&cid_d));
    }

    /// Pinning and unpinning CIDs updates the pruner's preserved set.
    #[test]
    fn test_preserve_nodes() {
        let mut pruner = Pruner::new();
        let cid = mdcs_merkle::Hasher::hash(b"test");

        pruner.preserve(cid);
        assert!(pruner.preserved.contains(&cid));

        pruner.clear_preserved();
        assert!(pruner.preserved.is_empty());
    }

    /// The empty result reports no work and counts as completed.
    #[test]
    fn test_pruning_result_empty() {
        let result = PruningResult::empty();

        assert_eq!(result.nodes_pruned, 0);
        assert!(result.pruned_cids.is_empty());
        assert!(result.snapshot_root.is_none());
        assert!(result.completed);
    }
}
535}