mdcs_delta/
mutators.rs

1//! Delta-mutators for CRDT types
2//!
3//! For each CRDT type, we implement delta-mutators `mδ` such that:
4//!   m(X) = X ⊔ mδ(X)
5//!
6//! This means the full mutation can be reconstructed by joining the delta
7//! with the original state.
8
9use mdcs_core::gset::GSet;
10use mdcs_core::lattice::Lattice;
11use mdcs_core::orset::{ORSet, ORSetDelta, Tag};
12use std::collections::{BTreeMap, BTreeSet};
13
14/// Delta-mutator trait for CRDTs
15///
16/// A delta-mutator produces a small delta that, when joined with the state,
17/// produces the same result as the full mutation.
18pub trait DeltaMutator<S: Lattice>: Lattice {
19    /// Apply this delta to a state
20    fn apply_to(&self, state: &S) -> S;
21}
22
23// ============================================================================
24// GSet Delta Mutators
25// ============================================================================
26
27/// Delta for GSet insert operation
28///
29/// For GSet, the delta is simply a singleton set containing the inserted element.
30/// Property: X.insert(v) = X ⊔ {v}
31#[derive(Clone, Debug, PartialEq, Eq)]
32pub struct GSetInsertDelta<T: Ord + Clone> {
33    element: T,
34}
35
36impl<T: Ord + Clone> GSetInsertDelta<T> {
37    /// Create a delta for inserting an element
38    pub fn new(element: T) -> Self {
39        Self { element }
40    }
41
42    /// Get the element being inserted
43    pub fn element(&self) -> &T {
44        &self.element
45    }
46}
47
48/// Create a GSet containing just this element (for use as delta)
49impl<T: Ord + Clone> From<GSetInsertDelta<T>> for GSet<T> {
50    fn from(delta: GSetInsertDelta<T>) -> Self {
51        let mut set = GSet::new();
52        set.insert(delta.element);
53        set
54    }
55}
56
57/// GSet delta-mutators
58pub mod gset {
59    use super::*;
60
61    /// Delta-mutator for insert: mδ_insert(X, v) = {v}
62    /// Property: X.insert(v) = X ⊔ mδ_insert(X, v)
63    pub fn insert_delta<T: Ord + Clone>(value: T) -> GSet<T> {
64        let mut delta = GSet::new();
65        delta.insert(value);
66        delta
67    }
68
69    /// Batch insert delta-mutator
70    pub fn insert_batch_delta<T: Ord + Clone>(values: impl IntoIterator<Item = T>) -> GSet<T> {
71        let mut delta = GSet::new();
72        for value in values {
73            delta.insert(value);
74        }
75        delta
76    }
77
78    /// Apply insert delta to a GSet
79    pub fn apply_insert<T: Ord + Clone>(state: &mut GSet<T>, value: T) -> GSet<T> {
80        let delta = insert_delta(value);
81        state.join_assign(&delta);
82        delta
83    }
84}
85
86// ============================================================================
87// ORSet Delta Mutators
88// ============================================================================
89
90/// ORSet delta-mutators
91pub mod orset {
92    use super::*;
93
94    /// Delta-mutator for add: generates a new unique tag and returns delta
95    /// Property: X.add(v) = X ⊔ mδ_add(X, v)
96    pub fn add_delta<T: Ord + Clone>(replica_id: &str, value: T) -> ORSetDelta<T> {
97        let tag = Tag::new(replica_id);
98        let mut additions = BTreeMap::new();
99        let mut tags = BTreeSet::new();
100        tags.insert(tag);
101        additions.insert(value, tags);
102
103        ORSetDelta {
104            additions,
105            removals: BTreeSet::new(),
106        }
107    }
108
109    /// Delta-mutator for remove: collects tags to tombstone
110    /// Property: X.remove(v) = X ⊔ mδ_remove(X, v)
111    pub fn remove_delta<T: Ord + Clone>(state: &ORSet<T>, value: &T) -> ORSetDelta<T> {
112        // Get all tags for this value from the current state
113        // The remove delta contains these tags as tombstones
114        let removals = if state.contains(value) {
115            // We need to access the internal tags - this requires ORSet to expose them
116            // For now, we create an empty removal (the actual implementation uses pending_delta)
117            BTreeSet::new()
118        } else {
119            BTreeSet::new()
120        };
121
122        ORSetDelta {
123            additions: BTreeMap::new(),
124            removals,
125        }
126    }
127
128    /// Apply add operation using delta-mutator
129    pub fn apply_add<T: Ord + Clone>(
130        state: &mut ORSet<T>,
131        replica_id: &str,
132        value: T,
133    ) -> ORSetDelta<T> {
134        // Use the built-in add which already maintains pending_delta
135        state.add(replica_id, value.clone());
136        add_delta(replica_id, value)
137    }
138}
139
140// ============================================================================
141// LWWRegister Delta Mutators
142// ============================================================================
143
144pub mod lwwreg {
145    use super::*;
146    use mdcs_core::lwwreg::LWWRegister;
147    use serde::{Deserialize, Serialize};
148
149    /// Delta for LWW Register write operation
150    #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
151    pub struct LWWWriteDelta<T: Ord + Clone, K: Ord + Clone> {
152        pub timestamp: u64,
153        pub replica_id: K,
154        pub value: T,
155    }
156
157    impl<T: Ord + Clone, K: Ord + Clone> Lattice for LWWWriteDelta<T, K> {
158        fn bottom() -> Self {
159            panic!("LWWWriteDelta has no bottom element");
160        }
161
162        fn join(&self, other: &Self) -> Self {
163            // Keep the value with higher timestamp (tie-break on replica_id)
164            if other.timestamp > self.timestamp
165                || (other.timestamp == self.timestamp && other.replica_id > self.replica_id)
166            {
167                other.clone()
168            } else {
169                self.clone()
170            }
171        }
172    }
173
174    /// Delta-mutator for set operation
175    /// Property: X.set(v) = X ⊔ mδ_set(X, v, ts, rid)
176    pub fn set_delta<T: Ord + Clone, K: Ord + Clone>(
177        value: T,
178        timestamp: u64,
179        replica_id: K,
180    ) -> LWWWriteDelta<T, K> {
181        LWWWriteDelta {
182            timestamp,
183            replica_id,
184            value,
185        }
186    }
187
188    /// Convert delta to a LWW Register state
189    pub fn apply_set<T: Ord + Clone, K: Ord + Clone + Default>(
190        state: &mut LWWRegister<T, K>,
191        value: T,
192        timestamp: u64,
193        replica_id: K,
194    ) {
195        state.set(value, timestamp, replica_id);
196    }
197}
198
199// ============================================================================
200// PNCounter Delta Mutators
201// ============================================================================
202
203pub mod pncounter {
204    use super::*;
205    use mdcs_core::pncounter::PNCounter;
206    use serde::{Deserialize, Serialize};
207
208    /// Delta for increment operation
209    #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
210    pub struct IncrementDelta<K: Ord + Clone> {
211        pub replica_id: K,
212        pub amount: u64,
213    }
214
215    /// Delta for decrement operation
216    #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
217    pub struct DecrementDelta<K: Ord + Clone> {
218        pub replica_id: K,
219        pub amount: u64,
220    }
221
222    impl<K: Ord + Clone> Lattice for IncrementDelta<K> {
223        fn bottom() -> Self {
224            panic!("IncrementDelta has no bottom element");
225        }
226
227        fn join(&self, other: &Self) -> Self {
228            // For same replica, take max; otherwise union both
229            if self.replica_id == other.replica_id {
230                Self {
231                    replica_id: self.replica_id.clone(),
232                    amount: self.amount.max(other.amount),
233                }
234            } else {
235                self.clone() // Semantically different replicas, but we can't represent union
236            }
237        }
238    }
239
240    impl<K: Ord + Clone> Lattice for DecrementDelta<K> {
241        fn bottom() -> Self {
242            panic!("DecrementDelta has no bottom element");
243        }
244
245        fn join(&self, other: &Self) -> Self {
246            if self.replica_id == other.replica_id {
247                Self {
248                    replica_id: self.replica_id.clone(),
249                    amount: self.amount.max(other.amount),
250                }
251            } else {
252                self.clone()
253            }
254        }
255    }
256
257    /// Delta-mutator for increment operation
258    pub fn increment_delta<K: Ord + Clone>(replica_id: K, amount: u64) -> IncrementDelta<K> {
259        IncrementDelta { replica_id, amount }
260    }
261
262    /// Delta-mutator for decrement operation
263    pub fn decrement_delta<K: Ord + Clone>(replica_id: K, amount: u64) -> DecrementDelta<K> {
264        DecrementDelta { replica_id, amount }
265    }
266
267    /// Apply increment delta to counter
268    pub fn apply_increment<K: Ord + Clone>(state: &mut PNCounter<K>, replica_id: K, amount: u64) {
269        state.increment(replica_id, amount);
270    }
271
272    /// Apply decrement delta to counter
273    pub fn apply_decrement<K: Ord + Clone>(state: &mut PNCounter<K>, replica_id: K, amount: u64) {
274        state.decrement(replica_id, amount);
275    }
276}
277
278// ============================================================================
279// MVRegister Delta Mutators
280// ============================================================================
281
282pub mod mvreg {
283    use super::*;
284    use mdcs_core::mvreg::{Dot, MVRegister};
285    use serde::{Deserialize, Serialize};
286
287    /// Delta for write operation on Multi-Value Register
288    #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
289    pub struct WriteDelta<T: Ord + Clone> {
290        pub dot: Dot,
291        pub value: T,
292    }
293
294    impl<T: Ord + Clone> Lattice for WriteDelta<T> {
295        fn bottom() -> Self {
296            panic!("WriteDelta has no bottom element");
297        }
298
299        fn join(&self, _other: &Self) -> Self {
300            // Union: keep both values (they're different dots)
301            // This is handled by MVRegister's join semantics
302            self.clone()
303        }
304    }
305
306    /// Delta-mutator for write operation
307    pub fn write_delta<T: Ord + Clone>(dot: Dot, value: T) -> WriteDelta<T> {
308        WriteDelta { dot, value }
309    }
310
311    /// Apply write delta to MVRegister
312    pub fn apply_write<T: Ord + Clone>(
313        state: &mut MVRegister<T>,
314        replica_id: &str,
315        value: T,
316    ) -> Dot {
317        state.write(replica_id, value)
318    }
319}
320
321#[cfg(test)]
322mod tests {
323    use super::*;
324    use mdcs_core::lattice::DeltaCRDT;
325    use mdcs_core::lwwreg::LWWRegister;
326    use mdcs_core::mvreg::MVRegister;
327    use mdcs_core::pncounter::PNCounter;
328
329    #[test]
330    fn test_gset_insert_delta() {
331        let mut state: GSet<i32> = GSet::new();
332        state.insert(1);
333        state.insert(2);
334
335        // Create delta for inserting 3
336        let delta = gset::insert_delta(3);
337
338        // Apply delta
339        let result = state.join(&delta);
340
341        assert!(result.contains(&1));
342        assert!(result.contains(&2));
343        assert!(result.contains(&3));
344    }
345
346    #[test]
347    fn test_gset_delta_property() {
348        // Property: m(X) = X ⊔ mδ(X)
349        let mut state: GSet<i32> = GSet::new();
350        state.insert(1);
351
352        // Method 1: Direct mutation
353        let mut direct = state.clone();
354        direct.insert(42);
355
356        // Method 2: Via delta-mutator
357        let delta = gset::insert_delta(42);
358        let via_delta = state.join(&delta);
359
360        assert_eq!(direct, via_delta);
361    }
362
363    #[test]
364    fn test_gset_batch_delta() {
365        let state: GSet<i32> = GSet::new();
366
367        let delta = gset::insert_batch_delta(vec![1, 2, 3, 4, 5]);
368        let result = state.join(&delta);
369
370        for i in 1..=5 {
371            assert!(result.contains(&i));
372        }
373    }
374
375    #[test]
376    fn test_gset_delta_idempotence() {
377        let mut state: GSet<i32> = GSet::new();
378        state.insert(1);
379
380        let delta = gset::insert_delta(2);
381
382        // Apply delta multiple times
383        let once = state.join(&delta);
384        let twice = once.join(&delta);
385        let thrice = twice.join(&delta);
386
387        // Idempotence: applying same delta multiple times has no effect
388        assert_eq!(once, twice);
389        assert_eq!(twice, thrice);
390    }
391
392    #[test]
393    fn test_orset_add_delta() {
394        let mut state: ORSet<String> = ORSet::new();
395
396        // Apply add via delta
397        let delta = orset::add_delta("replica1", "hello".to_string());
398        state.apply_delta(&delta);
399
400        assert!(state.contains(&"hello".to_string()));
401    }
402
403    #[test]
404    fn test_orset_delta_idempotence() {
405        let mut state: ORSet<String> = ORSet::new();
406
407        let delta = orset::add_delta("replica1", "test".to_string());
408
409        // Apply same delta multiple times
410        state.apply_delta(&delta);
411        let count1 = state.len();
412
413        state.apply_delta(&delta);
414        let count2 = state.len();
415
416        // Idempotent (same tags won't be added twice)
417        assert_eq!(count1, count2);
418    }
419
420    #[test]
421    fn test_lwwreg_set_delta() {
422        let mut state: LWWRegister<i32, String> = LWWRegister::new("replica1".to_string());
423
424        // Apply set via delta
425        lwwreg::apply_set(&mut state, 42, 100, "replica1".to_string());
426
427        assert_eq!(state.get(), Some(&42));
428        assert_eq!(state.timestamp(), 100);
429    }
430
431    #[test]
432    fn test_lwwreg_delta_higher_timestamp_wins() {
433        let mut state: LWWRegister<i32, String> = LWWRegister::new("replica1".to_string());
434
435        lwwreg::apply_set(&mut state, 10, 100, "replica1".to_string());
436        assert_eq!(state.get(), Some(&10));
437
438        lwwreg::apply_set(&mut state, 20, 200, "replica2".to_string());
439        assert_eq!(state.get(), Some(&20));
440
441        // Old timestamp doesn't overwrite
442        lwwreg::apply_set(&mut state, 30, 150, "replica1".to_string());
443        assert_eq!(state.get(), Some(&20));
444    }
445
446    #[test]
447    fn test_pncounter_increment_delta() {
448        let mut state: PNCounter<String> = PNCounter::new();
449
450        // Apply increment via delta
451        pncounter::apply_increment(&mut state, "replica1".to_string(), 5);
452        assert_eq!(state.value(), 5);
453
454        pncounter::apply_increment(&mut state, "replica1".to_string(), 3);
455        assert_eq!(state.value(), 8);
456    }
457
458    #[test]
459    fn test_pncounter_decrement_delta() {
460        let mut state: PNCounter<String> = PNCounter::new();
461
462        pncounter::apply_increment(&mut state, "replica1".to_string(), 10);
463        assert_eq!(state.value(), 10);
464
465        pncounter::apply_decrement(&mut state, "replica1".to_string(), 3);
466        assert_eq!(state.value(), 7);
467    }
468
469    #[test]
470    fn test_pncounter_delta_convergence() {
471        let mut state1: PNCounter<String> = PNCounter::new();
472        let mut state2: PNCounter<String> = PNCounter::new();
473
474        // Apply different operations to each state
475        pncounter::apply_increment(&mut state1, "replica1".to_string(), 5);
476        pncounter::apply_increment(&mut state2, "replica2".to_string(), 3);
477        pncounter::apply_decrement(&mut state2, "replica1".to_string(), 2);
478
479        // Merge states
480        let merged1 = state1.join(&state2);
481        let merged2 = state2.join(&state1);
482
483        assert_eq!(merged1.value(), merged2.value());
484    }
485
486    #[test]
487    fn test_mvreg_write_delta() {
488        let mut state: MVRegister<i32> = MVRegister::new();
489
490        // Apply write via delta
491        let _dot = mvreg::apply_write(&mut state, "replica1", 42);
492
493        let values = state.read();
494        assert_eq!(values.len(), 1);
495        assert_eq!(values[0], &42);
496    }
497
498    #[test]
499    fn test_mvreg_concurrent_deltas() {
500        let mut state1: MVRegister<i32> = MVRegister::new();
501        let mut state2: MVRegister<i32> = MVRegister::new();
502
503        let _dot1 = mvreg::apply_write(&mut state1, "replica1", 10);
504        let _dot2 = mvreg::apply_write(&mut state2, "replica2", 20);
505
506        // Merge: both values should exist
507        let merged = state1.join(&state2);
508        let values = merged.read();
509        assert_eq!(values.len(), 2);
510    }
511}