1use crate::buffer::{ReplicaId, SeqNo};
67use mdcs_core::lattice::Lattice;
68use serde::{Deserialize, Serialize};
69use std::collections::{HashMap, VecDeque};
70
/// A batched delta covering the sender's sequence range `(from_seq, to_seq]`.
///
/// Deltas produced between two sends are joined into one interval; the
/// receiver applies it only when `from_seq` matches its ack frontier.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct DeltaInterval<D> {
    /// Replica that produced the interval.
    pub from: ReplicaId,
    /// Intended recipient.
    pub to: ReplicaId,
    /// Join of every delta generated in the covered range.
    pub delta: D,
    /// Sequence number the recipient must already have acked (exclusive bound).
    pub from_seq: SeqNo,
    /// Sender's sequence number after the newest delta folded in (inclusive).
    pub to_seq: SeqNo,
}
92
/// Acknowledgement that replica `from` has applied replica `to`'s deltas up
/// to and including `acked_seq`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct IntervalAck {
    /// The acknowledging replica.
    pub from: ReplicaId,
    /// The replica whose interval is being acknowledged.
    pub to: ReplicaId,
    /// Highest contiguous sequence number applied from `to`.
    pub acked_seq: SeqNo,
}
101
/// Wire messages exchanged by the causal delta-replication protocol.
#[derive(Debug, Clone)]
pub enum CausalMessage<D> {
    /// A batch of deltas covering a contiguous sequence range.
    DeltaInterval(DeltaInterval<D>),
    /// Confirmation that an interval (or snapshot) was applied.
    Ack(IntervalAck),
    /// `from` asks `to` for a full-state snapshot.
    SnapshotRequest { from: ReplicaId, to: ReplicaId },
    /// Full state of `from` as of sequence number `seq`, addressed to `to`.
    Snapshot {
        from: ReplicaId,
        to: ReplicaId,
        state: D,
        seq: SeqNo,
    },
}
119
/// The part of a replica that must survive crashes: its identity, the joined
/// lattice state, and the local mutation counter.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DurableState<S> {
    /// Stable identity of this replica.
    pub replica_id: ReplicaId,
    /// Join of every locally and remotely applied delta.
    pub state: S,
    /// Number of local mutations performed; source of new sequence numbers.
    pub counter: SeqNo,
}
134impl<S: Lattice> DurableState<S> {
135 pub fn new(replica_id: impl Into<ReplicaId>) -> Self {
136 Self {
137 replica_id: replica_id.into(),
138 state: S::bottom(),
139 counter: 0,
140 }
141 }
142}
143
/// Per-peer accumulator of not-yet-sent deltas.
///
/// Deltas pushed between sends are joined into a single `delta`; `from_seq`
/// and `to_seq` delimit the sequence range `(from_seq, to_seq]` it covers.
#[derive(Debug, Clone)]
pub struct PeerDeltaBuffer<D: Lattice> {
    // Joined pending deltas; None when nothing is waiting to be sent.
    delta: Option<D>,
    // Sequence number the peer is assumed to have already applied.
    from_seq: SeqNo,
    // Sequence number of the newest delta folded into `delta`.
    to_seq: SeqNo,
}
157
158impl<D: Lattice> PeerDeltaBuffer<D> {
159 pub fn new() -> Self {
160 Self {
161 delta: None,
162 from_seq: 0,
163 to_seq: 0,
164 }
165 }
166
167 pub fn start_from(seq: SeqNo) -> Self {
169 Self {
170 delta: None,
171 from_seq: seq,
172 to_seq: seq,
173 }
174 }
175
176 pub fn push(&mut self, delta: D, seq: SeqNo) {
178 match &mut self.delta {
179 Some(existing) => {
180 existing.join_assign(&delta);
181 }
182 None => {
183 self.delta = Some(delta);
184 }
185 }
186 self.to_seq = seq;
187 }
188
189 pub fn has_pending(&self) -> bool {
191 self.delta.is_some()
192 }
193
194 pub fn take(&mut self) -> Option<(D, SeqNo, SeqNo)> {
196 self.delta.take().map(|d| {
197 let from = self.from_seq;
198 let to = self.to_seq;
199 self.from_seq = to;
200 (d, from, to)
201 })
202 }
203
204 pub fn clear(&mut self) {
206 self.delta = None;
207 self.from_seq = self.to_seq;
208 }
209
210 pub fn reset_from(&mut self, seq: SeqNo) {
212 self.delta = None;
213 self.from_seq = seq;
214 self.to_seq = seq;
215 }
216}
217
218impl<D: Lattice> Default for PeerDeltaBuffer<D> {
219 fn default() -> Self {
220 Self::new()
221 }
222}
223
/// State that is rebuilt from scratch after a crash: outgoing per-peer delta
/// buffers and the highest sequence number acknowledged from each peer.
#[derive(Debug, Clone)]
pub struct VolatileState<D: Lattice> {
    /// Outgoing delta accumulation, one buffer per registered peer.
    pub delta_buffers: HashMap<ReplicaId, PeerDeltaBuffer<D>>,
    /// Highest contiguous sequence number applied from each peer.
    pub peer_acks: HashMap<ReplicaId, SeqNo>,
}
233
234impl<D: Lattice> VolatileState<D> {
235 pub fn new() -> Self {
236 Self {
237 delta_buffers: HashMap::new(),
238 peer_acks: HashMap::new(),
239 }
240 }
241
242 pub fn register_peer(&mut self, peer_id: ReplicaId) {
244 self.delta_buffers.entry(peer_id.clone()).or_default();
245 self.peer_acks.entry(peer_id).or_insert(0);
246 }
247
248 pub fn get_peer_ack(&self, peer_id: &str) -> SeqNo {
250 self.peer_acks.get(peer_id).copied().unwrap_or(0)
251 }
252
253 pub fn update_peer_ack(&mut self, peer_id: &str, seq: SeqNo) {
255 if let Some(ack) = self.peer_acks.get_mut(peer_id) {
256 *ack = (*ack).max(seq);
257 }
258 }
259}
260
261impl<D: Lattice> Default for VolatileState<D> {
262 fn default() -> Self {
263 Self::new()
264 }
265}
266
/// A replica providing per-sender in-order (causal) delivery of delta
/// intervals on top of a join-semilattice state.
#[derive(Debug, Clone)]
pub struct CausalReplica<S: Lattice + Clone> {
    // Crash-surviving portion: id, state, mutation counter.
    durable: DurableState<S>,
    // Outgoing buffers and ack frontiers; rebuilt empty after recovery.
    volatile: VolatileState<S>,
    // Out-of-order intervals held per sender until the sequence gap closes.
    pending: HashMap<ReplicaId, VecDeque<DeltaInterval<S>>>,
}
282
283impl<S: Lattice + Clone> CausalReplica<S> {
284 pub fn new(id: impl Into<ReplicaId>) -> Self {
286 Self {
287 durable: DurableState::new(id),
288 volatile: VolatileState::new(),
289 pending: HashMap::new(),
290 }
291 }
292
293 pub fn restore(durable: DurableState<S>) -> Self {
295 Self {
296 durable,
297 volatile: VolatileState::new(),
298 pending: HashMap::new(),
299 }
300 }
301
302 pub fn id(&self) -> &ReplicaId {
304 &self.durable.replica_id
305 }
306
307 pub fn state(&self) -> &S {
309 &self.durable.state
310 }
311
312 pub fn counter(&self) -> SeqNo {
314 self.durable.counter
315 }
316
317 pub fn durable_state(&self) -> &DurableState<S> {
319 &self.durable
320 }
321
322 pub fn register_peer(&mut self, peer_id: ReplicaId) {
324 self.volatile.register_peer(peer_id.clone());
325 self.pending.entry(peer_id).or_default();
326 }
327
328 pub fn mutate<F>(&mut self, mutator: F) -> S
340 where
341 F: FnOnce(&S) -> S,
342 {
343 self.durable.counter += 1;
345 let seq = self.durable.counter;
346
347 let delta = mutator(&self.durable.state);
349
350 self.durable.state.join_assign(&delta);
352
353 for buffer in self.volatile.delta_buffers.values_mut() {
355 buffer.push(delta.clone(), seq);
356 }
357
358 delta
359 }
360
361 pub fn prepare_interval(&mut self, peer_id: &str) -> Option<DeltaInterval<S>> {
366 let buffer = self.volatile.delta_buffers.get_mut(peer_id)?;
367
368 buffer
369 .take()
370 .map(|(delta, from_seq, to_seq)| DeltaInterval {
371 from: self.durable.replica_id.clone(),
372 to: peer_id.to_string(),
373 delta,
374 from_seq,
375 to_seq,
376 })
377 }
378
379 fn is_causally_ready(&self, interval: &DeltaInterval<S>) -> bool {
383 let last_acked = self.volatile.get_peer_ack(&interval.from);
384 interval.from_seq == last_acked
385 }
386
387 pub fn receive_interval(&mut self, interval: DeltaInterval<S>) -> Option<IntervalAck> {
402 if !self.volatile.peer_acks.contains_key(&interval.from) {
404 self.register_peer(interval.from.clone());
405 }
406
407 if self.is_causally_ready(&interval) {
408 self.durable.state.join_assign(&interval.delta);
410
411 self.volatile
413 .update_peer_ack(&interval.from, interval.to_seq);
414
415 let ack = IntervalAck {
416 from: self.durable.replica_id.clone(),
417 to: interval.from.clone(),
418 acked_seq: interval.to_seq,
419 };
420
421 self.try_apply_pending(&interval.from);
423
424 Some(ack)
425 } else {
426 let pending = self.pending.entry(interval.from.clone()).or_default();
428
429 let pos = pending.iter().position(|p| p.from_seq > interval.from_seq);
431 match pos {
432 Some(i) => pending.insert(i, interval),
433 None => pending.push_back(interval),
434 }
435
436 None
437 }
438 }
439
440 fn try_apply_pending(&mut self, peer_id: &str) -> Vec<IntervalAck> {
442 let mut acks = Vec::new();
443
444 if let Some(pending) = self.pending.get_mut(peer_id) {
445 while let Some(interval) = pending.front() {
446 let last_acked = self.volatile.get_peer_ack(peer_id);
447 if interval.from_seq == last_acked {
448 let interval = pending.pop_front().unwrap();
449
450 self.durable.state.join_assign(&interval.delta);
452
453 self.volatile.update_peer_ack(peer_id, interval.to_seq);
455
456 acks.push(IntervalAck {
457 from: self.durable.replica_id.clone(),
458 to: interval.from.clone(),
459 acked_seq: interval.to_seq,
460 });
461 } else {
462 break;
463 }
464 }
465 }
466
467 acks
468 }
469
470 pub fn receive_ack(&mut self, ack: &IntervalAck) {
477 if let Some(buffer) = self.volatile.delta_buffers.get_mut(&ack.from) {
478 buffer.clear();
479 }
480 }
481
482 pub fn snapshot(&self) -> (S, SeqNo) {
484 (self.durable.state.clone(), self.durable.counter)
485 }
486
487 pub fn apply_snapshot(&mut self, state: S, seq: SeqNo, from: &str) {
489 self.durable.state.join_assign(&state);
490 self.volatile.update_peer_ack(from, seq);
491 }
492
493 pub fn peers(&self) -> impl Iterator<Item = &ReplicaId> {
495 self.volatile.peer_acks.keys()
496 }
497
498 pub fn has_pending_deltas(&self) -> bool {
500 self.volatile
501 .delta_buffers
502 .values()
503 .any(|b| b.has_pending())
504 }
505
506 pub fn pending_count(&self) -> usize {
508 self.pending.values().map(|v| v.len()).sum()
509 }
510}
511
/// Persistence interface for a replica's crash-surviving state.
pub trait DurableStorage<S: Lattice> {
    /// Writes (or overwrites) `state`, keyed by its replica id.
    fn persist(&mut self, state: &DurableState<S>) -> Result<(), StorageError>;

