1use crate::pruning::{PrunableStore, Pruner, PruningPolicy, PruningResult};
7use crate::snapshot::{Snapshot, SnapshotConfig, SnapshotManager};
8use crate::stability::{FrontierUpdate, StabilityConfig, StabilityMonitor};
9use crate::version_vector::VersionVector;
10use mdcs_merkle::{DAGStore, Hash};
11use serde::{Deserialize, Serialize};
12use thiserror::Error;
13
14#[derive(Error, Debug)]
16pub enum CompactionError {
17 #[error("No stable snapshot available for compaction")]
18 NoStableSnapshot,
19
20 #[error("Stability requirements not met: {0}")]
21 StabilityNotMet(String),
22
23 #[error("Pruning failed: {0}")]
24 PruningFailed(String),
25
26 #[error("Snapshot creation failed: {0}")]
27 SnapshotFailed(String),
28
29 #[error("State serialization failed: {0}")]
30 SerializationFailed(String),
31
32 #[error("Verification failed: {0}")]
33 VerificationFailed(String),
34}
35
36#[derive(Clone, Debug, Serialize, Deserialize)]
38pub struct CompactionConfig {
39 #[serde(default)]
41 pub snapshot: SnapshotConfigSerializable,
42
43 #[serde(default)]
45 pub pruning: PruningPolicy,
46
47 #[serde(default)]
49 pub stability: StabilityConfigSerializable,
50
51 pub auto_compact: bool,
53
54 pub min_ops_for_compaction: u64,
56
57 pub verify_after_compaction: bool,
59}
60
61#[derive(Clone, Debug, Serialize, Deserialize)]
63pub struct SnapshotConfigSerializable {
64 pub min_operations_between: u64,
65 pub max_time_between: u64,
66 pub max_snapshots: usize,
67 pub auto_snapshot: bool,
68}
69
70impl Default for SnapshotConfigSerializable {
71 fn default() -> Self {
72 SnapshotConfigSerializable {
73 min_operations_between: 1000,
74 max_time_between: 10000,
75 max_snapshots: 10,
76 auto_snapshot: true,
77 }
78 }
79}
80
81impl From<SnapshotConfigSerializable> for SnapshotConfig {
82 fn from(s: SnapshotConfigSerializable) -> Self {
83 SnapshotConfig {
84 min_operations_between: s.min_operations_between,
85 max_time_between: s.max_time_between,
86 max_snapshots: s.max_snapshots,
87 auto_snapshot: s.auto_snapshot,
88 }
89 }
90}
91
92#[derive(Clone, Debug, Serialize, Deserialize)]
94pub struct StabilityConfigSerializable {
95 pub min_peers_for_stability: usize,
96 pub max_frontier_age: u64,
97 pub require_all_peers: bool,
98 pub quorum_fraction: f64,
99}
100
101impl Default for StabilityConfigSerializable {
102 fn default() -> Self {
103 StabilityConfigSerializable {
104 min_peers_for_stability: 1,
105 max_frontier_age: 10000,
106 require_all_peers: true,
107 quorum_fraction: 0.67,
108 }
109 }
110}
111
112impl From<StabilityConfigSerializable> for StabilityConfig {
113 fn from(s: StabilityConfigSerializable) -> Self {
114 StabilityConfig {
115 min_peers_for_stability: s.min_peers_for_stability,
116 max_frontier_age: s.max_frontier_age,
117 require_all_peers: s.require_all_peers,
118 quorum_fraction: s.quorum_fraction,
119 }
120 }
121}
122
123impl Default for CompactionConfig {
124 fn default() -> Self {
125 CompactionConfig {
126 snapshot: SnapshotConfigSerializable::default(),
127 pruning: PruningPolicy::default(),
128 stability: StabilityConfigSerializable::default(),
129 auto_compact: true,
130 min_ops_for_compaction: 500,
131 verify_after_compaction: true,
132 }
133 }
134}
135
/// Running counters describing what a `Compactor` has done so far.
///
/// All counters start at zero and `last_compaction` starts as `None`.
#[derive(Debug, Clone, Default)]
pub struct CompactionStats {
    /// Number of snapshots created through `Compactor::create_snapshot`.
    pub snapshots_created: u64,

