mdcs_delta/
anti_entropy.rs

1//! δ-CRDT Anti-Entropy Algorithm 1 (Convergence Mode)
2//!
3//! This module implements the anti-entropy algorithm from the δ-CRDT paper.
4//! It handles delta propagation, acknowledgments, and convergence testing.
5//!
6//! # Algorithm 1 Overview
7//!
8//! Each replica maintains:
9//! - X: the local CRDT state
10//! - D: delta buffer (sequence of deltas)
11//! - acked\[j\]: last sequence number acknowledged by peer j
12//!
13//! Protocol:
14//! 1. On local mutation m:
15//!    - d = mδ(X)     // compute delta
16//!    - X = X ⊔ d     // apply to local state
17//!    - D.push(d)     // buffer for sending
18//!
19//! 2. On send to peer j:
20//!    - send D\[acked\[j\]..\] to j
21//!
22//! 3. On receive delta d from peer i:
23//!    - X = X ⊔ d     // apply (idempotent!)
24//!    - send ack(seq) to i
25
26use crate::buffer::{DeltaReplica, ReplicaId, SeqNo};
27use mdcs_core::lattice::Lattice;
28use std::collections::VecDeque;
29
30/// Message types for the anti-entropy protocol
31#[derive(Debug, Clone)]
32pub enum AntiEntropyMessage<D> {
33    /// Delta message: contains delta, source, destination and sequence number
34    Delta {
35        from: ReplicaId,
36        to: ReplicaId,
37        delta: D,
38        seq: SeqNo,
39    },
40    /// Acknowledgment message: from -> to acknowledges seq
41    Ack {
42        from: ReplicaId,
43        to: ReplicaId,
44        seq: SeqNo,
45    },
46}
47
48/// A network simulator for testing anti-entropy under various conditions
49#[derive(Debug)]
50pub struct NetworkSimulator<D> {
51    /// Messages in flight
52    in_flight: VecDeque<AntiEntropyMessage<D>>,
53    /// Messages that were "lost"
54    lost: Vec<AntiEntropyMessage<D>>,
55    /// Configuration
56    config: NetworkConfig,
57    /// Random seed for deterministic testing
58    rng_state: u64,
59}
60
/// Fault-injection settings for the simulated network.
///
/// Each rate is a probability, nominally in `0.0..=1.0`, that is compared
/// against a pseudo-random draw in `[0, 1)` whenever a message is sent. The
/// default value (every rate `0.0`) describes a reliable, in-order network.
///
/// Rates are not validated: a rate `>= 1.0` makes the corresponding check
/// succeed on every send, while a rate `<= 0.0` (or NaN) means it never does.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct NetworkConfig {
    /// Probability of message loss (0.0 - 1.0)
    pub loss_rate: f64,
    /// Probability of message duplication (0.0 - 1.0)
    pub dup_rate: f64,
    /// Probability of message reordering (0.0 - 1.0)
    pub reorder_rate: f64,
}

impl NetworkConfig {
    /// Creates a configuration that only drops messages, with probability
    /// `loss_rate`; duplication and reordering stay disabled.
    pub fn lossy(loss_rate: f64) -> Self {
        Self {
            loss_rate,
            ..Self::default()
        }
    }

    /// Creates a configuration that only duplicates messages, with
    /// probability `dup_rate`; loss and reordering stay disabled.
    pub fn with_dups(dup_rate: f64) -> Self {
        Self {
            dup_rate,
            ..Self::default()
        }
    }

