1use crate::version_vector::VersionVector;
7use mdcs_merkle::Hash;
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, HashSet};
10
11#[derive(Clone, Debug, Serialize, Deserialize)]
13pub struct FrontierUpdate {
14 pub peer_id: String,
16
17 pub version_vector: VersionVector,
19
20 pub heads: Vec<Hash>,
22
23 pub timestamp: u64,
25}
26
27#[derive(Clone, Debug, PartialEq, Eq)]
29pub enum StabilityState {
30 Pending,
32
33 Partial {
35 delivered_to: HashSet<String>,
36 pending_for: HashSet<String>,
37 },
38
39 Stable,
41
42 Unknown,
44}
45
46pub struct StabilityMonitor {
51 replica_id: String,
53
54 peer_frontiers: HashMap<String, VersionVector>,
56
57 peer_heads: HashMap<String, Vec<Hash>>,
59
60 last_update: HashMap<String, u64>,
62
63 local_frontier: VersionVector,
65
66 local_heads: Vec<Hash>,
68
69 stable_frontier: VersionVector,
71
72 config: StabilityConfig,
74}
75
76#[derive(Clone, Debug)]
78pub struct StabilityConfig {
79 pub min_peers_for_stability: usize,
81
82 pub max_frontier_age: u64,
84
85 pub require_all_peers: bool,
87
88 pub quorum_fraction: f64,
90}
91
92impl Default for StabilityConfig {
93 fn default() -> Self {
94 StabilityConfig {
95 min_peers_for_stability: 1,
96 max_frontier_age: 10000,
97 require_all_peers: true,
98 quorum_fraction: 0.67,
99 }
100 }
101}
102
103impl StabilityMonitor {
104 pub fn new(replica_id: impl Into<String>) -> Self {
106 StabilityMonitor {
107 replica_id: replica_id.into(),
108 peer_frontiers: HashMap::new(),
109 peer_heads: HashMap::new(),
110 last_update: HashMap::new(),
111 local_frontier: VersionVector::new(),
112 local_heads: Vec::new(),
113 stable_frontier: VersionVector::new(),
114 config: StabilityConfig::default(),
115 }
116 }
117
118 pub fn with_config(replica_id: impl Into<String>, config: StabilityConfig) -> Self {
120 StabilityMonitor {
121 replica_id: replica_id.into(),
122 peer_frontiers: HashMap::new(),
123 peer_heads: HashMap::new(),
124 last_update: HashMap::new(),
125 local_frontier: VersionVector::new(),
126 local_heads: Vec::new(),
127 stable_frontier: VersionVector::new(),
128 config,
129 }
130 }
131
132 pub fn replica_id(&self) -> &str {
134 &self.replica_id
135 }
136
137 pub fn update_local_frontier(&mut self, vv: VersionVector, heads: Vec<Hash>) {
139 self.local_frontier = vv;
140 self.local_heads = heads;
141 self.recompute_stable_frontier();
142 }
143
144 pub fn update_peer_frontier(&mut self, update: FrontierUpdate) {
146 self.peer_frontiers
147 .insert(update.peer_id.clone(), update.version_vector);
148 self.peer_heads.insert(update.peer_id.clone(), update.heads);
149 self.last_update
150 .insert(update.peer_id.clone(), update.timestamp);
151 self.recompute_stable_frontier();
152 }
153
154 pub fn remove_peer(&mut self, peer_id: &str) {
156 self.peer_frontiers.remove(peer_id);
157 self.peer_heads.remove(peer_id);
158 self.last_update.remove(peer_id);
159 self.recompute_stable_frontier();
160 }
161
162 pub fn tracked_peers(&self) -> Vec<&String> {
164 self.peer_frontiers.keys().collect()
165 }
166
167 pub fn peer_count(&self) -> usize {
169 self.peer_frontiers.len()
170 }
171
172 pub fn peer_frontier(&self, peer_id: &str) -> Option<&VersionVector> {
174 self.peer_frontiers.get(peer_id)
175 }
176
177 pub fn stable_frontier(&self) -> &VersionVector {
179 &self.stable_frontier
180 }
181
182 pub fn local_frontier(&self) -> &VersionVector {
184 &self.local_frontier
185 }
186
187 pub fn is_operation_stable(&self, replica_id: &str, sequence: u64) -> bool {
189 self.stable_frontier.contains(replica_id, sequence)
190 }
191
192 pub fn is_stable(&self, vv: &VersionVector) -> bool {
194 self.stable_frontier.dominates(vv)
195 }
196
197 pub fn stability_state(&self, vv: &VersionVector) -> StabilityState {
199 if self.peer_frontiers.is_empty() {
200 return StabilityState::Unknown;
201 }
202
203 if self.stable_frontier.dominates(vv) {
204 return StabilityState::Stable;
205 }
206
207 let mut delivered_to = HashSet::new();
208 let mut pending_for = HashSet::new();
209
210 if self.local_frontier.dominates(vv) {
212 delivered_to.insert(self.replica_id.clone());
213 } else {
214 pending_for.insert(self.replica_id.clone());
215 }
216
217 for (peer_id, frontier) in &self.peer_frontiers {
219 if frontier.dominates(vv) {
220 delivered_to.insert(peer_id.clone());
221 } else {
222 pending_for.insert(peer_id.clone());
223 }
224 }
225
226 if pending_for.is_empty() {
227 StabilityState::Stable
228 } else if delivered_to.is_empty() {
229 StabilityState::Pending
230 } else {
231 StabilityState::Partial {
232 delivered_to,
233 pending_for,
234 }
235 }
236 }
237
238 pub fn has_quorum(&self) -> bool {
240 let total_peers = self.peer_frontiers.len() + 1; if total_peers < self.config.min_peers_for_stability {
243 return false;
244 }
245
246 if self.config.require_all_peers {
247 true } else {
249 let required = (total_peers as f64 * self.config.quorum_fraction).ceil() as usize;
250 total_peers >= required
251 }
252 }
253
254 pub fn stale_peers(&self, current_time: u64) -> Vec<String> {
256 self.last_update
257 .iter()
258 .filter(|(_, &update_time)| {
259 current_time.saturating_sub(update_time) > self.config.max_frontier_age
260 })
261 .map(|(peer_id, _)| peer_id.clone())
262 .collect()
263 }
264
265 pub fn gc_stale_peers(&mut self, current_time: u64) {
267 let stale: Vec<_> = self.stale_peers(current_time);
268 for peer_id in stale {
269 self.remove_peer(&peer_id);
270 }
271 }
272
273 fn recompute_stable_frontier(&mut self) {
275 if self.peer_frontiers.is_empty() {
276 self.stable_frontier = self.local_frontier.clone();
278 return;
279 }
280
281 let mut stable = self.local_frontier.clone();
283
284 for frontier in self.peer_frontiers.values() {
286 stable = stable.min_with(frontier);
287 }
288
289 self.stable_frontier = stable;
290 }
291
292 pub fn stats(&self) -> StabilityStats {
294 let unstable_ops = self
295 .local_frontier
296 .total_operations()
297 .saturating_sub(self.stable_frontier.total_operations());
298
299 StabilityStats {
300 peer_count: self.peer_frontiers.len(),
301 local_operations: self.local_frontier.total_operations(),
302 stable_operations: self.stable_frontier.total_operations(),
303 unstable_operations: unstable_ops,
304 has_quorum: self.has_quorum(),
305 }
306 }
307
308 pub fn create_frontier_update(&self, timestamp: u64) -> FrontierUpdate {
310 FrontierUpdate {
311 peer_id: self.replica_id.clone(),
312 version_vector: self.local_frontier.clone(),
313 heads: self.local_heads.clone(),
314 timestamp,
315 }
316 }
317}
318
319#[derive(Clone, Debug)]
321pub struct StabilityStats {
322 pub peer_count: usize,
323 pub local_operations: u64,
324 pub stable_operations: u64,
325 pub unstable_operations: u64,
326 pub has_quorum: bool,
327}
328
329#[cfg(test)]
330mod tests {
331 use super::*;
332
333 #[test]
334 fn test_stability_monitor_basic() {
335 let mut monitor = StabilityMonitor::new("r1");
336
337 let local_vv = VersionVector::from_entries([("r1".to_string(), 10), ("r2".to_string(), 5)]);
338 monitor.update_local_frontier(local_vv.clone(), vec![]);
339
340 assert!(monitor.is_stable(&local_vv));
342 }
343
344 #[test]
345 fn test_stability_with_peers() {
346 let mut monitor = StabilityMonitor::new("r1");
347
348 let local_vv = VersionVector::from_entries([("r1".to_string(), 10), ("r2".to_string(), 5)]);
350 monitor.update_local_frontier(local_vv, vec![]);
351
352 let peer_vv = VersionVector::from_entries([("r1".to_string(), 7), ("r2".to_string(), 5)]);
354 monitor.update_peer_frontier(FrontierUpdate {
355 peer_id: "r2".to_string(),
356 version_vector: peer_vv,
357 heads: vec![],
358 timestamp: 100,
359 });
360
361 let stable_vv = VersionVector::from_entries([("r1".to_string(), 7), ("r2".to_string(), 5)]);
363 assert!(monitor.is_stable(&stable_vv));
364
365 let unstable_vv =
367 VersionVector::from_entries([("r1".to_string(), 10), ("r2".to_string(), 5)]);
368 assert!(!monitor.is_stable(&unstable_vv));
369 }
370
371 #[test]
372 fn test_stability_state() {
373 let mut monitor = StabilityMonitor::new("r1");
374
375 let local_vv = VersionVector::from_entries([("r1".to_string(), 10)]);
376 monitor.update_local_frontier(local_vv, vec![]);
377
378 let peer_vv = VersionVector::from_entries([("r1".to_string(), 5)]);
379 monitor.update_peer_frontier(FrontierUpdate {
380 peer_id: "r2".to_string(),
381 version_vector: peer_vv,
382 heads: vec![],
383 timestamp: 100,
384 });
385
386 let vv1 = VersionVector::from_entries([("r1".to_string(), 3)]);
388 assert_eq!(monitor.stability_state(&vv1), StabilityState::Stable);
389
390 let vv2 = VersionVector::from_entries([("r1".to_string(), 7)]);
392 if let StabilityState::Partial {
393 delivered_to,
394 pending_for,
395 } = monitor.stability_state(&vv2)
396 {
397 assert!(delivered_to.contains("r1"));
398 assert!(pending_for.contains("r2"));
399 } else {
400 panic!("Expected Partial state");
401 }
402 }
403
404 #[test]
405 fn test_stale_peer_removal() {
406 let mut monitor = StabilityMonitor::new("r1");
407
408 monitor.update_peer_frontier(FrontierUpdate {
409 peer_id: "r2".to_string(),
410 version_vector: VersionVector::new(),
411 heads: vec![],
412 timestamp: 100,
413 });
414
415 assert!(monitor.stale_peers(200).is_empty());
417
418 let stale = monitor.stale_peers(20000);
420 assert_eq!(stale.len(), 1);
421 assert_eq!(stale[0], "r2");
422
423 monitor.gc_stale_peers(20000);
425 assert_eq!(monitor.peer_count(), 0);
426 }
427
428 #[test]
429 fn test_quorum() {
430 let config = StabilityConfig {
431 min_peers_for_stability: 2,
432 require_all_peers: false,
433 quorum_fraction: 0.5,
434 ..Default::default()
435 };
436
437 let mut monitor = StabilityMonitor::with_config("r1", config);
438
439 assert!(!monitor.has_quorum());
441
442 monitor.update_peer_frontier(FrontierUpdate {
444 peer_id: "r2".to_string(),
445 version_vector: VersionVector::new(),
446 heads: vec![],
447 timestamp: 100,
448 });
449 assert!(monitor.has_quorum());
450 }
451
452 #[test]
453 fn test_create_frontier_update() {
454 let mut monitor = StabilityMonitor::new("r1");
455
456 let vv = VersionVector::from_entries([("r1".to_string(), 10)]);
457 let heads = vec![mdcs_merkle::Hasher::hash(b"head1")];
458 monitor.update_local_frontier(vv.clone(), heads.clone());
459
460 let update = monitor.create_frontier_update(100);
461 assert_eq!(update.peer_id, "r1");
462 assert_eq!(update.version_vector, vv);
463 assert_eq!(update.heads, heads);
464 assert_eq!(update.timestamp, 100);
465 }
466}