    /// Loads the durable state for `replica_id`, if previously persisted.
    fn load(&self, replica_id: &str) -> Result<Option<DurableState<S>>, StorageError>;

    /// Flushes any buffered writes to stable storage.
    fn sync(&mut self) -> Result<(), StorageError>;
}
525
/// Errors surfaced by [`DurableStorage`] implementations.
#[derive(Debug, Clone)]
pub enum StorageError {
    /// Underlying I/O failure (backend-supplied message).
    IoError(String),
    /// State could not be encoded or decoded.
    SerializationError(String),
    /// No durable state exists for the requested replica.
    NotFound,
}
533
534impl std::fmt::Display for StorageError {
535 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
536 match self {
537 StorageError::IoError(msg) => write!(f, "IO error: {}", msg),
538 StorageError::SerializationError(msg) => write!(f, "Serialization error: {}", msg),
539 StorageError::NotFound => write!(f, "State not found"),
540 }
541 }
542}
543
// StorageError carries no source error; Debug + Display satisfy the contract.
impl std::error::Error for StorageError {}
545
/// Trivial [`DurableStorage`] backed by a `HashMap`; intended for tests.
#[derive(Debug, Default)]
pub struct MemoryStorage<S> {
    // replica_id -> most recently persisted durable state
    states: HashMap<ReplicaId, DurableState<S>>,
}
551
552impl<S: Clone> MemoryStorage<S> {
553 pub fn new() -> Self {
554 Self {
555 states: HashMap::new(),
556 }
557 }
558}
559
560impl<S: Lattice + Clone + Serialize + for<'de> Deserialize<'de>> DurableStorage<S>
561 for MemoryStorage<S>
562{
563 fn persist(&mut self, state: &DurableState<S>) -> Result<(), StorageError> {
564 self.states.insert(state.replica_id.clone(), state.clone());
565 Ok(())
566 }
567
568 fn load(&self, replica_id: &str) -> Result<Option<DurableState<S>>, StorageError> {
569 Ok(self.states.get(replica_id).cloned())
570 }
571
572 fn sync(&mut self) -> Result<(), StorageError> {
573 Ok(())
574 }
575}
576
/// Deterministic lossy-network simulator for exercising the protocol.
#[derive(Debug)]
pub struct CausalNetworkSimulator<D> {
    // Messages queued for delivery, FIFO order.
    in_flight: VecDeque<CausalMessage<D>>,
    // Messages "dropped" by the network; recoverable via retransmit_lost().
    lost: Vec<CausalMessage<D>>,
    // Probability that a sent message is diverted to `lost`.
    loss_rate: f64,
    // LCG state, fixed seed, for reproducible loss decisions.
    rng_state: u64,
}
589
590impl<D: Clone> CausalNetworkSimulator<D> {
591 pub fn new(loss_rate: f64) -> Self {
592 Self {
593 in_flight: VecDeque::new(),
594 lost: Vec::new(),
595 loss_rate,
596 rng_state: 42,
597 }
598 }
599
600 fn next_random(&mut self) -> f64 {
602 self.rng_state = self.rng_state.wrapping_mul(1103515245).wrapping_add(12345);
603 ((self.rng_state >> 16) & 0x7fff) as f64 / 32768.0
604 }
605
606 pub fn send(&mut self, msg: CausalMessage<D>) {
608 if self.next_random() < self.loss_rate {
609 self.lost.push(msg);
610 } else {
611 self.in_flight.push_back(msg);
612 }
613 }
614
615 pub fn receive(&mut self) -> Option<CausalMessage<D>> {
617 self.in_flight.pop_front()
618 }
619
620 pub fn retransmit_lost(&mut self) {
622 for msg in self.lost.drain(..) {
623 self.in_flight.push_back(msg);
624 }
625 }
626
627 pub fn is_empty(&self) -> bool {
629 self.in_flight.is_empty()
630 }
631
632 pub fn in_flight_count(&self) -> usize {
634 self.in_flight.len()
635 }
636
637 pub fn lost_count(&self) -> usize {
639 self.lost.len()
640 }
641}
642
/// A fully-meshed set of replicas plus the simulated network between them.
#[derive(Debug)]
pub struct CausalCluster<S: Lattice + Clone> {
    // Replicas indexed 0..n, named "causal_{i}".
    replicas: Vec<CausalReplica<S>>,
    // Shared lossy transport carrying every protocol message.
    network: CausalNetworkSimulator<S>,
}
651
652impl<S: Lattice + Clone> CausalCluster<S> {
653 pub fn new(n: usize, loss_rate: f64) -> Self {
655 let mut replicas = Vec::with_capacity(n);
656
657 for i in 0..n {
659 let mut replica = CausalReplica::new(format!("causal_{}", i));
660 for j in 0..n {
662 if i != j {
663 replica.register_peer(format!("causal_{}", j));
664 }
665 }
666 replicas.push(replica);
667 }
668
669 Self {
670 replicas,
671 network: CausalNetworkSimulator::new(loss_rate),
672 }
673 }
674
675 pub fn replica(&self, idx: usize) -> &CausalReplica<S> {
677 &self.replicas[idx]
678 }
679
680 pub fn replica_mut(&mut self, idx: usize) -> &mut CausalReplica<S> {
682 &mut self.replicas[idx]
683 }
684
685 pub fn mutate<F>(&mut self, replica_idx: usize, mutator: F) -> S
687 where
688 F: FnOnce(&S) -> S,
689 {
690 self.replicas[replica_idx].mutate(mutator)
691 }
692
693 pub fn broadcast_intervals(&mut self, from_idx: usize) {
695 let replica = &mut self.replicas[from_idx];
696 let peer_ids: Vec<_> = replica.peers().cloned().collect();
697
698 for peer_id in peer_ids {
699 if let Some(interval) = replica.prepare_interval(&peer_id) {
700 self.network.send(CausalMessage::DeltaInterval(interval));
701 }
702 }
703 }
704
705 pub fn process_one(&mut self) -> bool {
707 if let Some(msg) = self.network.receive() {
708 match msg {
709 CausalMessage::DeltaInterval(interval) => {
710 for replica in &mut self.replicas {
712 if replica.id() == &interval.to {
713 if let Some(ack) = replica.receive_interval(interval.clone()) {
714 self.network.send(CausalMessage::Ack(ack));
715 }
716 break;
717 }
718 }
719 }
720 CausalMessage::Ack(ack) => {
721 for replica in &mut self.replicas {
723 if replica.id() == &ack.to {
724 replica.receive_ack(&ack);
725 break;
726 }
727 }
728 }
729 CausalMessage::SnapshotRequest { from, to } => {
730 for replica in &self.replicas {
732 if replica.id() == &to {
733 let (state, seq) = replica.snapshot();
734 self.network.send(CausalMessage::Snapshot {
735 from: to,
736 to: from,
737 state,
738 seq,
739 });
740 break;
741 }
742 }
743 }
744 CausalMessage::Snapshot {
745 from,
746 to,
747 state,
748 seq,
749 } => {
750 for replica in &mut self.replicas {
752 if replica.id() == &to {
753 replica.apply_snapshot(state, seq, &from);
754 break;
755 }
756 }
757 }
758 }
759 true
760 } else {
761 false
762 }
763 }
764
765 pub fn drain_network(&mut self) {
767 while self.process_one() {}
768 }
769
770 pub fn full_sync_round(&mut self) {
772 let n = self.replicas.len();
773 for i in 0..n {
774 self.broadcast_intervals(i);
775 }
776 self.drain_network();
777 }
778
779 pub fn is_converged(&self) -> bool {
781 if self.replicas.len() < 2 {
782 return true;
783 }
784
785 let first = self.replicas[0].state();
786 self.replicas.iter().skip(1).all(|r| r.state() == first)
787 }
788
789 pub fn retransmit_and_process(&mut self) {
791 self.network.retransmit_lost();
792 self.drain_network();
793 }
794
795 pub fn len(&self) -> usize {
797 self.replicas.len()
798 }
799
800 pub fn is_empty(&self) -> bool {
802 self.replicas.is_empty()
803 }
804
805 pub fn crash_and_recover(&mut self, idx: usize) {
807 let durable = self.replicas[idx].durable_state().clone();
808
809 let mut recovered = CausalReplica::restore(durable);
811
812 let n = self.replicas.len();
814 for j in 0..n {
815 if idx != j {
816 recovered.register_peer(format!("causal_{}", j));
817 }
818 }
819
820 self.replicas[idx] = recovered;
821 }
822
823 pub fn total_pending(&self) -> usize {
825 self.replicas.iter().map(|r| r.pending_count()).sum()
826 }
827}
828
#[cfg(test)]
mod tests {
    use super::*;
    use mdcs_core::gset::GSet;
    use mdcs_core::pncounter::PNCounter;