    /// Total number of nodes pruned, accumulated across `Compactor::compact`
    /// calls.
    pub nodes_pruned: u64,

    /// Compactor time recorded when `Compactor::compact` last completed, or
    /// `None` if it has not completed yet.
    pub last_compaction: Option<u64>,

    /// Operations since the last compaction. NOTE(review): nothing in the
    /// visible part of this file writes this field — confirm who maintains it.
    pub ops_since_compaction: u64,

    /// Store length (`store.len()`) as recorded by `Compactor::compact`.
    pub current_dag_size: usize,

    /// Snapshot count reported by the snapshot manager, refreshed whenever
    /// `Compactor::create_snapshot` stores a snapshot.
    pub snapshot_count: usize,
}
157
158pub struct Compactor {
160 replica_id: String,
162
163 config: CompactionConfig,
165
166 snapshots: SnapshotManager,
168
169 stability: StabilityMonitor,
171
172 pruner: Pruner,
174
175 stats: CompactionStats,
177
178 current_time: u64,
180}
181
182impl Compactor {
183 pub fn new(replica_id: impl Into<String>) -> Self {
185 let replica_id = replica_id.into();
186
187 Compactor {
188 snapshots: SnapshotManager::new(),
189 stability: StabilityMonitor::new(&replica_id),
190 pruner: Pruner::new(),
191 config: CompactionConfig::default(),
192 stats: CompactionStats::default(),
193 current_time: 0,
194 replica_id,
195 }
196 }
197
198 pub fn with_config(replica_id: impl Into<String>, config: CompactionConfig) -> Self {
200 let replica_id = replica_id.into();
201
202 let snapshot_config: SnapshotConfig = config.snapshot.clone().into();
203 let stability_config: StabilityConfig = config.stability.clone().into();
204
205 Compactor {
206 snapshots: SnapshotManager::with_config(snapshot_config),
207 stability: StabilityMonitor::with_config(&replica_id, stability_config),
208 pruner: Pruner::with_policy(config.pruning.clone()),
209 config,
210 stats: CompactionStats::default(),
211 current_time: 0,
212 replica_id,
213 }
214 }
215
216 pub fn replica_id(&self) -> &str {
218 &self.replica_id
219 }
220
221 pub fn config(&self) -> &CompactionConfig {
223 &self.config
224 }
225
226 pub fn snapshots(&self) -> &SnapshotManager {
228 &self.snapshots
229 }
230
231 pub fn snapshots_mut(&mut self) -> &mut SnapshotManager {
233 &mut self.snapshots
234 }
235
236 pub fn stability(&self) -> &StabilityMonitor {
238 &self.stability
239 }
240
241 pub fn stability_mut(&mut self) -> &mut StabilityMonitor {
243 &mut self.stability
244 }
245
246 pub fn pruner(&self) -> &Pruner {
248 &self.pruner
249 }
250
251 pub fn pruner_mut(&mut self) -> &mut Pruner {
253 &mut self.pruner
254 }
255
256 pub fn stats(&self) -> &CompactionStats {
258 &self.stats
259 }
260
261 pub fn set_time(&mut self, time: u64) {
263 self.current_time = time;
264 }
265
266 pub fn update_local_frontier(&mut self, vv: VersionVector, heads: Vec<Hash>) {
268 self.stability.update_local_frontier(vv, heads);
269 }
270
271 pub fn process_peer_update(&mut self, update: FrontierUpdate) {
273 self.stability.update_peer_frontier(update);
274 }
275
276 pub fn create_frontier_update(&self) -> FrontierUpdate {
278 self.stability.create_frontier_update(self.current_time)
279 }
280
281 pub fn should_snapshot(&self) -> bool {
283 self.snapshots
284 .should_snapshot(self.stability.local_frontier(), self.current_time)
285 }
286
287 pub fn create_snapshot<F>(
291 &mut self,
292 superseded_roots: Vec<Hash>,
293 state_serializer: F,
294 ) -> Result<Hash, CompactionError>
295 where
296 F: FnOnce() -> Result<Vec<u8>, String>,
297 {
298 let state_data = state_serializer().map_err(CompactionError::SerializationFailed)?;
299
300 let snapshot = Snapshot::new(
301 self.stability.local_frontier().clone(),
302 superseded_roots,
303 state_data,
304 &self.replica_id,
305 self.current_time,
306 );
307
308 let id = self.snapshots.store(snapshot);
309 self.stats.snapshots_created += 1;
310 self.stats.snapshot_count = self.snapshots.stats().count;
311
312 Ok(id)
313 }
314
315 pub fn should_compact<S: DAGStore>(&self, _store: &S) -> bool {
317 if !self.config.auto_compact {
318 return false;
319 }
320
321 if self.stability.local_frontier().total_operations() < self.config.min_ops_for_compaction {
323 return false;
324 }
325
326 if self.snapshots.stats().count < self.config.pruning.min_snapshots_before_prune {
328 return false;
329 }
330
331 if let Some(snapshot) = self.snapshots.latest() {
333 self.stability.is_stable(&snapshot.version_vector)
334 } else {
335 false
336 }
337 }
338
339 pub fn compact<S, F>(
341 &mut self,
342 store: &mut S,
343 state_serializer: F,
344 ) -> Result<CompactionResult, CompactionError>
345 where
346 S: DAGStore + PrunableStore,
347 F: FnOnce() -> Result<Vec<u8>, String>,
348 {
349 let mut result = CompactionResult::default();
350
351 if self.should_snapshot() {
353 let superseded = store.heads();
354 let snapshot_id = self.create_snapshot(superseded, state_serializer)?;
355 result.snapshot_created = Some(snapshot_id);
356 }
357
358 if let Some(snapshot) = self.snapshots.latest() {
360 if self.stability.is_stable(&snapshot.version_vector) {
361 let prune_result = self
362 .pruner
363 .execute_prune(store, snapshot, self.current_time);
364 result.nodes_pruned = prune_result.nodes_pruned;
365 result.pruning_result = Some(prune_result);
366 self.stats.nodes_pruned += result.nodes_pruned as u64;
367 }
368 }
369
370 if self.config.verify_after_compaction {
372 crate::pruning::PruningVerifier::verify_connectivity(store)
373 .map_err(CompactionError::VerificationFailed)?;
374 }
375
376 self.stats.last_compaction = Some(self.current_time);
377 self.stats.current_dag_size = store.len();
378
379 Ok(result)
380 }
381
382 pub fn tick<S, F>(
384 &mut self,
385 store: &mut S,
386 state_serializer: F,
387 time: u64,
388 ) -> Result<Option<CompactionResult>, CompactionError>
389 where
390 S: DAGStore + PrunableStore,
391 F: FnOnce() -> Result<Vec<u8>, String>,
392 {
393 self.current_time = time;
394
395 self.stability.gc_stale_peers(time);
397
398 if self.should_compact(store) {
400 let result = self.compact(store, state_serializer)?;
401 Ok(Some(result))
402 } else {
403 Ok(None)
404 }
405 }
406
407 pub fn bootstrap_from_snapshot(
411 &mut self,
412 snapshot: Snapshot,
413 ) -> Result<(Vec<u8>, VersionVector), CompactionError> {
414 let state_data = snapshot.state_data.clone();
415 let vv = snapshot.version_vector.clone();
416
417 self.snapshots.store(snapshot);
419
420 Ok((state_data, vv))
421 }
422
423 pub fn get_bootstrap_snapshot(&self) -> Option<&Snapshot> {
425 self.snapshots.latest()
426 }
427}
428
429#[derive(Clone, Debug, Default)]
431pub struct CompactionResult {
432 pub snapshot_created: Option<Hash>,
434
435 pub nodes_pruned: usize,
437
438 pub pruning_result: Option<PruningResult>,
440}
441
#[cfg(test)]
mod tests {
    use super::*;
    use mdcs_merkle::MemoryDAGStore;

