1use mdcs_core::gset::GSet;
10use mdcs_core::lattice::Lattice;
11use mdcs_core::orset::{ORSet, ORSetDelta, Tag};
12use std::collections::{BTreeMap, BTreeSet};
13
14pub trait DeltaMutator<S: Lattice>: Lattice {
19 fn apply_to(&self, state: &S) -> S;
21}
22
23#[derive(Clone, Debug, PartialEq, Eq)]
32pub struct GSetInsertDelta<T: Ord + Clone> {
33 element: T,
34}
35
36impl<T: Ord + Clone> GSetInsertDelta<T> {
37 pub fn new(element: T) -> Self {
39 Self { element }
40 }
41
42 pub fn element(&self) -> &T {
44 &self.element
45 }
46}
47
48impl<T: Ord + Clone> From<GSetInsertDelta<T>> for GSet<T> {
50 fn from(delta: GSetInsertDelta<T>) -> Self {
51 let mut set = GSet::new();
52 set.insert(delta.element);
53 set
54 }
55}
56
57pub mod gset {
59 use super::*;
60
61 pub fn insert_delta<T: Ord + Clone>(value: T) -> GSet<T> {
64 let mut delta = GSet::new();
65 delta.insert(value);
66 delta
67 }
68
69 pub fn insert_batch_delta<T: Ord + Clone>(values: impl IntoIterator<Item = T>) -> GSet<T> {
71 let mut delta = GSet::new();
72 for value in values {
73 delta.insert(value);
74 }
75 delta
76 }
77
78 pub fn apply_insert<T: Ord + Clone>(state: &mut GSet<T>, value: T) -> GSet<T> {
80 let delta = insert_delta(value);
81 state.join_assign(&delta);
82 delta
83 }
84}
85
86pub mod orset {
92 use super::*;
93
94 pub fn add_delta<T: Ord + Clone>(replica_id: &str, value: T) -> ORSetDelta<T> {
97 let tag = Tag::new(replica_id);
98 let mut additions = BTreeMap::new();
99 let mut tags = BTreeSet::new();
100 tags.insert(tag);
101 additions.insert(value, tags);
102
103 ORSetDelta {
104 additions,
105 removals: BTreeSet::new(),
106 }
107 }
108
109 pub fn remove_delta<T: Ord + Clone>(state: &ORSet<T>, value: &T) -> ORSetDelta<T> {
112 let removals = if state.contains(value) {
115 BTreeSet::new()
118 } else {
119 BTreeSet::new()
120 };
121
122 ORSetDelta {
123 additions: BTreeMap::new(),
124 removals,
125 }
126 }
127
128 pub fn apply_add<T: Ord + Clone>(
130 state: &mut ORSet<T>,
131 replica_id: &str,
132 value: T,
133 ) -> ORSetDelta<T> {
134 state.add(replica_id, value.clone());
136 add_delta(replica_id, value)
137 }
138}
139
140pub mod lwwreg {
145 use super::*;
146 use mdcs_core::lwwreg::LWWRegister;
147 use serde::{Deserialize, Serialize};
148
149 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
151 pub struct LWWWriteDelta<T: Ord + Clone, K: Ord + Clone> {
152 pub timestamp: u64,
153 pub replica_id: K,
154 pub value: T,
155 }
156
157 impl<T: Ord + Clone, K: Ord + Clone> Lattice for LWWWriteDelta<T, K> {
158 fn bottom() -> Self {
159 panic!("LWWWriteDelta has no bottom element");
160 }
161
162 fn join(&self, other: &Self) -> Self {
163 if other.timestamp > self.timestamp
165 || (other.timestamp == self.timestamp && other.replica_id > self.replica_id)
166 {
167 other.clone()
168 } else {
169 self.clone()
170 }
171 }
172 }
173
174 pub fn set_delta<T: Ord + Clone, K: Ord + Clone>(
177 value: T,
178 timestamp: u64,
179 replica_id: K,
180 ) -> LWWWriteDelta<T, K> {
181 LWWWriteDelta {
182 timestamp,
183 replica_id,
184 value,
185 }
186 }
187
188 pub fn apply_set<T: Ord + Clone, K: Ord + Clone + Default>(
190 state: &mut LWWRegister<T, K>,
191 value: T,
192 timestamp: u64,
193 replica_id: K,
194 ) {
195 state.set(value, timestamp, replica_id);
196 }
197}
198
199pub mod pncounter {
204 use super::*;
205 use mdcs_core::pncounter::PNCounter;
206 use serde::{Deserialize, Serialize};
207
208 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
210 pub struct IncrementDelta<K: Ord + Clone> {
211 pub replica_id: K,
212 pub amount: u64,
213 }
214
215 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
217 pub struct DecrementDelta<K: Ord + Clone> {
218 pub replica_id: K,
219 pub amount: u64,
220 }
221
222 impl<K: Ord + Clone> Lattice for IncrementDelta<K> {
223 fn bottom() -> Self {
224 panic!("IncrementDelta has no bottom element");
225 }
226
227 fn join(&self, other: &Self) -> Self {
228 if self.replica_id == other.replica_id {
230 Self {
231 replica_id: self.replica_id.clone(),
232 amount: self.amount.max(other.amount),
233 }
234 } else {
235 self.clone() }
237 }
238 }
239
240 impl<K: Ord + Clone> Lattice for DecrementDelta<K> {
241 fn bottom() -> Self {
242 panic!("DecrementDelta has no bottom element");
243 }
244
245 fn join(&self, other: &Self) -> Self {
246 if self.replica_id == other.replica_id {
247 Self {
248 replica_id: self.replica_id.clone(),
249 amount: self.amount.max(other.amount),
250 }
251 } else {
252 self.clone()
253 }
254 }
255 }
256
257 pub fn increment_delta<K: Ord + Clone>(replica_id: K, amount: u64) -> IncrementDelta<K> {
259 IncrementDelta { replica_id, amount }
260 }
261
262 pub fn decrement_delta<K: Ord + Clone>(replica_id: K, amount: u64) -> DecrementDelta<K> {
264 DecrementDelta { replica_id, amount }
265 }
266
267 pub fn apply_increment<K: Ord + Clone>(state: &mut PNCounter<K>, replica_id: K, amount: u64) {
269 state.increment(replica_id, amount);
270 }
271
272 pub fn apply_decrement<K: Ord + Clone>(state: &mut PNCounter<K>, replica_id: K, amount: u64) {
274 state.decrement(replica_id, amount);
275 }
276}
277
278pub mod mvreg {
283 use super::*;
284 use mdcs_core::mvreg::{Dot, MVRegister};
285 use serde::{Deserialize, Serialize};
286
287 #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
289 pub struct WriteDelta<T: Ord + Clone> {
290 pub dot: Dot,
291 pub value: T,
292 }
293
294 impl<T: Ord + Clone> Lattice for WriteDelta<T> {
295 fn bottom() -> Self {
296 panic!("WriteDelta has no bottom element");
297 }
298
299 fn join(&self, _other: &Self) -> Self {
300 self.clone()
303 }
304 }
305
306 pub fn write_delta<T: Ord + Clone>(dot: Dot, value: T) -> WriteDelta<T> {
308 WriteDelta { dot, value }
309 }
310
311 pub fn apply_write<T: Ord + Clone>(
313 state: &mut MVRegister<T>,
314 replica_id: &str,
315 value: T,
316 ) -> Dot {
317 state.write(replica_id, value)
318 }
319}
320
321#[cfg(test)]
322mod tests {
323 use super::*;
324 use mdcs_core::lattice::DeltaCRDT;
325 use mdcs_core::lwwreg::LWWRegister;
326 use mdcs_core::mvreg::MVRegister;
327 use mdcs_core::pncounter::PNCounter;
328
329 #[test]
330 fn test_gset_insert_delta() {
331 let mut state: GSet<i32> = GSet::new();
332 state.insert(1);
333 state.insert(2);
334
335 let delta = gset::insert_delta(3);
337
338 let result = state.join(&delta);
340
341 assert!(result.contains(&1));
342 assert!(result.contains(&2));
343 assert!(result.contains(&3));
344 }
345
346 #[test]
347 fn test_gset_delta_property() {
348 let mut state: GSet<i32> = GSet::new();
350 state.insert(1);
351
352 let mut direct = state.clone();
354 direct.insert(42);
355
356 let delta = gset::insert_delta(42);
358 let via_delta = state.join(&delta);
359
360 assert_eq!(direct, via_delta);
361 }
362
363 #[test]
364 fn test_gset_batch_delta() {
365 let state: GSet<i32> = GSet::new();
366
367 let delta = gset::insert_batch_delta(vec![1, 2, 3, 4, 5]);
368 let result = state.join(&delta);
369
370 for i in 1..=5 {
371 assert!(result.contains(&i));
372 }
373 }
374
375 #[test]
376 fn test_gset_delta_idempotence() {
377 let mut state: GSet<i32> = GSet::new();
378 state.insert(1);
379
380 let delta = gset::insert_delta(2);
381
382 let once = state.join(&delta);
384 let twice = once.join(&delta);
385 let thrice = twice.join(&delta);
386
387 assert_eq!(once, twice);
389 assert_eq!(twice, thrice);
390 }
391
392 #[test]
393 fn test_orset_add_delta() {
394 let mut state: ORSet<String> = ORSet::new();
395
396 let delta = orset::add_delta("replica1", "hello".to_string());
398 state.apply_delta(&delta);
399
400 assert!(state.contains(&"hello".to_string()));
401 }
402
403 #[test]
404 fn test_orset_delta_idempotence() {
405 let mut state: ORSet<String> = ORSet::new();
406
407 let delta = orset::add_delta("replica1", "test".to_string());
408
409 state.apply_delta(&delta);
411 let count1 = state.len();
412
413 state.apply_delta(&delta);
414 let count2 = state.len();
415
416 assert_eq!(count1, count2);
418 }
419
420 #[test]
421 fn test_lwwreg_set_delta() {
422 let mut state: LWWRegister<i32, String> = LWWRegister::new("replica1".to_string());
423
424 lwwreg::apply_set(&mut state, 42, 100, "replica1".to_string());
426
427 assert_eq!(state.get(), Some(&42));
428 assert_eq!(state.timestamp(), 100);
429 }
430
431 #[test]
432 fn test_lwwreg_delta_higher_timestamp_wins() {
433 let mut state: LWWRegister<i32, String> = LWWRegister::new("replica1".to_string());
434
435 lwwreg::apply_set(&mut state, 10, 100, "replica1".to_string());
436 assert_eq!(state.get(), Some(&10));
437
438 lwwreg::apply_set(&mut state, 20, 200, "replica2".to_string());
439 assert_eq!(state.get(), Some(&20));
440
441 lwwreg::apply_set(&mut state, 30, 150, "replica1".to_string());
443 assert_eq!(state.get(), Some(&20));
444 }
445
446 #[test]
447 fn test_pncounter_increment_delta() {
448 let mut state: PNCounter<String> = PNCounter::new();
449
450 pncounter::apply_increment(&mut state, "replica1".to_string(), 5);
452 assert_eq!(state.value(), 5);
453
454 pncounter::apply_increment(&mut state, "replica1".to_string(), 3);
455 assert_eq!(state.value(), 8);
456 }
457
458 #[test]
459 fn test_pncounter_decrement_delta() {
460 let mut state: PNCounter<String> = PNCounter::new();
461
462 pncounter::apply_increment(&mut state, "replica1".to_string(), 10);
463 assert_eq!(state.value(), 10);
464
465 pncounter::apply_decrement(&mut state, "replica1".to_string(), 3);
466 assert_eq!(state.value(), 7);
467 }
468
469 #[test]
470 fn test_pncounter_delta_convergence() {
471 let mut state1: PNCounter<String> = PNCounter::new();
472 let mut state2: PNCounter<String> = PNCounter::new();
473
474 pncounter::apply_increment(&mut state1, "replica1".to_string(), 5);
476 pncounter::apply_increment(&mut state2, "replica2".to_string(), 3);
477 pncounter::apply_decrement(&mut state2, "replica1".to_string(), 2);
478
479 let merged1 = state1.join(&state2);
481 let merged2 = state2.join(&state1);
482
483 assert_eq!(merged1.value(), merged2.value());
484 }
485
486 #[test]
487 fn test_mvreg_write_delta() {
488 let mut state: MVRegister<i32> = MVRegister::new();
489
490 let _dot = mvreg::apply_write(&mut state, "replica1", 42);
492
493 let values = state.read();
494 assert_eq!(values.len(), 1);
495 assert_eq!(values[0], &42);
496 }
497
498 #[test]
499 fn test_mvreg_concurrent_deltas() {
500 let mut state1: MVRegister<i32> = MVRegister::new();
501 let mut state2: MVRegister<i32> = MVRegister::new();
502
503 let _dot1 = mvreg::apply_write(&mut state1, "replica1", 10);
504 let _dot2 = mvreg::apply_write(&mut state2, "replica2", 20);
505
506 let merged = state1.join(&state2);
508 let values = merged.read();
509 assert_eq!(values.len(), 2);
510 }
511}