    // A local mutation joins the delta into state and bumps the counter.
    #[test]
    fn test_causal_replica_basic() {
        let mut replica: CausalReplica<GSet<i32>> = CausalReplica::new("test1");

        replica.mutate(|_| {
            let mut d = GSet::new();
            d.insert(42);
            d
        });

        assert!(replica.state().contains(&42));
        assert_eq!(replica.counter(), 1);
    }

    // Two mutations collapse into one interval covering sequences (0, 2].
    #[test]
    fn test_causal_interval_generation() {
        let mut replica: CausalReplica<GSet<i32>> = CausalReplica::new("test1");
        replica.register_peer("peer1".to_string());

        replica.mutate(|_| {
            let mut d = GSet::new();
            d.insert(1);
            d
        });

        replica.mutate(|_| {
            let mut d = GSet::new();
            d.insert(2);
            d
        });

        let interval = replica.prepare_interval("peer1").unwrap();
        assert_eq!(interval.from_seq, 0);
        assert_eq!(interval.to_seq, 2);
        assert!(interval.delta.contains(&1));
        assert!(interval.delta.contains(&2));
    }

    // An in-order interval is applied immediately and acked at its to_seq.
    #[test]
    fn test_causal_delivery() {
        let mut r1: CausalReplica<GSet<i32>> = CausalReplica::new("r1");
        let mut r2: CausalReplica<GSet<i32>> = CausalReplica::new("r2");

        r1.register_peer("r2".to_string());
        r2.register_peer("r1".to_string());

        r1.mutate(|_| {
            let mut d = GSet::new();
            d.insert(1);
            d
        });
        r1.mutate(|_| {
            let mut d = GSet::new();
            d.insert(2);
            d
        });

        let interval = r1.prepare_interval("r2").unwrap();
        assert_eq!(interval.from_seq, 0);
        assert_eq!(interval.to_seq, 2);

        let ack = r2.receive_interval(interval).unwrap();
        assert_eq!(ack.acked_seq, 2);

        assert!(r2.state().contains(&1));
        assert!(r2.state().contains(&2));
    }