    /// A freshly built compactor reports its replica id and zeroed stats.
    #[test]
    fn test_compactor_creation() {
        let fresh = Compactor::new("test_replica");

        assert_eq!(fresh.replica_id(), "test_replica");
        assert_eq!(fresh.stats().snapshots_created, 0);
    }

    /// Values passed through `with_config` are visible via `config()`.
    #[test]
    fn test_compactor_with_config() {
        let custom = CompactionConfig {
            auto_compact: false,
            min_ops_for_compaction: 1000,
            ..Default::default()
        };

        let configured = Compactor::with_config("test", custom);
        let seen = configured.config();

        assert!(!seen.auto_compact);
        assert_eq!(seen.min_ops_for_compaction, 1000);
    }

    /// The local frontier handed in is the one the stability monitor reports.
    #[test]
    fn test_update_local_frontier() {
        let mut compactor = Compactor::new("test");

        let frontier = VersionVector::from_entries([("test".to_string(), 10)]);
        let head = mdcs_merkle::Hasher::hash(b"head");

        compactor.update_local_frontier(frontier.clone(), vec![head]);

        assert_eq!(compactor.stability().local_frontier(), &frontier);
    }

    /// Creating a snapshot succeeds and bumps the creation counter.
    #[test]
    fn test_create_snapshot() {
        let mut compactor = Compactor::new("test");

        let frontier = VersionVector::from_entries([("test".to_string(), 10)]);
        compactor.update_local_frontier(frontier, Vec::new());

        let outcome = compactor.create_snapshot(Vec::new(), || Ok(b"test state".to_vec()));

        assert!(outcome.is_ok());
        assert_eq!(compactor.stats().snapshots_created, 1);
    }

