1use crate::snapshot::Snapshot;
7use crate::stability::StabilityMonitor;
8use crate::version_vector::VersionVector;
9use mdcs_merkle::{DAGStore, Hash};
10use serde::{Deserialize, Serialize};
11use std::collections::HashSet;
12
13#[derive(Clone, Debug, Serialize, Deserialize)]
15pub struct PruningPolicy {
16 pub min_snapshots_before_prune: usize,
18
19 pub min_node_age: u64,
21
22 pub max_nodes_per_prune: usize,
24
25 pub require_stability: bool,
27
28 pub preserve_genesis_path: bool,
30
31 pub preserve_depth: usize,
33}
34
35impl Default for PruningPolicy {
36 fn default() -> Self {
37 PruningPolicy {
38 min_snapshots_before_prune: 2,
39 min_node_age: 5000,
40 max_nodes_per_prune: 1000,
41 require_stability: true,
42 preserve_genesis_path: true,
43 preserve_depth: 10,
44 }
45 }
46}
47
48#[derive(Clone, Debug)]
50pub struct PruningResult {
51 pub nodes_pruned: usize,
53
54 pub pruned_cids: Vec<Hash>,
56
57 pub snapshot_root: Option<Hash>,
59
60 pub skipped: Vec<(Hash, String)>,
62
63 pub completed: bool,
65}
66
67impl PruningResult {
68 pub fn empty() -> Self {
70 PruningResult {
71 nodes_pruned: 0,
72 pruned_cids: Vec::new(),
73 snapshot_root: None,
74 skipped: Vec::new(),
75 completed: true,
76 }
77 }
78}
79
80pub struct Pruner {
82 policy: PruningPolicy,
84
85 preserved: HashSet<Hash>,
87
88 stable_frontier: Option<VersionVector>,
90}
91
92impl Pruner {
93 pub fn new() -> Self {
95 Pruner {
96 policy: PruningPolicy::default(),
97 preserved: HashSet::new(),
98 stable_frontier: None,
99 }
100 }
101
102 pub fn with_policy(policy: PruningPolicy) -> Self {
104 Pruner {
105 policy,
106 preserved: HashSet::new(),
107 stable_frontier: None,
108 }
109 }
110
111 pub fn policy(&self) -> &PruningPolicy {
113 &self.policy
114 }
115
116 pub fn set_stable_frontier(&mut self, frontier: VersionVector) {
118 self.stable_frontier = Some(frontier);
119 }
120
121 pub fn preserve(&mut self, cid: Hash) {
123 self.preserved.insert(cid);
124 }
125
126 pub fn clear_preserved(&mut self) {
128 self.preserved.clear();
129 }
130
131 pub fn identify_prunable<S: DAGStore>(
136 &self,
137 store: &S,
138 snapshot: &Snapshot,
139 current_time: u64,
140 ) -> Vec<Hash> {
141 let mut prunable = Vec::new();
142
143 let all_nodes = store.topological_order();
145
146 let mut snapshot_ancestors: HashSet<_> = HashSet::new();
151 for root in &snapshot.superseded_roots {
152 snapshot_ancestors.insert(*root);
154 snapshot_ancestors.extend(store.ancestors(root));
155 }
156
157 let mut preserved = self.preserved.clone();
159
160 for head in store.heads() {
162 preserved.insert(head);
163 }
164
165 for head in store.heads() {
167 let ancestors = self.ancestors_within_depth(store, &head, self.policy.preserve_depth);
168 preserved.extend(ancestors);
169 }
170
171 for root in &snapshot.superseded_roots {
173 preserved.insert(*root);
174 }
175
176 if self.policy.preserve_genesis_path {
178 if let Some(genesis_path) = self.find_genesis_path(store) {
179 preserved.extend(genesis_path);
180 }
181 }
182
183 for cid in all_nodes {
184 if prunable.len() >= self.policy.max_nodes_per_prune {
186 break;
187 }
188
189 if preserved.contains(&cid) {
191 continue;
192 }
193
194 if !snapshot_ancestors.contains(&cid) {
196 continue;
197 }
198
199 if let Some(node) = store.get(&cid) {
201 if current_time.saturating_sub(node.timestamp) < self.policy.min_node_age {
202 continue;
203 }
204 }
205
206 prunable.push(cid);
207 }
208
209 prunable
210 }
211
212 pub fn execute_prune<S: DAGStore + PrunableStore>(
216 &self,
217 store: &mut S,
218 snapshot: &Snapshot,
219 current_time: u64,
220 ) -> PruningResult {
221 let prunable = self.identify_prunable(store, snapshot, current_time);
222
223 if prunable.is_empty() {
224 return PruningResult::empty();
225 }
226
227 let mut result = PruningResult {
228 nodes_pruned: 0,
229 pruned_cids: Vec::new(),
230 snapshot_root: Some(snapshot.id),
231 skipped: Vec::new(),
232 completed: true,
233 };
234
235 for cid in prunable {
236 match store.remove(&cid) {
237 Ok(()) => {
238 result.nodes_pruned += 1;
239 result.pruned_cids.push(cid);
240 }
241 Err(e) => {
242 result.skipped.push((cid, e));
243 }
244 }
245 }
246
247 if !result.skipped.is_empty() {
248 result.completed = false;
249 }
250
251 result
252 }
253
254 pub fn should_prune<S: DAGStore>(
256 &self,
257 store: &S,
258 snapshot: &Snapshot,
259 stability_monitor: Option<&StabilityMonitor>,
260 ) -> bool {
261 if self.policy.require_stability {
263 if let Some(monitor) = stability_monitor {
264 if !monitor.is_stable(&snapshot.version_vector) {
265 return false;
266 }
267 } else {
268 return false;
269 }
270 }
271
272 let node_count = store.topological_order().len();
274 node_count > self.policy.preserve_depth + 1
275 }
276
277 fn ancestors_within_depth<S: DAGStore>(
279 &self,
280 store: &S,
281 cid: &Hash,
282 depth: usize,
283 ) -> HashSet<Hash> {
284 let mut result = HashSet::new();
285 let mut frontier = vec![*cid];
286 let mut current_depth = 0;
287
288 while current_depth < depth && !frontier.is_empty() {
289 let mut next_frontier = Vec::new();
290
291 for node_cid in frontier {
292 if let Some(node) = store.get(&node_cid) {
293 for parent in &node.parents {
294 if result.insert(*parent) {
295 next_frontier.push(*parent);
296 }
297 }
298 }
299 }
300
301 frontier = next_frontier;
302 current_depth += 1;
303 }
304
305 result
306 }
307
308 fn find_genesis_path<S: DAGStore>(&self, store: &S) -> Option<Vec<Hash>> {
310 let heads = store.heads();
311 if heads.is_empty() {
312 return None;
313 }
314
315 let mut path = Vec::new();
316 let mut current = heads[0];
317
318 while let Some(node) = store.get(¤t) {
319 path.push(current);
320
321 if node.parents.is_empty() {
322 break;
324 }
325
326 current = node.parents[0];
328 }
329
330 if path.is_empty() {
331 None
332 } else {
333 Some(path)
334 }
335 }
336}
337
338impl Default for Pruner {
339 fn default() -> Self {
340 Self::new()
341 }
342}
343
344pub trait PrunableStore: DAGStore {
346 fn remove(&mut self, cid: &Hash) -> Result<(), String>;
348
349 fn remove_batch(&mut self, cids: &[Hash]) -> Result<usize, String> {
351 let mut removed = 0;
352 for cid in cids {
353 if self.remove(cid).is_ok() {
354 removed += 1;
355 }
356 }
357 Ok(removed)
358 }
359}
360
361pub struct PruningVerifier;
367
368impl PruningVerifier {
369 pub fn verify_no_resurrection<S: DAGStore>(
374 store: &S,
375 pruned: &[Hash],
376 _snapshot: &Snapshot,
377 ) -> Result<(), String> {
378 let pruned_set: HashSet<_> = pruned.iter().copied().collect();
383
384 for cid in store.topological_order() {
385 if let Some(node) = store.get(&cid) {
386 for parent in &node.parents {
387 if pruned_set.contains(parent) && !pruned_set.contains(&cid) {
388 return Err(format!(
389 "Node {} references pruned parent {}",
390 cid.to_hex(),
391 parent.to_hex()
392 ));
393 }
394 }
395 }
396 }
397
398 Ok(())
399 }
400
401 pub fn verify_connectivity<S: DAGStore>(store: &S) -> Result<(), String> {
403 let heads = store.heads();
404 if heads.is_empty() {
405 return Err("No heads in store".to_string());
406 }
407
408 for head in &heads {
411 let ancestors = store.ancestors(head);
412 for ancestor in &ancestors {
413 if !store.contains(ancestor) {
414 return Err(format!(
415 "Head {} references missing ancestor {}",
416 head.to_hex(),
417 ancestor.to_hex()
418 ));
419 }
420 }
421 }
422
423 Ok(())
424 }
425}
426
#[cfg(test)]
mod tests {
    use super::*;
    use mdcs_merkle::{MemoryDAGStore, NodeBuilder, Payload};