    // An interval starting past the ack frontier is buffered, not applied.
    #[test]
    fn test_out_of_order_buffering() {
        let mut replica: CausalReplica<GSet<i32>> = CausalReplica::new("r1");
        replica.register_peer("peer".to_string());

        // Claims range (5, 6] while the replica has only acked seq 0.
        let out_of_order = DeltaInterval {
            from: "peer".to_string(),
            to: "r1".to_string(),
            delta: {
                let mut d = GSet::new();
                d.insert(999);
                d
            },
            from_seq: 5,
            to_seq: 6,
        };

        let result = replica.receive_interval(out_of_order);
        assert!(result.is_none());
        assert_eq!(replica.pending_count(), 1);
        assert!(!replica.state().contains(&999));
    }

    // Concurrent single-element inserts converge after one sync round.
    #[test]
    fn test_cluster_convergence() {
        let mut cluster: CausalCluster<GSet<i32>> = CausalCluster::new(3, 0.0);

        for i in 0..3 {
            let val = (i + 1) as i32;
            cluster.mutate(i, move |_| {
                let mut d = GSet::new();
                d.insert(val);
                d
            });
        }

        assert!(!cluster.is_converged());

        cluster.full_sync_round();

        assert!(cluster.is_converged());

        for i in 0..3 {
            for val in 1..=3 {
                assert!(cluster.replica(i).state().contains(&val));
            }
        }
    }

