mdcs_compaction/
stability.rs

1//! Stability monitoring for safe compaction.
2//!
3//! The stability monitor tracks which updates have been delivered to
4//! all known replicas, enabling safe pruning of the DAG history.
5
6use crate::version_vector::VersionVector;
7use mdcs_merkle::Hash;
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, HashSet};
10
11/// Update about a peer's frontier.
12#[derive(Clone, Debug, Serialize, Deserialize)]
13pub struct FrontierUpdate {
14    /// The peer that sent this update.
15    pub peer_id: String,
16
17    /// The peer's current version vector.
18    pub version_vector: VersionVector,
19
20    /// The peer's current DAG heads.
21    pub heads: Vec<Hash>,
22
23    /// Timestamp of the update.
24    pub timestamp: u64,
25}
26
/// Where a version vector stands with respect to delivery across the local
/// replica and every tracked peer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StabilityState {
    /// No tracked replica (the local one included) has received it yet.
    Pending,
    /// Some tracked replicas have received it and some have not.
    Partial {
        /// Replicas whose frontier already covers the version vector.
        delivered_to: HashSet<String>,
        /// Replicas whose frontier does not cover it yet.
        pending_for: HashSet<String>,
    },
    /// Every tracked replica has received it, so it is safe to compact.
    Stable,
    /// No peer information is available to decide.
    Unknown,
}
45
46/// Monitors stability across replicas for safe compaction decisions.
47///
48/// Stability is achieved when an update has been delivered to all
49/// tracked peers. Only stable updates can be safely compacted.
50pub struct StabilityMonitor {
51    /// Our replica ID.
52    replica_id: String,
53
54    /// Known peer frontiers (version vectors).
55    peer_frontiers: HashMap<String, VersionVector>,
56
57    /// Known peer heads (DAG heads).
58    peer_heads: HashMap<String, Vec<Hash>>,
59
60    /// Timestamp of last update from each peer.
61    last_update: HashMap<String, u64>,
62
63    /// Our current version vector.
64    local_frontier: VersionVector,
65
66    /// Our current DAG heads.
67    local_heads: Vec<Hash>,
68
69    /// The computed stable frontier (min of all known frontiers).
70    stable_frontier: VersionVector,
71
72    /// Configuration.
73    config: StabilityConfig,
74}
75
/// Configuration for [`StabilityMonitor`].
#[derive(Clone, Debug, PartialEq)]
pub struct StabilityConfig {
    /// Minimum number of replicas that must be known before the monitor
    /// reports a quorum.
    ///
    /// The count includes the local replica itself. A value of `1` is
    /// therefore satisfied by a monitor with no peers at all.
    pub min_peers_for_stability: usize,

    /// Maximum age of a peer's last frontier update before the peer is
    /// considered stale.
    ///
    /// Age is `current_time - timestamp`. The timestamps come from the peer's
    /// `FrontierUpdate`; `current_time` is supplied by the caller. The unit is
    /// whatever those callers use consistently.
    pub max_frontier_age: u64,

    /// Whether all tracked peers are required (`true`) or a quorum fraction
    /// is applied instead (`false`) when checking for a quorum.
    pub require_all_peers: bool,

