mdcs_core/
mvreg.rs

1//! Multi-Value Register CRDT
2//!
3//! The Multi-Value Register (MV-Register) maintains a set of concurrent values
4//! instead of choosing a single winner. Each value is tagged with a unique
5//! identifier (dot) to distinguish different writes.
6//!
7//! When concurrent writes occur, the register contains all of them until
8//! one of them is explicitly observed and the others are discarded.
9
10use crate::lattice::Lattice;
11use serde::{Deserialize, Deserializer, Serialize, Serializer};
12use std::collections::BTreeMap;
13use ulid::Ulid;
14
15/// A unique identifier for a write operation
16#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
17pub struct Dot {
18    pub replica_id: String,
19    pub unique_id: Ulid,
20}
21
22impl Dot {
23    pub fn new(replica_id: impl Into<String>) -> Self {
24        Self {
25            replica_id: replica_id.into(),
26            unique_id: Ulid::new(),
27        }
28    }
29}
30
31/// A Multi-Value Register CRDT
32///
33/// Maintains a set of values, each with a unique dot. This allows
34/// concurrent writes to coexist until explicitly resolved.
35#[derive(Clone, Debug, PartialEq, Eq)]
36pub struct MVRegister<T: Ord + Clone> {
37    /// Current values, each tagged with a unique dot
38    values: BTreeMap<Dot, T>,
39}
40
41// Custom serialization: serialize as Vec<(Dot, T)> for JSON compatibility
42impl<T: Ord + Clone + Serialize> Serialize for MVRegister<T> {
43    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
44    where
45        S: Serializer,
46    {
47        let entries: Vec<(&Dot, &T)> = self.values.iter().collect();
48        entries.serialize(serializer)
49    }
50}
51
52impl<'de, T: Ord + Clone + Deserialize<'de>> Deserialize<'de> for MVRegister<T> {
53    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
54    where
55        D: Deserializer<'de>,
56    {
57        let entries: Vec<(Dot, T)> = Vec::deserialize(deserializer)?;
58        Ok(Self {
59            values: entries.into_iter().collect(),
60        })
61    }
62}
63
64impl<T: Ord + Clone> MVRegister<T> {
65    /// Create a new empty Multi-Value Register
66    pub fn new() -> Self {
67        Self {
68            values: BTreeMap::new(),
69        }
70    }
71
72    /// Write a new value, generating a unique dot
73    pub fn write(&mut self, replica_id: &str, value: T) -> Dot {
74        let dot = Dot::new(replica_id);
75        // Clear previous values and insert the new one
76        self.values.clear();
77        self.values.insert(dot.clone(), value);
78        dot
79    }
80
81    /// Write a value with a specific dot (for merging)
82    pub fn write_with_dot(&mut self, dot: Dot, value: T) {
83        self.values.insert(dot, value);
84    }
85
86    /// Get all current values
87    pub fn read(&self) -> Vec<&T> {
88        self.values.values().collect()
89    }
90
91    /// Get all current values with their dots
92    pub fn read_with_dots(&self) -> Vec<(&Dot, &T)> {
93        self.values.iter().collect()
94    }
95
96    /// Resolve concurrent values by choosing one (for write-after-read consistency)
97    pub fn resolve(&mut self, replica_id: &str, value: T) -> Dot {
98        let dot = Dot::new(replica_id);
99        self.values.clear();
100        self.values.insert(dot.clone(), value);
101        dot
102    }
103
104    /// Remove a specific dot (value)
105    pub fn remove_dot(&mut self, dot: &Dot) {
106        self.values.remove(dot);
107    }
108
109    /// Check if register is empty
110    pub fn is_empty(&self) -> bool {
111        self.values.is_empty()
112    }
113
114    /// Get the number of concurrent values
115    pub fn len(&self) -> usize {
116        self.values.len()
117    }
118}
119
120impl<T: Ord + Clone> Default for MVRegister<T> {
121    fn default() -> Self {
122        Self::new()
123    }
124}
125
126impl<T: Ord + Clone> Lattice for MVRegister<T> {
127    fn bottom() -> Self {
128        Self::new()
129    }
130
131    /// Join operation: union of all values from both registers
132    /// This represents the concurrent state after a merge
133    fn join(&self, other: &Self) -> Self {
134        let mut values = self.values.clone();
135
136        // Union all values from other with minimal cloning
137        for (dot, value) in other.values.iter() {
138            values.entry(dot.clone()).or_insert_with(|| value.clone());
139        }
140
141        Self { values }
142    }
143}
144
145#[cfg(test)]
146mod tests {
147    use super::*;
148
149    #[test]
150    fn test_mvreg_basic_write() {
151        let mut reg = MVRegister::new();
152
153        assert!(reg.is_empty());
154
155        let _dot1 = reg.write("replica1", 42);
156        assert_eq!(reg.len(), 1);
157        assert_eq!(reg.read(), vec![&42]);
158    }
159
160    #[test]
161    fn test_mvreg_concurrent_writes() {
162        let mut reg = MVRegister::new();
163
164        // First write
165        let _dot1 = reg.write("replica1", 10);
166        assert_eq!(reg.read(), vec![&10]);
167
168        // Concurrent write (should clear previous)
169        let _dot2 = reg.write("replica2", 20);
170        assert_eq!(reg.read(), vec![&20]);
171    }
172
173    #[test]
174    fn test_mvreg_merge_concurrent_values() {
175        let mut reg1 = MVRegister::new();
176        reg1.write("replica1", 10);
177
178        let mut reg2 = MVRegister::new();
179        reg2.write("replica2", 20);
180
181        // Merge should have both values
182        let merged = reg1.join(&reg2);
183        let values = merged.read();
184        assert_eq!(values.len(), 2);
185        assert!(values.contains(&&10));
186        assert!(values.contains(&&20));
187    }
188
189    #[test]
190    fn test_mvreg_resolve_conflicts() {
191        let mut reg = MVRegister::new();
192
193        // Multiple concurrent writes
194        reg.write("replica1", 10);
195        let mut reg2 = MVRegister::new();
196        reg2.write("replica2", 20);
197
198        let merged = reg.join(&reg2);
199        assert_eq!(merged.len(), 2);
200
201        // Resolve by choosing one
202        let mut resolved = merged.clone();
203        resolved.resolve("replica3", 30);
204        assert_eq!(resolved.len(), 1);
205        assert_eq!(resolved.read(), vec![&30]);
206    }
207
208    #[test]
209    fn test_mvreg_join_idempotent() {
210        let mut reg = MVRegister::new();
211        reg.write("replica1", 42);
212
213        let joined = reg.join(&reg);
214        assert_eq!(joined.len(), reg.len());
215        assert_eq!(joined.read(), reg.read());
216    }
217
218    #[test]
219    fn test_mvreg_join_commutative() {
220        let mut reg1 = MVRegister::new();
221        reg1.write("replica1", 10);
222
223        let mut reg2 = MVRegister::new();
224        reg2.write("replica2", 20);
225
226        let joined1 = reg1.join(&reg2);
227        let joined2 = reg2.join(&reg1);
228
229        assert_eq!(joined1.len(), joined2.len());
230
231        let mut v1 = joined1.read();
232        let mut v2 = joined2.read();
233        v1.sort();
234        v2.sort();
235        assert_eq!(v1, v2);
236    }
237
238    #[test]
239    fn test_mvreg_join_associative() {
240        let mut reg1 = MVRegister::new();
241        reg1.write("replica1", 10);
242
243        let mut reg2 = MVRegister::new();
244        reg2.write("replica2", 20);
245
246        let mut reg3 = MVRegister::new();
247        reg3.write("replica3", 30);
248
249        let left = reg1.join(&reg2).join(&reg3);
250        let right = reg1.join(&reg2.join(&reg3));
251
252        assert_eq!(left.len(), right.len());
253    }
254
255    #[test]
256    fn test_mvreg_bottom_is_identity() {
257        let mut reg = MVRegister::new();
258        reg.write("replica1", 42);
259
260        let bottom = MVRegister::bottom();
261        let joined = reg.join(&bottom);
262
263        assert_eq!(joined.len(), reg.len());
264        assert_eq!(joined.read(), reg.read());
265    }
266
267    #[test]
268    fn test_mvreg_serialization() {
269        let mut reg = MVRegister::new();
270        reg.write("replica1", 42);
271
272        let serialized = serde_json::to_string(&reg).unwrap();
273        let deserialized: MVRegister<i32> = serde_json::from_str(&serialized).unwrap();
274
275        assert_eq!(deserialized.read(), vec![&42]);
276    }
277}