    // With 30% message loss, repeated sync + retransmission still converges.
    #[test]
    fn test_cluster_with_loss() {
        let mut cluster: CausalCluster<GSet<i32>> = CausalCluster::new(3, 0.3);

        for i in 0..3 {
            let val = (i + 1) as i32;
            cluster.mutate(i, move |_| {
                let mut d = GSet::new();
                d.insert(val);
                d
            });
        }

        for _ in 0..10 {
            cluster.full_sync_round();
            cluster.retransmit_and_process();
        }

        assert!(cluster.is_converged());
    }

    // Durable state (counter + joined state) survives a crash; volatile
    // outgoing buffers come back empty.
    #[test]
    fn test_crash_recovery() {
        let mut cluster: CausalCluster<GSet<i32>> = CausalCluster::new(2, 0.0);

        cluster.mutate(0, |_| {
            let mut d = GSet::new();
            d.insert(1);
            d
        });

        cluster.full_sync_round();
        assert!(cluster.is_converged());

        cluster.mutate(0, |_| {
            let mut d = GSet::new();
            d.insert(2);
            d
        });

        let counter_before = cluster.replica(0).counter();
        cluster.crash_and_recover(0);

        assert_eq!(cluster.replica(0).counter(), counter_before);
        assert!(cluster.replica(0).state().contains(&1));
        assert!(cluster.replica(0).state().contains(&2));

        assert!(!cluster.replica(0).has_pending_deltas());
    }