    /// A frontier update produced by one compactor registers the sender as a
    /// peer on the receiving compactor.
    #[test]
    fn test_frontier_update_roundtrip() {
        let mut sender = Compactor::new("r1");
        let mut receiver = Compactor::new("r2");

        let frontier = VersionVector::from_entries([("r1".to_string(), 10)]);
        sender.update_local_frontier(frontier, Vec::new());
        sender.set_time(100);

        let update = sender.create_frontier_update();
        receiver.process_peer_update(update);

        assert!(receiver.stability().peer_frontier("r1").is_some());
    }

    /// Compaction is refused both below the operation threshold and, once the
    /// threshold is met, while no snapshot exists.
    #[test]
    fn test_should_compact() {
        let config = CompactionConfig {
            auto_compact: true,
            min_ops_for_compaction: 5,
            ..Default::default()
        };
        let mut compactor = Compactor::with_config("test", config);

        let (store, _) = MemoryDAGStore::with_genesis("test");

        // Below the threshold of 5 operations.
        let below_threshold = VersionVector::from_entries([("test".to_string(), 3)]);
        compactor.update_local_frontier(below_threshold, Vec::new());
        assert!(!compactor.should_compact(&store));

        // Above the threshold, but there is still no snapshot to prune against.
        let above_threshold = VersionVector::from_entries([("test".to_string(), 10)]);
        compactor.update_local_frontier(above_threshold, Vec::new());

        assert!(!compactor.should_compact(&store));
    }

    /// Bootstrapping hands back the snapshot's payload and version vector and
    /// stores the snapshot.
    #[test]
    fn test_bootstrap_from_snapshot() {
        let mut newcomer = Compactor::new("new_replica");

        let origin_vv = VersionVector::from_entries([("origin".to_string(), 100)]);
        let incoming = Snapshot::new(
            origin_vv.clone(),
            Vec::new(),
            b"state data".to_vec(),
            "origin",
            1000,
        );

        let (state_data, recovered_vv) = newcomer.bootstrap_from_snapshot(incoming).unwrap();

        assert_eq!(state_data, b"state data");
        assert_eq!(recovered_vv, origin_vv);
        assert_eq!(newcomer.snapshots().stats().count, 1);
    }

    /// Two snapshots taken at different frontiers are both counted.
    #[test]
    fn test_compaction_stats() {
        let mut compactor = Compactor::new("test");

        let first_frontier = VersionVector::from_entries([("test".to_string(), 10)]);
        compactor.update_local_frontier(first_frontier, Vec::new());

        compactor
            .create_snapshot(Vec::new(), || Ok(b"state1".to_vec()))
            .unwrap();

        compactor.set_time(100);

        let second_frontier = VersionVector::from_entries([("test".to_string(), 20)]);
        compactor.update_local_frontier(second_frontier, Vec::new());

        compactor
            .create_snapshot(Vec::new(), || Ok(b"state2".to_vec()))
            .unwrap();

        let totals = compactor.stats();
        assert_eq!(totals.snapshots_created, 2);
        assert_eq!(totals.snapshot_count, 2);
    }
}