    /// Creates a configuration with every fault enabled at once:
    /// 10% loss, 20% duplication and 30% reordering.
    pub fn chaotic() -> Self {
        Self {
            loss_rate: 0.1,
            dup_rate: 0.2,
            reorder_rate: 0.3,
        }
    }
}
108
109impl<D: Clone> NetworkSimulator<D> {
110    pub fn new(config: NetworkConfig) -> Self {
111        Self {
112            in_flight: VecDeque::new(),
113            lost: Vec::new(),
114            config,
115            rng_state: 12345,
116        }
117    }
118
119    /// Simple LCG random number generator
120    fn next_random(&mut self) -> f64 {
121        self.rng_state = self.rng_state.wrapping_mul(1103515245).wrapping_add(12345);
122        ((self.rng_state >> 16) & 0x7fff) as f64 / 32768.0
123    }
124
125    /// Send a message through the network
126    pub fn send(&mut self, msg: AntiEntropyMessage<D>) {
127        // Check for loss
128        if self.next_random() < self.config.loss_rate {
129            self.lost.push(msg);
130            return;
131        }
132
133        // Check for duplication
134        if self.next_random() < self.config.dup_rate {
135            self.in_flight.push_back(msg.clone());
136        }
137
138        // Check for reordering
139        if self.next_random() < self.config.reorder_rate && !self.in_flight.is_empty() {
140            // Insert at random position
141            let pos = (self.next_random() * self.in_flight.len() as f64) as usize;
142            let pos = pos.min(self.in_flight.len());
143            // VecDeque doesn't have insert, so we'll just push and let it reorder naturally
144            self.in_flight.push_back(msg);
145            if pos < self.in_flight.len() - 1 {
146                // Swap with a random earlier position to simulate reordering
147                self.in_flight.swap(pos, self.in_flight.len() - 1);
148            }
149        } else {
150            self.in_flight.push_back(msg);
151        }
152    }
153
154    /// Receive the next message (if any)
155    pub fn receive(&mut self) -> Option<AntiEntropyMessage<D>> {
156        self.in_flight.pop_front()
157    }
158
159    /// Re-send lost messages (simulates retransmission)
160    pub fn retransmit_lost(&mut self) {
161        for msg in self.lost.drain(..) {
162            self.in_flight.push_back(msg);
163        }
164    }
165
166    /// Check if network is empty
167    pub fn is_empty(&self) -> bool {
168        self.in_flight.is_empty()
169    }
170
171    /// Number of messages in flight
172    pub fn in_flight_count(&self) -> usize {
173        self.in_flight.len()
174    }
175
176    /// Number of lost messages
177    pub fn lost_count(&self) -> usize {
178        self.lost.len()
179    }
180}
181
182/// Anti-entropy coordinator for a cluster of replicas
183#[derive(Debug)]
184pub struct AntiEntropyCluster<S: Lattice + Clone> {
185    /// All replicas in the cluster
186    replicas: Vec<DeltaReplica<S, S>>,
187    /// Network simulator
188    network: NetworkSimulator<S>,
189}
190
191impl<S: Lattice + Clone> AntiEntropyCluster<S> {
192    /// Create a new cluster with n replicas
193    pub fn new(n: usize, config: NetworkConfig) -> Self {
194        let mut replicas = Vec::with_capacity(n);
195
196        // Create replicas
197        for i in 0..n {
198            let mut replica = DeltaReplica::new(format!("replica_{}", i));
199            // Register all other peers
200            for j in 0..n {
201                if i != j {
202                    replica.register_peer(format!("replica_{}", j));
203                }
204            }
205            replicas.push(replica);
206        }
207
208        Self {
209            replicas,
210            network: NetworkSimulator::new(config),
211        }
212    }
213
214    /// Get replica by index
215    pub fn replica(&self, idx: usize) -> &DeltaReplica<S, S> {
216        &self.replicas[idx]
217    }
218
219    /// Get mutable replica by index
220    pub fn replica_mut(&mut self, idx: usize) -> &mut DeltaReplica<S, S> {
221        &mut self.replicas[idx]
222    }
223
224    /// Perform a mutation on a specific replica
225    pub fn mutate<F>(&mut self, replica_idx: usize, mutator: F) -> S
226    where
227        F: FnOnce(&S) -> S,
228    {
229        self.replicas[replica_idx].mutate(mutator)
230    }
231
232    /// Initiate sync from one replica to another
233    pub fn initiate_sync(&mut self, from_idx: usize, to_idx: usize) {
234        let to_id = self.replicas[to_idx].id.clone();
235        if let Some((delta, seq)) = self.replicas[from_idx].prepare_sync(&to_id) {
236            let msg = AntiEntropyMessage::Delta {
237                from: self.replicas[from_idx].id.clone(),
238                to: to_id.clone(),
239                delta,
240                seq,
241            };
242            self.network.send(msg);
243        }
244    }
245
246    /// Process one network message
247    pub fn process_one(&mut self) -> bool {
248        if let Some(msg) = self.network.receive() {
249            match msg {
250                AntiEntropyMessage::Delta {
251                    from,
252                    to,
253                    delta,
254                    seq,
255                } => {
256                    // Deliver delta to the intended recipient only
257                    for replica in &mut self.replicas {
258                        if replica.id == to {
259                            replica.receive_delta(&delta);
260                            // Send ack back to the original sender
261                            let ack = AntiEntropyMessage::Ack {
262                                from: replica.id.clone(),
263                                to: from.clone(),
264                                seq,
265                            };
266                            self.network.send(ack);
267                            break;
268                        }
269                    }
270                }
271                AntiEntropyMessage::Ack { from, to, seq } => {
272                    // Deliver ack to the intended recipient only
273                    for replica in &mut self.replicas {
274                        if replica.id == to {
275                            replica.process_ack(&from, seq);
276                            break;
277                        }
278                    }
279                }
280            }
281            true
282        } else {
283            false
284        }
285    }
286
287    /// Run until network is empty
288    pub fn drain_network(&mut self) {
289        while self.process_one() {}
290    }
291
292    /// Broadcast delta from one replica to all others
293    pub fn broadcast(&mut self, from_idx: usize) {
294        let n = self.replicas.len();
295        for to_idx in 0..n {
296            if from_idx != to_idx {
297                self.initiate_sync(from_idx, to_idx);
298            }
299        }
300    }
301
302    /// Full sync: every replica syncs with every other replica
303    pub fn full_sync_round(&mut self) {
304        let n = self.replicas.len();
305        for from_idx in 0..n {
306            for to_idx in 0..n {
307                if from_idx != to_idx {
308                    self.initiate_sync(from_idx, to_idx);
309                }
310            }
311        }
312        self.drain_network();
313    }
314
315    /// Check if all replicas have converged
316    pub fn is_converged(&self) -> bool {
317        if self.replicas.len() < 2 {
318            return true;
319        }
320
321        let first = self.replicas[0].state();
322        self.replicas.iter().skip(1).all(|r| r.state() == first)
323    }
324
325    /// Retransmit lost messages and process
326    pub fn retransmit_and_process(&mut self) {
327        self.network.retransmit_lost();
328        self.drain_network();
329    }
330
331    /// Get number of replicas
332    pub fn len(&self) -> usize {
333        self.replicas.len()
334    }
335
336    /// Check if cluster is empty
337    pub fn is_empty(&self) -> bool {
338        self.replicas.is_empty()
339    }
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use mdcs_core::gset::GSet;

    /// Cluster type exercised by the convergence tests.
    type GSetCluster = AntiEntropyCluster<GSet<i32>>;

    /// Builds the single-element delta `{value}`.
    fn singleton(value: i32) -> GSet<i32> {
        let mut delta = GSet::new();
        delta.insert(value);
        delta
    }

    /// Mutates replica `idx` with a delta that inserts `value`.
    fn insert_at(cluster: &mut GSetCluster, idx: usize, value: i32) {
        cluster.mutate(idx, move |_| singleton(value));
    }

    /// A message sent over a fault-free network comes back out unchanged.
    #[test]
    fn test_network_simulator_basic() {
        let mut net: NetworkSimulator<i32> = NetworkSimulator::new(NetworkConfig::default());

        let outgoing = AntiEntropyMessage::Delta {
            from: "r1".to_string(),
            to: "".to_string(),
            delta: 42,
            seq: 1,
        };
        net.send(outgoing);

        assert_eq!(net.in_flight_count(), 1);

        if let AntiEntropyMessage::Delta { delta, .. } = net.receive().unwrap() {
            assert_eq!(delta, 42);
        } else {
            panic!("Expected delta message");
        }
    }

    /// Two replicas mutate independently; one full round converges all three.
    #[test]
    fn test_cluster_basic_convergence() {
        let mut cluster = GSetCluster::new(3, NetworkConfig::default());

        insert_at(&mut cluster, 0, 1);
        insert_at(&mut cluster, 1, 2);

        // Replicas have diverged until they exchange deltas.
        assert!(!cluster.is_converged());

        cluster.full_sync_round();

        assert!(cluster.is_converged());

        // Every replica must now hold both elements.
        for i in 0..3 {
            let replica = cluster.replica(i);
            assert!(replica.state().contains(&1));
            assert!(replica.state().contains(&2));
        }
    }

    /// Half of all messages are dropped; repeated rounds plus retransmission
    /// still converge.
    #[test]
    fn test_convergence_under_loss() {
        let mut cluster = GSetCluster::new(3, NetworkConfig::lossy(0.5));

        // Replica i inserts the value i + 1.
        for (idx, value) in (1..=3).enumerate() {
            insert_at(&mut cluster, idx, value);
        }

        for _ in 0..10 {
            cluster.full_sync_round();
            cluster.retransmit_and_process();
        }

        assert!(cluster.is_converged());

        for i in 0..3 {
            for val in 1..=3 {
                assert!(cluster.replica(i).state().contains(&val));
            }
        }
    }

    /// Duplicated deliveries must be harmless because joins are idempotent.
    #[test]
    fn test_convergence_with_duplicates() {
        let mut cluster = GSetCluster::new(2, NetworkConfig::with_dups(0.5));

        insert_at(&mut cluster, 0, 1);
        insert_at(&mut cluster, 1, 2);

        for _ in 0..5 {
            cluster.full_sync_round();
        }

        assert!(cluster.is_converged());

        let first = cluster.replica(0);
        assert!(first.state().contains(&1));
        assert!(first.state().contains(&2));
    }

    /// Loss, duplication and reordering all at once.
    #[test]
    fn test_convergence_chaotic_network() {
        let mut cluster = GSetCluster::new(4, NetworkConfig::chaotic());

        // Replica i inserts the five values 10*i .. 10*i + 4.
        for replica_idx in 0..4usize {
            for offset in 0..5usize {
                let value = (replica_idx * 10 + offset) as i32;
                insert_at(&mut cluster, replica_idx, value);
            }
        }

        for _ in 0..20 {
            cluster.full_sync_round();
            cluster.retransmit_and_process();
        }

        assert!(cluster.is_converged());

        // All 20 values must have reached all 4 replicas.
        for i in 0..4 {
            for j in 0..4 {
                for k in 0..5 {
                    let val = j * 10 + k;
                    assert!(
                        cluster.replica(i).state().contains(&val),
                        "Replica {} missing value {}",
                        i,
                        val
                    );
                }
            }
        }
    }

    /// Syncing again after the first successful sync must not change state.
    #[test]
    fn test_idempotence_repeated_resends() {
        let mut cluster = GSetCluster::new(2, NetworkConfig::default());

        insert_at(&mut cluster, 0, 42);

        let initial_state = cluster.replica(1).state().clone();

        cluster.full_sync_round();
        let after_one = cluster.replica(1).state().clone();

        // Ten more rounds stand in for repeated re-sends.
        for _ in 0..10 {
            cluster.full_sync_round();
        }
        let after_many = cluster.replica(1).state().clone();

        // One sync and many syncs must leave the same state...
        assert_eq!(after_one, after_many);

        // ...which differs from the state before any sync.
        assert_ne!(initial_state, after_one);
    }
}