    // Increment and decrement on different replicas cancel out after sync.
    #[test]
    fn test_pncounter_causal() {
        let mut cluster: CausalCluster<PNCounter<String>> = CausalCluster::new(2, 0.0);

        cluster.mutate(0, |_s| {
            let mut delta = PNCounter::new();
            delta.increment("r0".to_string(), 1);
            delta
        });

        cluster.mutate(1, |_s| {
            let mut delta = PNCounter::new();
            delta.decrement("r1".to_string(), 1);
            delta
        });

        cluster.full_sync_round();

        assert!(cluster.is_converged());
        assert_eq!(cluster.replica(0).state().value(), 0);
    }

    // A future interval is held until the gap-filling interval arrives, then
    // both are applied in causal order and the pending queue empties.
    #[test]
    fn test_causal_ordering_preserved() {
        let mut r1: CausalReplica<GSet<i32>> = CausalReplica::new("r1");
        let mut r2: CausalReplica<GSet<i32>> = CausalReplica::new("r2");

        r1.register_peer("r2".to_string());
        r2.register_peer("r1".to_string());

        for i in 1..=3 {
            r1.mutate(move |_| {
                let mut d = GSet::new();
                d.insert(i);
                d
            });
        }

        // Hand-built interval covering (2, 3] — arrives first, out of order.
        let interval_1_3 = DeltaInterval {
            from: "r1".to_string(),
            to: "r2".to_string(),
            delta: {
                let mut d = GSet::new();
                d.insert(3);
                d
            },
            from_seq: 2,
            to_seq: 3,
        };

        // Hand-built interval covering (0, 2] — closes the gap.
        let interval_0_2 = DeltaInterval {
            from: "r1".to_string(),
            to: "r2".to_string(),
            delta: {
                let mut d = GSet::new();
                d.insert(1);
                d.insert(2);
                d
            },
            from_seq: 0,
            to_seq: 2,
        };

        let result = r2.receive_interval(interval_1_3.clone());
        assert!(result.is_none());
        assert!(!r2.state().contains(&3));
        let result = r2.receive_interval(interval_0_2);
        assert!(result.is_some());
        assert!(r2.state().contains(&1));
        assert!(r2.state().contains(&2));

        assert!(r2.state().contains(&3));
        assert_eq!(r2.pending_count(), 0);
    }

    // Persist/load round-trip through MemoryStorage restores the replica.
    #[test]
    fn test_durable_storage() {
        let mut storage: MemoryStorage<GSet<i32>> = MemoryStorage::new();

        let mut replica: CausalReplica<GSet<i32>> = CausalReplica::new("test");
        replica.mutate(|_| {
            let mut d = GSet::new();
            d.insert(42);
            d
        });

        storage.persist(replica.durable_state()).unwrap();

        let loaded = storage.load("test").unwrap().unwrap();
        assert_eq!(loaded.counter, 1);
        assert!(loaded.state.contains(&42));

        let recovered = CausalReplica::restore(loaded);
        assert!(recovered.state().contains(&42));
    }
}