// mdcs_delta/causal.rs

1//! Causal Consistency Mode for δ-CRDTs (Algorithm 2)
2//!
3//! This module implements the delta-interval anti-entropy algorithm that provides
4//! **causal consistency** guarantees, extending Algorithm 1's convergence mode.
5//!
6//! # Algorithm 2: δ-CRDT Anti-Entropy with Causal Delivery
7//!
8//! Unlike Algorithm 1 which only guarantees eventual convergence, Algorithm 2
9//! ensures that deltas are applied in causal order. This prevents seeing effects
10//! before their causes.
11//!
12//! ## State Components
13//!
14//! Each replica i maintains:
15//! - **Durable state** `(Xᵢ, cᵢ)`:
16//!   - `Xᵢ`: The current CRDT state
17//!   - `cᵢ`: A durable counter (sequence number) that survives crashes
18//!
19//! - **Volatile state** `(Dᵢ, Aᵢ)`:
20//!   - `Dᵢ[j]`: Delta-interval buffer for each peer j (deltas to send)
21//!   - `Aᵢ[j]`: Acknowledgment map - last seq acked by peer j
22//!
23//! ## Protocol
24//!
25//! 1. **On local mutation m**:
26//!    ```text
27//!    cᵢ := cᵢ + 1
28//!    d := mδ(Xᵢ)
29//!    Xᵢ := Xᵢ ⊔ d
30//!    ∀j: Dᵢ[j] := Dᵢ[j] ⊔ d   // add delta to all peer buffers
31//!    ```
32//!
33//! 2. **On send to peer j** (periodic or on-demand):
34//!    ```text
35//!    if Dᵢ[j] ≠ ⊥ then
36//!        send ⟨Dᵢ[j], Aᵢ[j]+1, cᵢ⟩ to j   // delta-interval with seq range
37//!    ```
38//!
39//! 3. **On receive `⟨d, n, m⟩` from peer j**:
40//!    ```text
41//!    if n = Aᵢ[j] + 1 then        // causally ready
42//!        Xᵢ := Xᵢ ⊔ d
43//!        Aᵢ[j] := m
44//!        send ack(m) to j
45//!    else
46//!        discard (or buffer for later)
47//!    ```
48//!
49//! 4. **On receive ack(m) from peer j**:
50//!    ```text
51//!    Dᵢ[j] := ⊥                   // clear delta buffer for j
52//!    ```
53//!
54//! ## Garbage Collection
55//!
56//! Deltas can be safely garbage collected when ALL tracked peers have acknowledged them.
57//! This ensures no peer will ever need those deltas again.
58//!
59//! ## Crash Recovery
60//!
61//! On restart:
62//! - `Xᵢ` and `cᵢ` are restored from durable storage
63//! - `Dᵢ` and `Aᵢ` start fresh (volatile state lost)
64//! - Peers will detect the gap and request retransmission
65
66use crate::buffer::{ReplicaId, SeqNo};
67use mdcs_core::lattice::Lattice;
68use serde::{Deserialize, Serialize};
69use std::collections::{HashMap, VecDeque};
70
/// A delta-interval message for causal delivery
///
/// Contains: `⟨delta, from_seq, to_seq⟩`
/// - `delta`: The joined delta-group to apply
/// - `from_seq`: Starting sequence number (exclusive)
/// - `to_seq`: Ending sequence number (inclusive)
///
/// The receiver should only accept if `from_seq == last_acked_from_this_sender`,
/// i.e. the interval extends exactly what it has already seen; otherwise it
/// must buffer (or discard) the interval to preserve causal order.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct DeltaInterval<D> {
    /// The source replica that generated this interval
    pub from: ReplicaId,
    /// The destination replica
    pub to: ReplicaId,
    /// The joined delta-group: join of every delta in the `(from_seq, to_seq]` range
    pub delta: D,
    /// Sequence number just before this interval (exclusive lower bound)
    pub from_seq: SeqNo,
    /// Sequence number at the end of this interval (inclusive upper bound)
    pub to_seq: SeqNo,
}
92
/// Acknowledgment for a delta-interval
///
/// Sent by the receiver after applying an interval; the sender uses it to
/// garbage-collect its delta buffer for that peer. Acks are cumulative:
/// acknowledging sequence `n` covers every sequence up to and including `n`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct IntervalAck {
    /// Replica sending the ack (the receiver of the interval)
    pub from: ReplicaId,
    /// Replica the ack is addressed to (the interval's original sender)
    pub to: ReplicaId,
    /// The sequence number being acknowledged
    pub acked_seq: SeqNo,
}
101
/// Messages for the causal anti-entropy protocol
///
/// Only `Debug`/`Clone` are derived; this enum is exchanged in-process by
/// the network simulator. The individual payloads (`DeltaInterval`,
/// `IntervalAck`) derive serde traits for use over real transports.
#[derive(Debug, Clone)]
pub enum CausalMessage<D> {
    /// Delta-interval with causal ordering information
    DeltaInterval(DeltaInterval<D>),
    /// Acknowledgment of received interval
    Ack(IntervalAck),
    /// Request for state snapshot (for bootstrapping new replicas)
    SnapshotRequest { from: ReplicaId, to: ReplicaId },
    /// Full state snapshot response
    Snapshot {
        /// Replica supplying the snapshot
        from: ReplicaId,
        /// Replica that requested it
        to: ReplicaId,
        /// The full CRDT state
        state: D,
        /// The supplier's durable counter at snapshot time
        seq: SeqNo,
    },
}
119
/// Durable state that survives crashes
///
/// This must be persisted to stable storage before acknowledging
/// any mutation or received delta.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DurableState<S> {
    /// The replica's unique identifier
    pub replica_id: ReplicaId,
    /// The current CRDT state (Xᵢ in Algorithm 2)
    pub state: S,
    /// The durable counter (cᵢ): sequence number of the last local mutation
    pub counter: SeqNo,
}
133
134impl<S: Lattice> DurableState<S> {
135    pub fn new(replica_id: impl Into<ReplicaId>) -> Self {
136        Self {
137            replica_id: replica_id.into(),
138            state: S::bottom(),
139            counter: 0,
140        }
141    }
142}
143
/// Per-peer delta buffer for causal mode
///
/// Stores deltas that need to be sent to a specific peer,
/// along with the sequence range they cover. The range is a window
/// `(from_seq, to_seq]`: `from_seq` is exclusive, `to_seq` inclusive.
#[derive(Debug, Clone)]
pub struct PeerDeltaBuffer<D: Lattice> {
    /// The accumulated (joined) delta to send; `None` when the buffer is empty
    delta: Option<D>,
    /// Sequence number before the first delta in buffer (exclusive bound)
    from_seq: SeqNo,
    /// Sequence number of the last delta in buffer (inclusive bound)
    to_seq: SeqNo,
}
157
158impl<D: Lattice> PeerDeltaBuffer<D> {
159    pub fn new() -> Self {
160        Self {
161            delta: None,
162            from_seq: 0,
163            to_seq: 0,
164        }
165    }
166
167    /// Start tracking from a specific sequence number
168    pub fn start_from(seq: SeqNo) -> Self {
169        Self {
170            delta: None,
171            from_seq: seq,
172            to_seq: seq,
173        }
174    }
175
176    /// Add a delta to this buffer
177    pub fn push(&mut self, delta: D, seq: SeqNo) {
178        match &mut self.delta {
179            Some(existing) => {
180                existing.join_assign(&delta);
181            }
182            None => {
183                self.delta = Some(delta);
184            }
185        }
186        self.to_seq = seq;
187    }
188
189    /// Check if buffer has pending deltas
190    pub fn has_pending(&self) -> bool {
191        self.delta.is_some()
192    }
193
194    /// Take the delta, clearing the buffer
195    pub fn take(&mut self) -> Option<(D, SeqNo, SeqNo)> {
196        self.delta.take().map(|d| {
197            let from = self.from_seq;
198            let to = self.to_seq;
199            self.from_seq = to;
200            (d, from, to)
201        })
202    }
203
204    /// Clear the buffer (on successful ack)
205    pub fn clear(&mut self) {
206        self.delta = None;
207        self.from_seq = self.to_seq;
208    }
209
210    /// Reset the buffer from a new sequence (after peer reconnect)
211    pub fn reset_from(&mut self, seq: SeqNo) {
212        self.delta = None;
213        self.from_seq = seq;
214        self.to_seq = seq;
215    }
216}
217
impl<D: Lattice> Default for PeerDeltaBuffer<D> {
    // Equivalent to an empty buffer starting at sequence 0.
    fn default() -> Self {
        Self::new()
    }
}
223
/// Volatile state for causal anti-entropy (lost on crash)
///
/// Rebuilt empty on restart; peers detect the resulting sequence gap and
/// retransmit (see module docs, "Crash Recovery").
#[derive(Debug, Clone)]
pub struct VolatileState<D: Lattice> {
    /// Per-peer delta buffers: Dᵢ\[j\] — deltas awaiting transmission to peer j
    pub delta_buffers: HashMap<ReplicaId, PeerDeltaBuffer<D>>,
    /// Per-peer acknowledgment tracking: Aᵢ\[j\]
    /// Stores the last sequence number we've received from each peer
    pub peer_acks: HashMap<ReplicaId, SeqNo>,
}
233
234impl<D: Lattice> VolatileState<D> {
235    pub fn new() -> Self {
236        Self {
237            delta_buffers: HashMap::new(),
238            peer_acks: HashMap::new(),
239        }
240    }
241
242    /// Register a peer
243    pub fn register_peer(&mut self, peer_id: ReplicaId) {
244        self.delta_buffers.entry(peer_id.clone()).or_default();
245        self.peer_acks.entry(peer_id).or_insert(0);
246    }
247
248    /// Get last acked sequence from a peer
249    pub fn get_peer_ack(&self, peer_id: &str) -> SeqNo {
250        self.peer_acks.get(peer_id).copied().unwrap_or(0)
251    }
252
253    /// Update the ack for a peer
254    pub fn update_peer_ack(&mut self, peer_id: &str, seq: SeqNo) {
255        if let Some(ack) = self.peer_acks.get_mut(peer_id) {
256            *ack = (*ack).max(seq);
257        }
258    }
259}
260
impl<D: Lattice> Default for VolatileState<D> {
    // Equivalent to `new()`: empty buffers, no recorded acks.
    fn default() -> Self {
        Self::new()
    }
}
266
/// A causal δ-CRDT replica implementing Algorithm 2
///
/// Provides causal consistency guarantees by:
/// 1. Tracking per-peer delta intervals
/// 2. Only accepting deltas in causal order
/// 3. Supporting crash recovery via durable state
#[derive(Debug, Clone)]
pub struct CausalReplica<S: Lattice + Clone> {
    /// Durable state (survives crashes): (Xᵢ, cᵢ)
    durable: DurableState<S>,
    /// Volatile state (lost on crash): (Dᵢ, Aᵢ)
    volatile: VolatileState<S>,
    /// Out-of-order intervals buffered per peer until their causal
    /// predecessors arrive (kept sorted by `from_seq`)
    pending: HashMap<ReplicaId, VecDeque<DeltaInterval<S>>>,
}
282
283impl<S: Lattice + Clone> CausalReplica<S> {
284    /// Create a new causal replica
285    pub fn new(id: impl Into<ReplicaId>) -> Self {
286        Self {
287            durable: DurableState::new(id),
288            volatile: VolatileState::new(),
289            pending: HashMap::new(),
290        }
291    }
292
293    /// Restore from durable state (after crash)
294    pub fn restore(durable: DurableState<S>) -> Self {
295        Self {
296            durable,
297            volatile: VolatileState::new(),
298            pending: HashMap::new(),
299        }
300    }
301
302    /// Get the replica ID
303    pub fn id(&self) -> &ReplicaId {
304        &self.durable.replica_id
305    }
306
307    /// Get current state (read-only)
308    pub fn state(&self) -> &S {
309        &self.durable.state
310    }
311
312    /// Get the durable counter (sequence number)
313    pub fn counter(&self) -> SeqNo {
314        self.durable.counter
315    }
316
317    /// Get durable state for persistence
318    pub fn durable_state(&self) -> &DurableState<S> {
319        &self.durable
320    }
321
322    /// Register a peer for causal anti-entropy
323    pub fn register_peer(&mut self, peer_id: ReplicaId) {
324        self.volatile.register_peer(peer_id.clone());
325        self.pending.entry(peer_id).or_default();
326    }
327
328    /// Apply a local mutation
329    ///
330    /// Algorithm 2, step 1:
331    /// ```text
332    /// cᵢ := cᵢ + 1
333    /// d := mδ(Xᵢ)
334    /// Xᵢ := Xᵢ ⊔ d
335    /// ∀j: Dᵢ[j] := Dᵢ[j] ⊔ d
336    /// ```
337    ///
338    /// Returns the computed delta
339    pub fn mutate<F>(&mut self, mutator: F) -> S
340    where
341        F: FnOnce(&S) -> S,
342    {
343        // Increment durable counter
344        self.durable.counter += 1;
345        let seq = self.durable.counter;
346
347        // Compute delta: d = mδ(X)
348        let delta = mutator(&self.durable.state);
349
350        // Apply to state: X = X ⊔ d
351        self.durable.state.join_assign(&delta);
352
353        // Add to all peer buffers: ∀j: Dᵢ[j] := Dᵢ[j] ⊔ d
354        for buffer in self.volatile.delta_buffers.values_mut() {
355            buffer.push(delta.clone(), seq);
356        }
357
358        delta
359    }
360
361    /// Prepare a delta-interval to send to a peer
362    ///
363    /// Returns `Some(DeltaInterval)` if there are pending deltas for this peer,
364    /// or `None` if the buffer is empty.
365    pub fn prepare_interval(&mut self, peer_id: &str) -> Option<DeltaInterval<S>> {
366        let buffer = self.volatile.delta_buffers.get_mut(peer_id)?;
367
368        buffer
369            .take()
370            .map(|(delta, from_seq, to_seq)| DeltaInterval {
371                from: self.durable.replica_id.clone(),
372                to: peer_id.to_string(),
373                delta,
374                from_seq,
375                to_seq,
376            })
377    }
378
379    /// Check if a delta-interval is causally ready
380    ///
381    /// A delta-interval is ready if its from_seq matches our last acked seq from that peer
382    fn is_causally_ready(&self, interval: &DeltaInterval<S>) -> bool {
383        let last_acked = self.volatile.get_peer_ack(&interval.from);
384        interval.from_seq == last_acked
385    }
386
387    /// Receive a delta-interval from a peer
388    ///
389    /// Algorithm 2, step 3:
390    /// ```text
391    /// if n = Aᵢ[j] + 1 then        // causally ready
392    ///     Xᵢ := Xᵢ ⊔ d
393    ///     Aᵢ[j] := m
394    ///     send ack(m) to j
395    /// else
396    ///     buffer for later
397    /// ```
398    ///
399    /// Returns `Some(IntervalAck)` if the interval was applied (causally ready),
400    /// or `None` if it was buffered for later.
401    pub fn receive_interval(&mut self, interval: DeltaInterval<S>) -> Option<IntervalAck> {
402        // Register the peer if not known
403        if !self.volatile.peer_acks.contains_key(&interval.from) {
404            self.register_peer(interval.from.clone());
405        }
406
407        if self.is_causally_ready(&interval) {
408            // Apply the delta
409            self.durable.state.join_assign(&interval.delta);
410
411            // Update our ack for this peer
412            self.volatile
413                .update_peer_ack(&interval.from, interval.to_seq);
414
415            let ack = IntervalAck {
416                from: self.durable.replica_id.clone(),
417                to: interval.from.clone(),
418                acked_seq: interval.to_seq,
419            };
420
421            // Try to apply any pending intervals that are now ready
422            self.try_apply_pending(&interval.from);
423
424            Some(ack)
425        } else {
426            // Buffer for later
427            let pending = self.pending.entry(interval.from.clone()).or_default();
428
429            // Insert in sorted order by from_seq
430            let pos = pending.iter().position(|p| p.from_seq > interval.from_seq);
431            match pos {
432                Some(i) => pending.insert(i, interval),
433                None => pending.push_back(interval),
434            }
435
436            None
437        }
438    }
439
440    /// Try to apply pending intervals that are now causally ready
441    fn try_apply_pending(&mut self, peer_id: &str) -> Vec<IntervalAck> {
442        let mut acks = Vec::new();
443
444        if let Some(pending) = self.pending.get_mut(peer_id) {
445            while let Some(interval) = pending.front() {
446                let last_acked = self.volatile.get_peer_ack(peer_id);
447                if interval.from_seq == last_acked {
448                    let interval = pending.pop_front().unwrap();
449
450                    // Apply the delta
451                    self.durable.state.join_assign(&interval.delta);
452
453                    // Update our ack
454                    self.volatile.update_peer_ack(peer_id, interval.to_seq);
455
456                    acks.push(IntervalAck {
457                        from: self.durable.replica_id.clone(),
458                        to: interval.from.clone(),
459                        acked_seq: interval.to_seq,
460                    });
461                } else {
462                    break;
463                }
464            }
465        }
466
467        acks
468    }
469
470    /// Process an acknowledgment from a peer
471    ///
472    /// Algorithm 2, step 4:
473    /// ```text
474    /// Dᵢ[j] := ⊥   // clear delta buffer for j
475    /// ```
476    pub fn receive_ack(&mut self, ack: &IntervalAck) {
477        if let Some(buffer) = self.volatile.delta_buffers.get_mut(&ack.from) {
478            buffer.clear();
479        }
480    }
481
482    /// Get a full state snapshot for bootstrapping
483    pub fn snapshot(&self) -> (S, SeqNo) {
484        (self.durable.state.clone(), self.durable.counter)
485    }
486
487    /// Apply a snapshot from another replica (for bootstrapping)
488    pub fn apply_snapshot(&mut self, state: S, seq: SeqNo, from: &str) {
489        self.durable.state.join_assign(&state);
490        self.volatile.update_peer_ack(from, seq);
491    }
492
493    /// Get all registered peer IDs
494    pub fn peers(&self) -> impl Iterator<Item = &ReplicaId> {
495        self.volatile.peer_acks.keys()
496    }
497
498    /// Check if we have pending deltas for any peer
499    pub fn has_pending_deltas(&self) -> bool {
500        self.volatile
501            .delta_buffers
502            .values()
503            .any(|b| b.has_pending())
504    }
505
506    /// Count of pending out-of-order intervals
507    pub fn pending_count(&self) -> usize {
508        self.pending.values().map(|v| v.len()).sum()
509    }
510}
511
/// Trait for durable storage backends
///
/// Implement this trait to persist `DurableState` across crashes.
/// `persist` followed by `sync` is expected to make the state
/// crash-durable before the replica acknowledges anything that
/// depends on it (see module docs, "Crash Recovery").
pub trait DurableStorage<S: Lattice> {
    /// Persist the durable state (may be buffered until `sync`)
    fn persist(&mut self, state: &DurableState<S>) -> Result<(), StorageError>;