    /// Fraction of replicas, expected in `0.0..=1.0`, used for the quorum
    /// check when `require_all_peers` is `false`.
    pub quorum_fraction: f64,
}
91
92impl Default for StabilityConfig {
93    fn default() -> Self {
94        StabilityConfig {
95            min_peers_for_stability: 1,
96            max_frontier_age: 10000,
97            require_all_peers: true,
98            quorum_fraction: 0.67,
99        }
100    }
101}
102
103impl StabilityMonitor {
104    /// Create a new stability monitor.
105    pub fn new(replica_id: impl Into<String>) -> Self {
106        StabilityMonitor {
107            replica_id: replica_id.into(),
108            peer_frontiers: HashMap::new(),
109            peer_heads: HashMap::new(),
110            last_update: HashMap::new(),
111            local_frontier: VersionVector::new(),
112            local_heads: Vec::new(),
113            stable_frontier: VersionVector::new(),
114            config: StabilityConfig::default(),
115        }
116    }
117
118    /// Create with custom configuration.
119    pub fn with_config(replica_id: impl Into<String>, config: StabilityConfig) -> Self {
120        StabilityMonitor {
121            replica_id: replica_id.into(),
122            peer_frontiers: HashMap::new(),
123            peer_heads: HashMap::new(),
124            last_update: HashMap::new(),
125            local_frontier: VersionVector::new(),
126            local_heads: Vec::new(),
127            stable_frontier: VersionVector::new(),
128            config,
129        }
130    }
131
132    /// Get our replica ID.
133    pub fn replica_id(&self) -> &str {
134        &self.replica_id
135    }
136
137    /// Update our local frontier.
138    pub fn update_local_frontier(&mut self, vv: VersionVector, heads: Vec<Hash>) {
139        self.local_frontier = vv;
140        self.local_heads = heads;
141        self.recompute_stable_frontier();
142    }
143
144    /// Update a peer's frontier.
145    pub fn update_peer_frontier(&mut self, update: FrontierUpdate) {
146        self.peer_frontiers
147            .insert(update.peer_id.clone(), update.version_vector);
148        self.peer_heads.insert(update.peer_id.clone(), update.heads);
149        self.last_update
150            .insert(update.peer_id.clone(), update.timestamp);
151        self.recompute_stable_frontier();
152    }
153
154    /// Remove a peer from tracking.
155    pub fn remove_peer(&mut self, peer_id: &str) {
156        self.peer_frontiers.remove(peer_id);
157        self.peer_heads.remove(peer_id);
158        self.last_update.remove(peer_id);
159        self.recompute_stable_frontier();
160    }
161
162    /// Get the list of tracked peers.
163    pub fn tracked_peers(&self) -> Vec<&String> {
164        self.peer_frontiers.keys().collect()
165    }
166
167    /// Get the number of tracked peers.
168    pub fn peer_count(&self) -> usize {
169        self.peer_frontiers.len()
170    }
171
172    /// Get a peer's frontier.
173    pub fn peer_frontier(&self, peer_id: &str) -> Option<&VersionVector> {
174        self.peer_frontiers.get(peer_id)
175    }
176
177    /// Get the stable frontier.
178    pub fn stable_frontier(&self) -> &VersionVector {
179        &self.stable_frontier
180    }
181
182    /// Get the local frontier.
183    pub fn local_frontier(&self) -> &VersionVector {
184        &self.local_frontier
185    }
186
187    /// Check if a specific operation is stable.
188    pub fn is_operation_stable(&self, replica_id: &str, sequence: u64) -> bool {
189        self.stable_frontier.contains(replica_id, sequence)
190    }
191
192    /// Check if a version vector is fully stable.
193    pub fn is_stable(&self, vv: &VersionVector) -> bool {
194        self.stable_frontier.dominates(vv)
195    }
196
197    /// Get the stability state for a version vector.
198    pub fn stability_state(&self, vv: &VersionVector) -> StabilityState {
199        if self.peer_frontiers.is_empty() {
200            return StabilityState::Unknown;
201        }
202
203        if self.stable_frontier.dominates(vv) {
204            return StabilityState::Stable;
205        }
206
207        let mut delivered_to = HashSet::new();
208        let mut pending_for = HashSet::new();
209
210        // Check local delivery
211        if self.local_frontier.dominates(vv) {
212            delivered_to.insert(self.replica_id.clone());
213        } else {
214            pending_for.insert(self.replica_id.clone());
215        }
216
217        // Check peer delivery
218        for (peer_id, frontier) in &self.peer_frontiers {
219            if frontier.dominates(vv) {
220                delivered_to.insert(peer_id.clone());
221            } else {
222                pending_for.insert(peer_id.clone());
223            }
224        }
225
226        if pending_for.is_empty() {
227            StabilityState::Stable
228        } else if delivered_to.is_empty() {
229            StabilityState::Pending
230        } else {
231            StabilityState::Partial {
232                delivered_to,
233                pending_for,
234            }
235        }
236    }
237
238    /// Check if we have enough peers for meaningful stability.
239    pub fn has_quorum(&self) -> bool {
240        let total_peers = self.peer_frontiers.len() + 1; // +1 for self
241
242        if total_peers < self.config.min_peers_for_stability {
243            return false;
244        }
245
246        if self.config.require_all_peers {
247            true // All peers are tracked
248        } else {
249            let required = (total_peers as f64 * self.config.quorum_fraction).ceil() as usize;
250            total_peers >= required
251        }
252    }
253
254    /// Get stale peers (those with old frontier updates).
255    pub fn stale_peers(&self, current_time: u64) -> Vec<String> {
256        self.last_update
257            .iter()
258            .filter(|(_, &update_time)| {
259                current_time.saturating_sub(update_time) > self.config.max_frontier_age
260            })
261            .map(|(peer_id, _)| peer_id.clone())
262            .collect()
263    }
264
265    /// Remove stale peers.
266    pub fn gc_stale_peers(&mut self, current_time: u64) {
267        let stale: Vec<_> = self.stale_peers(current_time);
268        for peer_id in stale {
269            self.remove_peer(&peer_id);
270        }
271    }
272
273    /// Recompute the stable frontier.
274    fn recompute_stable_frontier(&mut self) {
275        if self.peer_frontiers.is_empty() {
276            // No peers - stable frontier is local frontier
277            self.stable_frontier = self.local_frontier.clone();
278            return;
279        }
280
281        // Start with local frontier
282        let mut stable = self.local_frontier.clone();
283
284        // Compute minimum with all peer frontiers
285        for frontier in self.peer_frontiers.values() {
286            stable = stable.min_with(frontier);
287        }
288
289        self.stable_frontier = stable;
290    }
291
292    /// Get statistics about stability.
293    pub fn stats(&self) -> StabilityStats {
294        let unstable_ops = self
295            .local_frontier
296            .total_operations()
297            .saturating_sub(self.stable_frontier.total_operations());
298
299        StabilityStats {
300            peer_count: self.peer_frontiers.len(),
301            local_operations: self.local_frontier.total_operations(),
302            stable_operations: self.stable_frontier.total_operations(),
303            unstable_operations: unstable_ops,
304            has_quorum: self.has_quorum(),
305        }
306    }
307
308    /// Create a frontier update message for broadcasting.
309    pub fn create_frontier_update(&self, timestamp: u64) -> FrontierUpdate {
310        FrontierUpdate {
311            peer_id: self.replica_id.clone(),
312            version_vector: self.local_frontier.clone(),
313            heads: self.local_heads.clone(),
314            timestamp,
315        }
316    }
317}
318
/// Point-in-time statistics about stability, as produced by `StabilityMonitor::stats`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct StabilityStats {
    /// Number of tracked peers (the local replica is not counted).
    pub peer_count: usize,
    /// `total_operations()` of the local frontier.
    pub local_operations: u64,
    /// `total_operations()` of the stable frontier.
    pub stable_operations: u64,
    /// `local_operations - stable_operations`, saturating at zero.
    pub unstable_operations: u64,
    /// Whether the monitor reported a quorum when the snapshot was taken.
    pub has_quorum: bool,
}
328
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_stability_monitor_basic() {
        let mut monitor = StabilityMonitor::new("r1");

        let local_vv = VersionVector::from_entries([("r1".to_string(), 10), ("r2".to_string(), 5)]);
        monitor.update_local_frontier(local_vv.clone(), vec![]);

        // With no peers, local frontier is stable
        assert!(monitor.is_stable(&local_vv));
    }

    #[test]
    fn test_stability_with_peers() {
        let mut monitor = StabilityMonitor::new("r1");

        // Local frontier
        let local_vv = VersionVector::from_entries([("r1".to_string(), 10), ("r2".to_string(), 5)]);
        monitor.update_local_frontier(local_vv, vec![]);

        // Peer frontier (behind local)
        let peer_vv = VersionVector::from_entries([("r1".to_string(), 7), ("r2".to_string(), 5)]);
        monitor.update_peer_frontier(FrontierUpdate {
            peer_id: "r2".to_string(),
            version_vector: peer_vv,
            heads: vec![],
            timestamp: 100,
        });

        // Operations up to (r1:7, r2:5) should be stable
        let stable_vv = VersionVector::from_entries([("r1".to_string(), 7), ("r2".to_string(), 5)]);
        assert!(monitor.is_stable(&stable_vv));

        // Operations at (r1:10, r2:5) should NOT be stable
        let unstable_vv =
            VersionVector::from_entries([("r1".to_string(), 10), ("r2".to_string(), 5)]);
        assert!(!monitor.is_stable(&unstable_vv));
    }

    #[test]
    fn test_stability_state() {
        let mut monitor = StabilityMonitor::new("r1");

        let local_vv = VersionVector::from_entries([("r1".to_string(), 10)]);
        monitor.update_local_frontier(local_vv, vec![]);

        let peer_vv = VersionVector::from_entries([("r1".to_string(), 5)]);
        monitor.update_peer_frontier(FrontierUpdate {
            peer_id: "r2".to_string(),
            version_vector: peer_vv,
            heads: vec![],
            timestamp: 100,
        });

        // Check state for operation r1:3 (stable)
        let vv1 = VersionVector::from_entries([("r1".to_string(), 3)]);
        assert_eq!(monitor.stability_state(&vv1), StabilityState::Stable);

        // Check state for operation r1:7 (partial)
        let vv2 = VersionVector::from_entries([("r1".to_string(), 7)]);
        if let StabilityState::Partial {
            delivered_to,
            pending_for,
        } = monitor.stability_state(&vv2)
        {
            assert!(delivered_to.contains("r1"));
            assert!(pending_for.contains("r2"));
        } else {
            panic!("Expected Partial state");
        }
    }

    #[test]
    fn test_stability_state_unknown_without_peers() {
        let mut monitor = StabilityMonitor::new("r1");
        monitor.update_local_frontier(VersionVector::from_entries([("r1".to_string(), 10)]), vec![]);

        // With no tracked peer there is nothing to compare against.
        let vv = VersionVector::from_entries([("r1".to_string(), 3)]);
        assert_eq!(monitor.stability_state(&vv), StabilityState::Unknown);
    }

    #[test]
    fn test_stability_state_pending() {
        let mut monitor = StabilityMonitor::new("r1");
        monitor.update_local_frontier(VersionVector::from_entries([("r1".to_string(), 10)]), vec![]);
        monitor.update_peer_frontier(FrontierUpdate {
            peer_id: "r2".to_string(),
            version_vector: VersionVector::from_entries([("r1".to_string(), 5)]),
            heads: vec![],
            timestamp: 100,
        });

        // r1:12 is ahead of both the local replica and the peer.
        let vv = VersionVector::from_entries([("r1".to_string(), 12)]);
        assert_eq!(monitor.stability_state(&vv), StabilityState::Pending);
    }

    #[test]
    fn test_remove_peer_recomputes_stable_frontier() {
        let mut monitor = StabilityMonitor::new("r1");

        let local_vv = VersionVector::from_entries([("r1".to_string(), 10)]);
        monitor.update_local_frontier(local_vv.clone(), vec![]);
        monitor.update_peer_frontier(FrontierUpdate {
            peer_id: "r2".to_string(),
            version_vector: VersionVector::from_entries([("r1".to_string(), 5)]),
            heads: vec![],
            timestamp: 100,
        });
        assert!(!monitor.is_stable(&local_vv));

        // Once the lagging peer is gone, the local frontier is stable again.
        monitor.remove_peer("r2");
        assert_eq!(monitor.peer_count(), 0);
        assert_eq!(monitor.stable_frontier(), &local_vv);
        assert!(monitor.is_stable(&local_vv));
    }

    #[test]
    fn test_stale_peer_removal() {
        let mut monitor = StabilityMonitor::new("r1");

        monitor.update_peer_frontier(FrontierUpdate {
            peer_id: "r2".to_string(),
            version_vector: VersionVector::new(),
            heads: vec![],
            timestamp: 100,
        });

        // At time 200, peer is not stale (within max_frontier_age of 10000)
        assert!(monitor.stale_peers(200).is_empty());

        // At time 20000, peer is stale
        let stale = monitor.stale_peers(20000);
        assert_eq!(stale.len(), 1);
        assert_eq!(stale[0], "r2");

        // GC stale peers
        monitor.gc_stale_peers(20000);
        assert_eq!(monitor.peer_count(), 0);
    }

    #[test]
    fn test_gc_keeps_fresh_peers() {
        let mut monitor = StabilityMonitor::new("r1");

        monitor.update_peer_frontier(FrontierUpdate {
            peer_id: "r2".to_string(),
            version_vector: VersionVector::new(),
            heads: vec![],
            timestamp: 100,
        });
        monitor.update_peer_frontier(FrontierUpdate {
            peer_id: "r3".to_string(),
            version_vector: VersionVector::new(),
            heads: vec![],
            timestamp: 15000,
        });

        // At 20000, r2 is 19900 old (stale) and r3 is 5000 old (fresh).
        monitor.gc_stale_peers(20000);
        assert_eq!(monitor.peer_count(), 1);
        assert!(monitor.peer_frontier("r2").is_none());
        assert!(monitor.peer_frontier("r3").is_some());
    }

    #[test]
    fn test_quorum() {
        let config = StabilityConfig {
            min_peers_for_stability: 2,
            require_all_peers: false,
            quorum_fraction: 0.5,
            ..Default::default()
        };

        let mut monitor = StabilityMonitor::with_config("r1", config);

        // Only self: 1 replica < min_peers_for_stability of 2
        assert!(!monitor.has_quorum());

        // Self plus one peer: 2 replicas meet the minimum of 2
        monitor.update_peer_frontier(FrontierUpdate {
            peer_id: "r2".to_string(),
            version_vector: VersionVector::new(),
            heads: vec![],
            timestamp: 100,
        });
        assert!(monitor.has_quorum());
    }

    #[test]
    fn test_create_frontier_update() {
        let mut monitor = StabilityMonitor::new("r1");

        let vv = VersionVector::from_entries([("r1".to_string(), 10)]);
        let heads = vec![mdcs_merkle::Hasher::hash(b"head1")];
        monitor.update_local_frontier(vv.clone(), heads.clone());

        let update = monitor.create_frontier_update(100);
        assert_eq!(update.peer_id, "r1");
        assert_eq!(update.version_vector, vv);
        assert_eq!(update.heads, heads);
        assert_eq!(update.timestamp, 100);
    }
}