mdcs_core/
map.rs

//! Map CRDT - A composable container for nested CRDTs
//!
//! The Map CRDT maps keys to dot-tagged values. It is intended to hold other
//! CRDT values so that complex nested structures such as JSON documents can be
//! built; at present `MapValue` only provides leaf variants (`Int`, `Text`, `Bytes`).
//!
//! Key design: a single causal context, shared by the whole map, records every
//! dot the map has created or seen, so causality is tracked consistently across
//! all keys.
8
9use crate::lattice::Lattice;
10use serde::{Deserialize, Deserializer, Serialize, Serializer};
11use std::collections::BTreeMap;
12
/// A unique identifier for a write operation (a "dot").
///
/// A dot names the replica that performed a write together with that write's
/// sequence number. The derived ordering compares `replica_id` first and then
/// `seq`; the field order is therefore significant and should not be changed.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Dot {
    /// Identifier of the replica that minted this dot.
    pub replica_id: String,
    /// Sequence number assigned when the dot was minted.
    pub seq: u64,
}
20
21impl Dot {
22    pub fn new(replica_id: impl Into<String>, seq: u64) -> Self {
23        Self {
24            replica_id: replica_id.into(),
25            seq,
26        }
27    }
28}
29
/// Causal context: the set of every dot this replica has created or received.
///
/// A dot that is in the context but no longer stored by a map has been
/// removed or overwritten, which distinguishes it from a dot never seen.
///
/// NOTE(review): dots are only ever inserted (nothing visible in this file
/// removes them), so the set grows with every write; consider compacting it
/// into a per-replica version vector if that becomes a problem.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct CausalContext {
    /// Set of all dots that have been created or observed.
    dots: std::collections::BTreeSet<Dot>,
}
36
37impl CausalContext {
38    pub fn new() -> Self {
39        Self {
40            dots: std::collections::BTreeSet::new(),
41        }
42    }
43
44    pub fn add_dot(&mut self, dot: Dot) {
45        self.dots.insert(dot);
46    }
47
48    pub fn contains(&self, dot: &Dot) -> bool {
49        self.dots.contains(dot)
50    }
51
52    pub fn join(&self, other: &CausalContext) -> CausalContext {
53        let mut joined = self.clone();
54        for dot in &other.dots {
55            joined.add_dot(dot.clone());
56        }
57        joined
58    }
59}
60
61impl Default for CausalContext {
62    fn default() -> Self {
63        Self::new()
64    }
65}
66
/// A value that can be stored under a key of a `CRDTMap`.
///
/// Only leaf values exist today. The trailing comments inside the enum name
/// variants for nesting that are not implemented yet, so nested CRDTs cannot
/// currently be stored.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum MapValue {
    /// A signed 64-bit integer.
    Int(i64),
    /// A UTF-8 string.
    Text(String),
    /// An opaque byte buffer.
    Bytes(Vec<u8>),
    // Not implemented yet; placeholders for future variants:
    // For nested maps: Box<CRDTMap>
    // For other CRDTs: Box<dyn Lattice>
}
77
/// Map CRDT - a container mapping keys to dot-tagged values.
///
/// Every stored value is tagged with the dot of the write that produced it.
/// A value is "live" while its dot is stored in `entries`. A dot that is
/// recorded in `context` but no longer stored in `entries` has been removed or
/// overwritten.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CRDTMap<K: Ord + Clone> {
    /// Per key, the currently stored values indexed by the dot that wrote them.
    /// A key can hold several dots after a join or `put_with_dot`
    /// (concurrent writes); `put` collapses it back to one.
    entries: BTreeMap<K, BTreeMap<Dot, MapValue>>,
    /// Shared causal context: all dots that have been created or seen
    context: CausalContext,
    /// Sequence number used for the next dot minted by `put`.
    /// NOTE(review): the counter is shared by every replica id that writes
    /// through this instance, and it is not part of the serialized form.
    local_seq: u64,
}
92
93// Custom serialization for CRDTMap to handle nested BTreeMap with Dot keys
94impl<K: Ord + Clone + Serialize> Serialize for CRDTMap<K> {
95    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
96    where
97        S: Serializer,
98    {
99        // Convert entries to a serializable format
100        #[derive(Serialize)]
101        struct SerializableCRDTMap<'a, K: Ord + Clone + Serialize> {
102            entries: Vec<(&'a K, Vec<(&'a Dot, &'a MapValue)>)>,
103            context: &'a CausalContext,
104        }
105
106        let entries: Vec<_> = self
107            .entries
108            .iter()
109            .map(|(k, v)| (k, v.iter().collect::<Vec<_>>()))
110            .collect();
111
112        let serializable = SerializableCRDTMap {
113            entries,
114            context: &self.context,
115        };
116
117        serializable.serialize(serializer)
118    }
119}
120
121impl<'de, K: Ord + Clone + Deserialize<'de>> Deserialize<'de> for CRDTMap<K> {
122    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
123    where
124        D: Deserializer<'de>,
125    {
126        #[derive(Deserialize)]
127        struct DeserializableCRDTMap<K: Ord + Clone> {
128            entries: Vec<(K, Vec<(Dot, MapValue)>)>,
129            context: CausalContext,
130        }
131
132        let deserialized = DeserializableCRDTMap::<K>::deserialize(deserializer)?;
133
134        let entries: BTreeMap<K, BTreeMap<Dot, MapValue>> = deserialized
135            .entries
136            .into_iter()
137            .map(|(k, v)| (k, v.into_iter().collect()))
138            .collect();
139
140        Ok(Self {
141            entries,
142            context: deserialized.context,
143            local_seq: 0,
144        })
145    }
146}
147
148impl<K: Ord + Clone> CRDTMap<K> {
149    /// Create a new empty map
150    pub fn new() -> Self {
151        Self {
152            entries: BTreeMap::new(),
153            context: CausalContext::new(),
154            local_seq: 0,
155        }
156    }
157
158    /// Put a value at a key (from this replica)
159    pub fn put(&mut self, replica_id: &str, key: K, value: MapValue) -> Dot {
160        let dot = Dot::new(replica_id, self.local_seq);
161        self.local_seq += 1;
162
163        // Create entry for this key if it doesn't exist
164        let entry = self.entries.entry(key).or_default();
165
166        // Clear previous values for this key and insert new one
167        entry.clear();
168        entry.insert(dot.clone(), value);
169
170        // Track dot in causal context
171        self.context.add_dot(dot.clone());
172
173        dot
174    }
175
176    /// Get the current value at a key
177    /// Returns the value if the key exists and has live entries
178    pub fn get(&self, key: &K) -> Option<&MapValue> {
179        self.entries
180            .get(key)
181            .and_then(|entry| entry.values().next())
182    }
183
184    /// Get all values at a key (for concurrent writes)
185    pub fn get_all(&self, key: &K) -> Vec<&MapValue> {
186        self.entries
187            .get(key)
188            .map(|entry| entry.values().collect())
189            .unwrap_or_default()
190    }
191
192    /// Remove a key by recording all its current dots as removed
193    pub fn remove(&mut self, key: &K) {
194        if let Some(entry) = self.entries.get_mut(key) {
195            // Mark all dots as removed by clearing them but keeping them in context
196            entry.clear();
197        }
198    }
199
200    /// Check if a key exists with live values
201    pub fn contains_key(&self, key: &K) -> bool {
202        self.entries
203            .get(key)
204            .map(|entry| !entry.is_empty())
205            .unwrap_or(false)
206    }
207
208    /// Get all keys that have live values
209    pub fn keys(&self) -> impl Iterator<Item = &K> {
210        self.entries
211            .iter()
212            .filter_map(|(k, v)| if !v.is_empty() { Some(k) } else { None })
213    }
214
215    /// Get the causal context
216    pub fn context(&self) -> &CausalContext {
217        &self.context
218    }
219
220    /// Add a value with a specific dot (for merging)
221    pub fn put_with_dot(&mut self, key: K, dot: Dot, value: MapValue) {
222        let entry = self.entries.entry(key).or_default();
223        entry.insert(dot.clone(), value);
224        self.context.add_dot(dot);
225    }
226}
227
228impl<K: Ord + Clone> Default for CRDTMap<K> {
229    fn default() -> Self {
230        Self::new()
231    }
232}
233
234impl<K: Ord + Clone> Lattice for CRDTMap<K> {
235    fn bottom() -> Self {
236        Self::new()
237    }
238
239    /// Join operation: merge all entries and contexts
240    /// For each key, union all the dots and their values
241    fn join(&self, other: &Self) -> Self {
242        let mut entries = self.entries.clone();
243
244        // Merge other's entries with minimal nested cloning
245        for (key, other_entry) in &other.entries {
246            let entry = entries.entry(key.clone()).or_default();
247            for (dot, value) in other_entry.iter() {
248                entry.insert(dot.clone(), value.clone());
249            }
250        }
251
252        Self {
253            entries,
254            context: self.context.join(&other.context),
255            local_seq: self.local_seq.max(other.local_seq),
256        }
257    }
258}
259
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an owned `String` key from a literal.
    fn key(name: &str) -> String {
        name.to_string()
    }

    /// A fresh map holding a single write of `value` at `name`, made by `replica`.
    fn map_with(replica: &str, name: &str, value: MapValue) -> CRDTMap<String> {
        let mut map = CRDTMap::new();
        map.put(replica, key(name), value);
        map
    }

    #[test]
    fn test_map_basic_operations() {
        let mut map = map_with("replica1", "key1", MapValue::Int(42));
        assert_eq!(map.get(&key("key1")), Some(&MapValue::Int(42)));

        let hello = MapValue::Text("hello".to_string());
        map.put("replica1", key("key2"), hello.clone());
        assert_eq!(map.get(&key("key2")), Some(&hello));
    }

    #[test]
    fn test_map_remove() {
        let mut map = map_with("replica1", "key1", MapValue::Int(42));
        assert!(map.contains_key(&key("key1")));

        map.remove(&key("key1"));
        assert!(!map.contains_key(&key("key1")));
    }

    #[test]
    fn test_map_join_idempotent() {
        let map = map_with("replica1", "key1", MapValue::Int(42));
        let joined = map.join(&map);
        assert_eq!(joined.get(&key("key1")), Some(&MapValue::Int(42)));
    }

    #[test]
    fn test_map_join_commutative() {
        let map1 = map_with("replica1", "key1", MapValue::Int(42));
        let map2 = map_with("replica2", "key2", MapValue::Text("world".to_string()));

        for joined in [map1.join(&map2), map2.join(&map1)] {
            assert_eq!(joined.get(&key("key1")), Some(&MapValue::Int(42)));
            assert_eq!(
                joined.get(&key("key2")),
                Some(&MapValue::Text("world".to_string()))
            );
        }
    }

    #[test]
    fn test_map_join_associative() {
        let map1 = map_with("replica1", "key1", MapValue::Int(1));
        let map2 = map_with("replica2", "key2", MapValue::Int(2));
        let map3 = map_with("replica3", "key3", MapValue::Int(3));

        let left = map1.join(&map2).join(&map3);
        let right = map1.join(&map2.join(&map3));

        for merged in [&left, &right] {
            for (name, expected) in [("key1", 1), ("key2", 2), ("key3", 3)] {
                assert_eq!(merged.get(&key(name)), Some(&MapValue::Int(expected)));
            }
        }
    }

    #[test]
    fn test_map_concurrent_writes_different_keys() {
        let map1 = map_with("replica1", "key1", MapValue::Int(10));
        let map2 = map_with("replica2", "key2", MapValue::Int(20));

        let merged = map1.join(&map2);
        assert_eq!(merged.get(&key("key1")), Some(&MapValue::Int(10)));
        assert_eq!(merged.get(&key("key2")), Some(&MapValue::Int(20)));
    }

    #[test]
    fn test_map_serialization() {
        let mut map = map_with("replica1", "key1", MapValue::Int(42));
        map.put("replica1", key("key2"), MapValue::Text("hello".to_string()));

        let json = serde_json::to_string(&map).unwrap();
        let restored: CRDTMap<String> = serde_json::from_str(&json).unwrap();

        assert_eq!(restored.get(&key("key1")), Some(&MapValue::Int(42)));
        assert_eq!(
            restored.get(&key("key2")),
            Some(&MapValue::Text("hello".to_string()))
        );
    }
}