    /// Load the durable state; `Ok(None)` if nothing is stored for this replica
    fn load(&self, replica_id: &str) -> Result<Option<DurableState<S>>, StorageError>;

    /// Force sync to stable storage
    fn sync(&mut self) -> Result<(), StorageError>;
}
525
/// Errors produced by durable storage backends
#[derive(Debug, Clone)]
pub enum StorageError {
    IoError(String),
    SerializationError(String),
    NotFound,
}

impl std::fmt::Display for StorageError {
    /// Human-readable rendering of the storage failure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::IoError(msg) => write!(f, "IO error: {}", msg),
            Self::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
            Self::NotFound => f.write_str("State not found"),
        }
    }
}

// No wrapped source error, so the default trait methods suffice.
impl std::error::Error for StorageError {}
545
/// In-memory storage for testing (simulates durable storage)
///
/// "Durability" lasts only for the process lifetime; useful for exercising
/// the crash-recovery code paths without touching disk.
#[derive(Debug, Default)]
pub struct MemoryStorage<S> {
    /// Latest persisted state per replica id
    states: HashMap<ReplicaId, DurableState<S>>,
}
551
552impl<S: Clone> MemoryStorage<S> {
553    pub fn new() -> Self {
554        Self {
555            states: HashMap::new(),
556        }
557    }
558}
559
560impl<S: Lattice + Clone + Serialize + for<'de> Deserialize<'de>> DurableStorage<S>
561    for MemoryStorage<S>
562{
563    fn persist(&mut self, state: &DurableState<S>) -> Result<(), StorageError> {
564        self.states.insert(state.replica_id.clone(), state.clone());
565        Ok(())
566    }
567
568    fn load(&self, replica_id: &str) -> Result<Option<DurableState<S>>, StorageError> {
569        Ok(self.states.get(replica_id).cloned())
570    }
571
572    fn sync(&mut self) -> Result<(), StorageError> {
573        Ok(())
574    }
575}
576
/// Network simulator for causal anti-entropy
///
/// A single FIFO queue shared by all replicas; messages may be "lost"
/// according to `loss_rate` but are retained for explicit retransmission.
#[derive(Debug)]
pub struct CausalNetworkSimulator<D> {
    /// Messages in flight (delivered FIFO)
    in_flight: VecDeque<CausalMessage<D>>,
    /// Messages that were "lost" (held until `retransmit_lost`)
    lost: Vec<CausalMessage<D>>,
    /// Loss rate (0.0 - 1.0)
    loss_rate: f64,
    /// Random state (deterministic LCG, fixed seed for reproducible runs)
    rng_state: u64,
}
589
590impl<D: Clone> CausalNetworkSimulator<D> {
591    pub fn new(loss_rate: f64) -> Self {
592        Self {
593            in_flight: VecDeque::new(),
594            lost: Vec::new(),
595            loss_rate,
596            rng_state: 42,
597        }
598    }
599
600    /// Simple random number generator
601    fn next_random(&mut self) -> f64 {
602        self.rng_state = self.rng_state.wrapping_mul(1103515245).wrapping_add(12345);
603        ((self.rng_state >> 16) & 0x7fff) as f64 / 32768.0
604    }
605
606    /// Send a message
607    pub fn send(&mut self, msg: CausalMessage<D>) {
608        if self.next_random() < self.loss_rate {
609            self.lost.push(msg);
610        } else {
611            self.in_flight.push_back(msg);
612        }
613    }
614
615    /// Receive the next message
616    pub fn receive(&mut self) -> Option<CausalMessage<D>> {
617        self.in_flight.pop_front()
618    }
619
620    /// Retransmit lost messages
621    pub fn retransmit_lost(&mut self) {
622        for msg in self.lost.drain(..) {
623            self.in_flight.push_back(msg);
624        }
625    }
626
627    /// Check if empty
628    pub fn is_empty(&self) -> bool {
629        self.in_flight.is_empty()
630    }
631
632    /// Messages in flight
633    pub fn in_flight_count(&self) -> usize {
634        self.in_flight.len()
635    }
636
637    /// Lost messages
638    pub fn lost_count(&self) -> usize {
639        self.lost.len()
640    }
641}
642
/// Cluster coordinator for causal anti-entropy
///
/// Owns a set of replicas plus the simulated network connecting them;
/// drives mutation, interval exchange, and crash/recovery scenarios.
#[derive(Debug)]
pub struct CausalCluster<S: Lattice + Clone> {
    /// All replicas (index `i` carries id `"causal_i"`, see `new`)
    replicas: Vec<CausalReplica<S>>,
    /// Network simulator shared by all replicas
    network: CausalNetworkSimulator<S>,
}
651
652impl<S: Lattice + Clone> CausalCluster<S> {
653    /// Create a new cluster with n replicas
654    pub fn new(n: usize, loss_rate: f64) -> Self {
655        let mut replicas = Vec::with_capacity(n);
656
657        // Create replicas
658        for i in 0..n {
659            let mut replica = CausalReplica::new(format!("causal_{}", i));
660            // Register all other peers
661            for j in 0..n {
662                if i != j {
663                    replica.register_peer(format!("causal_{}", j));
664                }
665            }
666            replicas.push(replica);
667        }
668
669        Self {
670            replicas,
671            network: CausalNetworkSimulator::new(loss_rate),
672        }
673    }
674
675    /// Get replica by index
676    pub fn replica(&self, idx: usize) -> &CausalReplica<S> {
677        &self.replicas[idx]
678    }
679
680    /// Get mutable replica
681    pub fn replica_mut(&mut self, idx: usize) -> &mut CausalReplica<S> {
682        &mut self.replicas[idx]
683    }
684
685    /// Perform a mutation
686    pub fn mutate<F>(&mut self, replica_idx: usize, mutator: F) -> S
687    where
688        F: FnOnce(&S) -> S,
689    {
690        self.replicas[replica_idx].mutate(mutator)
691    }
692
693    /// Initiate sync from one replica to all its peers
694    pub fn broadcast_intervals(&mut self, from_idx: usize) {
695        let replica = &mut self.replicas[from_idx];
696        let peer_ids: Vec<_> = replica.peers().cloned().collect();
697
698        for peer_id in peer_ids {
699            if let Some(interval) = replica.prepare_interval(&peer_id) {
700                self.network.send(CausalMessage::DeltaInterval(interval));
701            }
702        }
703    }
704
705    /// Process one network message
706    pub fn process_one(&mut self) -> bool {
707        if let Some(msg) = self.network.receive() {
708            match msg {
709                CausalMessage::DeltaInterval(interval) => {
710                    // Find recipient
711                    for replica in &mut self.replicas {
712                        if replica.id() == &interval.to {
713                            if let Some(ack) = replica.receive_interval(interval.clone()) {
714                                self.network.send(CausalMessage::Ack(ack));
715                            }
716                            break;
717                        }
718                    }
719                }
720                CausalMessage::Ack(ack) => {
721                    // Find recipient
722                    for replica in &mut self.replicas {
723                        if replica.id() == &ack.to {
724                            replica.receive_ack(&ack);
725                            break;
726                        }
727                    }
728                }
729                CausalMessage::SnapshotRequest { from, to } => {
730                    // Find source and send snapshot
731                    for replica in &self.replicas {
732                        if replica.id() == &to {
733                            let (state, seq) = replica.snapshot();
734                            self.network.send(CausalMessage::Snapshot {
735                                from: to,
736                                to: from,
737                                state,
738                                seq,
739                            });
740                            break;
741                        }
742                    }
743                }
744                CausalMessage::Snapshot {
745                    from,
746                    to,
747                    state,
748                    seq,
749                } => {
750                    // Find recipient and apply
751                    for replica in &mut self.replicas {
752                        if replica.id() == &to {
753                            replica.apply_snapshot(state, seq, &from);
754                            break;
755                        }
756                    }
757                }
758            }
759            true
760        } else {
761            false
762        }
763    }
764
765    /// Drain all messages
766    pub fn drain_network(&mut self) {
767        while self.process_one() {}
768    }
769
770    /// Full sync round
771    pub fn full_sync_round(&mut self) {
772        let n = self.replicas.len();
773        for i in 0..n {
774            self.broadcast_intervals(i);
775        }
776        self.drain_network();
777    }
778
779    /// Check if converged
780    pub fn is_converged(&self) -> bool {
781        if self.replicas.len() < 2 {
782            return true;
783        }
784
785        let first = self.replicas[0].state();
786        self.replicas.iter().skip(1).all(|r| r.state() == first)
787    }
788
789    /// Retransmit and process
790    pub fn retransmit_and_process(&mut self) {
791        self.network.retransmit_lost();
792        self.drain_network();
793    }
794
795    /// Number of replicas
796    pub fn len(&self) -> usize {
797        self.replicas.len()
798    }
799
800    /// Check if empty
801    pub fn is_empty(&self) -> bool {
802        self.replicas.is_empty()
803    }
804
805    /// Simulate a crash and recovery for a replica
806    pub fn crash_and_recover(&mut self, idx: usize) {
807        let durable = self.replicas[idx].durable_state().clone();
808
809        // Restore from durable state (volatile state is lost)
810        let mut recovered = CausalReplica::restore(durable);
811
812        // Re-register peers
813        let n = self.replicas.len();
814        for j in 0..n {
815            if idx != j {
816                recovered.register_peer(format!("causal_{}", j));
817            }
818        }
819
820        self.replicas[idx] = recovered;
821    }
822
823    /// Get total pending out-of-order intervals across all replicas
824    pub fn total_pending(&self) -> usize {
825        self.replicas.iter().map(|r| r.pending_count()).sum()
826    }
827}
828
829#[cfg(test)]
830mod tests {
831    use super::*;
832    use mdcs_core::gset::GSet;
833    use mdcs_core::pncounter::PNCounter;
834
835    #[test]
836    fn test_causal_replica_basic() {
837        let mut replica: CausalReplica<GSet<i32>> = CausalReplica::new("test1");
838
839        replica.mutate(|_| {
840            let mut d = GSet::new();
841            d.insert(42);
842            d
843        });
844
845        assert!(replica.state().contains(&42));
846        assert_eq!(replica.counter(), 1);
847    }
848
849    #[test]
850    fn test_causal_interval_generation() {
851        let mut replica: CausalReplica<GSet<i32>> = CausalReplica::new("test1");
852        replica.register_peer("peer1".to_string());
853
854        replica.mutate(|_| {
855            let mut d = GSet::new();
856            d.insert(1);
857            d
858        });
859
860        replica.mutate(|_| {
861            let mut d = GSet::new();
862            d.insert(2);
863            d
864        });
865
866        let interval = replica.prepare_interval("peer1").unwrap();
867        assert_eq!(interval.from_seq, 0);
868        assert_eq!(interval.to_seq, 2);
869        assert!(interval.delta.contains(&1));
870        assert!(interval.delta.contains(&2));
871    }
872
873    #[test]
874    fn test_causal_delivery() {
875        let mut r1: CausalReplica<GSet<i32>> = CausalReplica::new("r1");
876        let mut r2: CausalReplica<GSet<i32>> = CausalReplica::new("r2");
877
878        r1.register_peer("r2".to_string());
879        r2.register_peer("r1".to_string());
880
881        // r1 creates two mutations
882        r1.mutate(|_| {
883            let mut d = GSet::new();
884            d.insert(1);
885            d
886        });
887        r1.mutate(|_| {
888            let mut d = GSet::new();
889            d.insert(2);
890            d
891        });
892
893        // Get interval
894        let interval = r1.prepare_interval("r2").unwrap();
895        assert_eq!(interval.from_seq, 0);
896        assert_eq!(interval.to_seq, 2);
897
898        // r2 receives it
899        let ack = r2.receive_interval(interval).unwrap();
900        assert_eq!(ack.acked_seq, 2);
901
902        // r2 now has both elements
903        assert!(r2.state().contains(&1));
904        assert!(r2.state().contains(&2));
905    }
906
907    #[test]
908    fn test_out_of_order_buffering() {
909        let mut replica: CausalReplica<GSet<i32>> = CausalReplica::new("r1");
910        replica.register_peer("peer".to_string());
911
912        // Create an interval that's NOT causally ready (from_seq = 5, but we've acked 0)
913        let out_of_order = DeltaInterval {
914            from: "peer".to_string(),
915            to: "r1".to_string(),
916            delta: {
917                let mut d = GSet::new();
918                d.insert(999);
919                d
920            },
921            from_seq: 5, // Not ready - we haven't seen 1-5
922            to_seq: 6,
923        };
924
925        // Should be buffered, not applied
926        let result = replica.receive_interval(out_of_order);
927        assert!(result.is_none());
928        assert_eq!(replica.pending_count(), 1);
929        assert!(!replica.state().contains(&999));
930    }
931
932    #[test]
933    fn test_cluster_convergence() {
934        let mut cluster: CausalCluster<GSet<i32>> = CausalCluster::new(3, 0.0);
935
936        // Each replica adds different element
937        for i in 0..3 {
938            let val = (i + 1) as i32;
939            cluster.mutate(i, move |_| {
940                let mut d = GSet::new();
941                d.insert(val);
942                d
943            });
944        }
945
946        // Not converged yet
947        assert!(!cluster.is_converged());
948
949        // Sync
950        cluster.full_sync_round();
951
952        // Should converge
953        assert!(cluster.is_converged());
954
955        // All replicas should have all elements
956        for i in 0..3 {
957            for val in 1..=3 {
958                assert!(cluster.replica(i).state().contains(&val));
959            }
960        }
961    }
962
963    #[test]
964    fn test_cluster_with_loss() {
965        let mut cluster: CausalCluster<GSet<i32>> = CausalCluster::new(3, 0.3);
966
967        for i in 0..3 {
968            let val = (i + 1) as i32;
969            cluster.mutate(i, move |_| {
970                let mut d = GSet::new();
971                d.insert(val);
972                d
973            });
974        }
975
976        // Multiple rounds with retransmission
977        for _ in 0..10 {
978            cluster.full_sync_round();
979            cluster.retransmit_and_process();
980        }
981
982        // Should eventually converge
983        assert!(cluster.is_converged());
984    }
985
986    #[test]
987    fn test_crash_recovery() {
988        let mut cluster: CausalCluster<GSet<i32>> = CausalCluster::new(2, 0.0);
989
990        // r0 adds element
991        cluster.mutate(0, |_| {
992            let mut d = GSet::new();
993            d.insert(1);
994            d
995        });
996
997        // Sync
998        cluster.full_sync_round();
999        assert!(cluster.is_converged());
1000
1001        // r0 adds another element
1002        cluster.mutate(0, |_| {
1003            let mut d = GSet::new();
1004            d.insert(2);
1005            d
1006        });
1007
1008        // r0 crashes before syncing
1009        let counter_before = cluster.replica(0).counter();
1010        cluster.crash_and_recover(0);
1011
1012        // Durable state should be preserved
1013        assert_eq!(cluster.replica(0).counter(), counter_before);
1014        assert!(cluster.replica(0).state().contains(&1));
1015        assert!(cluster.replica(0).state().contains(&2));
1016
1017        // But volatile state (delta buffers) is lost
1018        // r0 needs to re-sync
1019        assert!(!cluster.replica(0).has_pending_deltas());
1020    }
1021
1022    #[test]
1023    fn test_pncounter_causal() {
1024        let mut cluster: CausalCluster<PNCounter<String>> = CausalCluster::new(2, 0.0);
1025
1026        // r0 increments
1027        cluster.mutate(0, |_s| {
1028            let mut delta = PNCounter::new();
1029            delta.increment("r0".to_string(), 1);
1030            delta
1031        });
1032
1033        // r1 decrements
1034        cluster.mutate(1, |_s| {
1035            let mut delta = PNCounter::new();
1036            delta.decrement("r1".to_string(), 1);
1037            delta
1038        });
1039
1040        // Sync
1041        cluster.full_sync_round();
1042
1043        // Both should have value 0 (1 - 1)
1044        assert!(cluster.is_converged());
1045        assert_eq!(cluster.replica(0).state().value(), 0);
1046    }
1047
1048    #[test]
1049    fn test_causal_ordering_preserved() {
1050        // This test verifies that causal ordering is respected
1051        let mut r1: CausalReplica<GSet<i32>> = CausalReplica::new("r1");
1052        let mut r2: CausalReplica<GSet<i32>> = CausalReplica::new("r2");
1053
1054        r1.register_peer("r2".to_string());
1055        r2.register_peer("r1".to_string());
1056
1057        // r1 creates three sequential mutations
1058        for i in 1..=3 {
1059            r1.mutate(move |_| {
1060                let mut d = GSet::new();
1061                d.insert(i);
1062                d
1063            });
1064        }
1065
1066        // Create intervals for each mutation
1067        // Simulate them arriving out of order by creating separate intervals
1068
1069        // We need to manually create intervals to test out-of-order delivery
1070        let interval_1_3 = DeltaInterval {
1071            from: "r1".to_string(),
1072            to: "r2".to_string(),
1073            delta: {
1074                let mut d = GSet::new();
1075                d.insert(3);
1076                d
1077            },
1078            from_seq: 2, // This requires seq 1-2 to be acked first
1079            to_seq: 3,
1080        };
1081
1082        let interval_0_2 = DeltaInterval {
1083            from: "r1".to_string(),
1084            to: "r2".to_string(),
1085            delta: {
1086                let mut d = GSet::new();
1087                d.insert(1);
1088                d.insert(2);
1089                d
1090            },
1091            from_seq: 0,
1092            to_seq: 2,
1093        };
1094
1095        // Send interval 2-3 first (out of order)
1096        let result = r2.receive_interval(interval_1_3.clone());
1097        assert!(result.is_none()); // Should be buffered
1098        assert!(!r2.state().contains(&3)); // Not yet applied
1099
1100        // Now send interval 0-2
1101        let result = r2.receive_interval(interval_0_2);
1102        assert!(result.is_some()); // Should be applied
1103        assert!(r2.state().contains(&1));
1104        assert!(r2.state().contains(&2));
1105
1106        // And the pending interval should now be applied too!
1107        assert!(r2.state().contains(&3));
1108        assert_eq!(r2.pending_count(), 0);
1109    }
1110
1111    #[test]
1112    fn test_durable_storage() {
1113        let mut storage: MemoryStorage<GSet<i32>> = MemoryStorage::new();
1114
1115        let mut replica: CausalReplica<GSet<i32>> = CausalReplica::new("test");
1116        replica.mutate(|_| {
1117            let mut d = GSet::new();
1118            d.insert(42);
1119            d
1120        });
1121
1122        // Persist
1123        storage.persist(replica.durable_state()).unwrap();
1124
1125        // Load
1126        let loaded = storage.load("test").unwrap().unwrap();
1127        assert_eq!(loaded.counter, 1);
1128        assert!(loaded.state.contains(&42));
1129
1130        // Restore
1131        let recovered = CausalReplica::restore(loaded);
1132        assert!(recovered.state().contains(&42));
1133    }
1134}