    /// `new` uses the default policy; `with_policy` keeps the one it is given.
    #[test]
    fn test_pruner_creation() {
        let default_pruner = Pruner::new();
        assert_eq!(default_pruner.policy().min_snapshots_before_prune, 2);

        let tuned_pruner = Pruner::with_policy(PruningPolicy {
            min_snapshots_before_prune: 5,
            ..Default::default()
        });
        assert_eq!(tuned_pruner.policy().min_snapshots_before_prune, 5);
    }

    /// Pins every field of the default policy.
    #[test]
    fn test_pruning_policy_defaults() {
        let PruningPolicy {
            min_snapshots_before_prune,
            min_node_age,
            max_nodes_per_prune,
            require_stability,
            preserve_genesis_path,
            preserve_depth,
        } = PruningPolicy::default();

        assert_eq!(min_snapshots_before_prune, 2);
        assert_eq!(min_node_age, 5000);
        assert_eq!(max_nodes_per_prune, 1000);
        assert!(require_stability);
        assert!(preserve_genesis_path);
        assert_eq!(preserve_depth, 10);
    }

    /// A linear chain with a snapshot near its tip yields prunable history.
    #[test]
    fn test_identify_prunable() {
        let (mut store, genesis) = MemoryDAGStore::with_genesis("test");

        // Build genesis <- a <- b <- c <- d, remembering each new node's id.
        let mut tip = genesis;
        let mut chain = Vec::new();
        for (payload, timestamp) in [(b"a", 100), (b"b", 200), (b"c", 300), (b"d", 400)] {
            let node = NodeBuilder::new()
                .with_parent(tip)
                .with_payload(Payload::delta(payload.to_vec()))
                .with_timestamp(timestamp)
                .with_creator("test")
                .build();
            tip = store.put(node).unwrap();
            chain.push(tip);
        }
        let cid_c = chain[2];

        // The snapshot supersedes everything up to and including `c`.
        let vv = VersionVector::from_entries([("test".to_string(), 3)]);
        let snapshot = Snapshot::new(vv, vec![cid_c], b"state".to_vec(), "test", 300);

        // Relaxed policy so that the short chain actually has prunable nodes.
        let pruner = Pruner::with_policy(PruningPolicy {
            min_node_age: 50,
            preserve_depth: 1,
            preserve_genesis_path: false,
            ..Default::default()
        });

        let prunable = pruner.identify_prunable(&store, &snapshot, 500);

        assert!(!prunable.is_empty());
    }

    /// `preserve` records a node and `clear_preserved` forgets all of them.
    #[test]
    fn test_preserve_nodes() {
        let cid = mdcs_merkle::Hasher::hash(b"test");
        let mut pruner = Pruner::new();

        pruner.preserve(cid);
        assert!(pruner.preserved.contains(&cid));

        pruner.clear_preserved();
        assert!(pruner.preserved.is_empty());
    }

    /// The empty result reports no work and counts as completed.
    #[test]
    fn test_pruning_result_empty() {
        let outcome = PruningResult::empty();

        assert_eq!(outcome.nodes_pruned, 0);
        assert!(outcome.pruned_cids.is_empty());
        assert!(outcome.snapshot_root.is_none());
        assert!(outcome.completed);
    }
}