mdcs_db/
rga_list.rs

1//! RGA List - Replicated Growable Array for ordered sequences.
2//!
3//! RGA provides a CRDT list that supports:
4//! - Insert at any position
5//! - Delete at any position
6//! - Move elements (delete + insert)
7//!
8//! Uses unique IDs to maintain consistent ordering across replicas.
9
10use mdcs_core::lattice::Lattice;
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use ulid::Ulid;
14
15/// Unique identifier for a list element.
16#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
17pub struct ListId {
18    /// The replica that created this element.
19    pub replica: String,
20    /// Sequence number within that replica.
21    pub seq: u64,
22    /// Unique identifier for disambiguation.
23    pub ulid: Ulid,
24}
25
26impl ListId {
27    pub fn new(replica: impl Into<String>, seq: u64) -> Self {
28        Self {
29            replica: replica.into(),
30            seq,
31            ulid: Ulid::new(),
32        }
33    }
34
35    /// Create a genesis ID (for the virtual head).
36    pub fn genesis() -> Self {
37        Self {
38            replica: "".to_string(),
39            seq: 0,
40            ulid: Ulid::nil(),
41        }
42    }
43}
44
45impl PartialOrd for ListId {
46    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
47        Some(self.cmp(other))
48    }
49}
50
51impl Ord for ListId {
52    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
53        // Higher sequence = later in causal order
54        // Tie-break on replica ID, then ULID
55        self.seq
56            .cmp(&other.seq)
57            .then_with(|| self.replica.cmp(&other.replica))
58            .then_with(|| self.ulid.cmp(&other.ulid))
59    }
60}
61
62/// A node in the RGA list.
63#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
64pub struct ListNode<T> {
65    /// The unique ID of this node.
66    pub id: ListId,
67    /// The value stored (None if deleted - tombstone).
68    pub value: Option<T>,
69    /// The ID of the element this was inserted after.
70    pub origin: ListId,
71    /// Whether this node is deleted (tombstone).
72    pub deleted: bool,
73}
74
75impl<T> ListNode<T> {
76    pub fn new(id: ListId, value: T, origin: ListId) -> Self {
77        Self {
78            id,
79            value: Some(value),
80            origin,
81            deleted: false,
82        }
83    }
84}
85
86/// Delta for RGA list operations.
87#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
88pub struct RGAListDelta<T: Clone + PartialEq> {
89    /// Nodes to insert.
90    pub inserts: Vec<ListNode<T>>,
91    /// IDs of nodes to delete.
92    pub deletes: Vec<ListId>,
93}
94
95impl<T: Clone + PartialEq> RGAListDelta<T> {
96    pub fn new() -> Self {
97        Self {
98            inserts: Vec::new(),
99            deletes: Vec::new(),
100        }
101    }
102
103    pub fn is_empty(&self) -> bool {
104        self.inserts.is_empty() && self.deletes.is_empty()
105    }
106}
107
108impl<T: Clone + PartialEq> Default for RGAListDelta<T> {
109    fn default() -> Self {
110        Self::new()
111    }
112}
113
114/// Replicated Growable Array - an ordered list CRDT.
115///
116/// Supports insert, delete, and move operations with
117/// deterministic conflict resolution.
118#[derive(Clone, Debug, Serialize, Deserialize)]
119pub struct RGAList<T: Clone + PartialEq> {
120    /// All nodes indexed by their ID.
121    nodes: HashMap<ListId, ListNode<T>>,
122    /// Children of each node (for ordering).
123    /// Maps origin -> list of children sorted by ID.
124    children: HashMap<ListId, Vec<ListId>>,
125    /// The replica ID for this instance.
126    replica_id: String,
127    /// Sequence counter for generating IDs.
128    seq: u64,
129    /// Pending delta for replication.
130    #[serde(skip)]
131    pending_delta: Option<RGAListDelta<T>>,
132}
133
134impl<T: Clone + PartialEq> RGAList<T> {
135    /// Create a new empty RGA list.
136    pub fn new(replica_id: impl Into<String>) -> Self {
137        let replica_id = replica_id.into();
138        let mut list = Self {
139            nodes: HashMap::new(),
140            children: HashMap::new(),
141            replica_id,
142            seq: 0,
143            pending_delta: None,
144        };
145
146        // Insert virtual head node
147        let genesis = ListId::genesis();
148        list.children.insert(genesis, Vec::new());
149
150        list
151    }
152
153    /// Get the replica ID.
154    pub fn replica_id(&self) -> &str {
155        &self.replica_id
156    }
157
158    /// Generate a new unique ID.
159    fn next_id(&mut self) -> ListId {
160        self.seq += 1;
161        ListId::new(&self.replica_id, self.seq)
162    }
163
164    /// Insert a value at the given index.
165    pub fn insert(&mut self, index: usize, value: T) {
166        let origin = self
167            .id_at_index(index.saturating_sub(1))
168            .unwrap_or(ListId::genesis());
169        self.insert_after(&origin, value);
170    }
171
172    /// Insert a value after the given origin ID.
173    pub fn insert_after(&mut self, origin: &ListId, value: T) {
174        let id = self.next_id();
175        let node = ListNode::new(id.clone(), value, origin.clone());
176
177        self.integrate_node(node.clone());
178
179        // Record delta
180        let delta = self.pending_delta.get_or_insert_with(RGAListDelta::new);
181        delta.inserts.push(node);
182    }
183
184    /// Insert at the beginning.
185    pub fn push_front(&mut self, value: T) {
186        self.insert(0, value);
187    }
188
189    /// Insert at the end.
190    pub fn push_back(&mut self, value: T) {
191        let len = self.len();
192        self.insert(len, value);
193    }
194
195    /// Delete the element at the given index.
196    pub fn delete(&mut self, index: usize) -> Option<T> {
197        let id = self.id_at_index(index)?;
198        self.delete_by_id(&id)
199    }
200
201    /// Delete an element by its ID.
202    pub fn delete_by_id(&mut self, id: &ListId) -> Option<T> {
203        if let Some(node) = self.nodes.get_mut(id) {
204            if !node.deleted {
205                node.deleted = true;
206                let value = node.value.take();
207
208                // Record delta
209                let delta = self.pending_delta.get_or_insert_with(RGAListDelta::new);
210                delta.deletes.push(id.clone());
211
212                return value;
213            }
214        }
215        None
216    }
217
218    /// Move an element from one index to another.
219    pub fn move_element(&mut self, from: usize, to: usize) -> bool {
220        if let Some(value) = self.delete(from) {
221            // Adjust target index if moving forward
222            let adjusted_to = if to > from { to - 1 } else { to };
223            self.insert(adjusted_to, value);
224            true
225        } else {
226            false
227        }
228    }
229
230    /// Get the element at the given index.
231    pub fn get(&self, index: usize) -> Option<&T> {
232        let id = self.id_at_index(index)?;
233        self.nodes.get(&id).and_then(|n| n.value.as_ref())
234    }
235
236    /// Get a mutable reference to the element at the given index.
237    pub fn get_mut(&mut self, index: usize) -> Option<&mut T> {
238        let id = self.id_at_index(index)?;
239        self.nodes.get_mut(&id).and_then(|n| n.value.as_mut())
240    }
241
242    /// Get the number of non-deleted elements.
243    pub fn len(&self) -> usize {
244        self.nodes.values().filter(|n| !n.deleted).count()
245    }
246
247    /// Check if the list is empty.
248    pub fn is_empty(&self) -> bool {
249        self.len() == 0
250    }
251
252    /// Iterate over values in order.
253    pub fn iter(&self) -> impl Iterator<Item = &T> {
254        self.iter_nodes()
255            .filter(|n| !n.deleted)
256            .filter_map(|n| n.value.as_ref())
257    }
258
259    /// Iterate over (index, value) pairs.
260    pub fn iter_indexed(&self) -> impl Iterator<Item = (usize, &T)> {
261        self.iter().enumerate()
262    }
263
264    /// Convert to a Vec.
265    pub fn to_vec(&self) -> Vec<T> {
266        self.iter().cloned().collect()
267    }
268
269    /// Get the ID at a given visible index.
270    fn id_at_index(&self, index: usize) -> Option<ListId> {
271        self.iter_nodes()
272            .filter(|n| !n.deleted)
273            .nth(index)
274            .map(|n| n.id.clone())
275    }
276
277    /// Get the visible index for an ID.
278    pub fn index_of_id(&self, id: &ListId) -> Option<usize> {
279        self.iter_nodes()
280            .filter(|n| !n.deleted)
281            .position(|n| &n.id == id)
282    }
283
284    /// Iterate over all nodes in order (including tombstones).
285    fn iter_nodes(&self) -> impl Iterator<Item = &ListNode<T>> {
286        RGAIterator {
287            list: self,
288            stack: vec![ListId::genesis()],
289            visited: std::collections::HashSet::new(),
290        }
291    }
292
293    /// Integrate a node into the list.
294    fn integrate_node(&mut self, node: ListNode<T>) {
295        let id = node.id.clone();
296        let origin = node.origin.clone();
297
298        // Add to nodes map
299        self.nodes.insert(id.clone(), node);
300
301        // Add to children of origin, maintaining sort order
302        let children = self.children.entry(origin).or_default();
303
304        // Find insertion position (maintain descending order by ID for RGA)
305        let pos = children
306            .iter()
307            .position(|c| c < &id)
308            .unwrap_or(children.len());
309        children.insert(pos, id.clone());
310
311        // Ensure this node has a children entry (reuse cloned id)
312        self.children.entry(id).or_default();
313    }
314
315    /// Take the pending delta.
316    pub fn take_delta(&mut self) -> Option<RGAListDelta<T>> {
317        self.pending_delta.take()
318    }
319
320    /// Apply a delta from another replica.
321    pub fn apply_delta(&mut self, delta: &RGAListDelta<T>) {
322        // Estimate how many entries are truly new before reserving.
323        // This avoids allocating proportional to raw delta size when inserts are duplicates.
324        let mut new_nodes = 0usize;
325        let mut new_origins = 0usize;
326
327        for node in &delta.inserts {
328            if !self.nodes.contains_key(&node.id) {
329                new_nodes += 1;
330                if !self.children.contains_key(&node.origin) {
331                    new_origins += 1;
332                }
333            }
334        }
335
336        if new_nodes > 0 {
337            self.nodes.reserve(new_nodes);
338        }
339        if new_origins > 0 {
340            self.children.reserve(new_origins);
341        }
342
343        // Apply inserts
344        for node in &delta.inserts {
345            if !self.nodes.contains_key(&node.id) {
346                self.integrate_node(node.clone());
347            }
348        }
349
350        // Apply deletes
351        for id in &delta.deletes {
352            if let Some(node) = self.nodes.get_mut(id) {
353                node.deleted = true;
354                node.value = None;
355            }
356        }
357    }
358}
359
360/// Iterator for traversing the RGA list in order.
361struct RGAIterator<'a, T: Clone + PartialEq> {
362    list: &'a RGAList<T>,
363    stack: Vec<ListId>,
364    visited: std::collections::HashSet<ListId>,
365}
366
367impl<'a, T: Clone + PartialEq> Iterator for RGAIterator<'a, T> {
368    type Item = &'a ListNode<T>;
369
370    fn next(&mut self) -> Option<Self::Item> {
371        while let Some(id) = self.stack.pop() {
372            if self.visited.contains(&id) {
373                continue;
374            }
375            self.visited.insert(id.clone());
376
377            // Push children in reverse order (so first child is processed first)
378            if let Some(children) = self.list.children.get(&id) {
379                for child in children.iter().rev() {
380                    if !self.visited.contains(child) {
381                        self.stack.push(child.clone());
382                    }
383                }
384            }
385
386            // Return the node (skip genesis)
387            if id != ListId::genesis() {
388                if let Some(node) = self.list.nodes.get(&id) {
389                    return Some(node);
390                }
391            }
392        }
393        None
394    }
395}
396
397impl<T: Clone + PartialEq> PartialEq for RGAList<T> {
398    fn eq(&self, other: &Self) -> bool {
399        // Compare visible content
400        self.to_vec() == other.to_vec()
401    }
402}
403
404impl<T: Clone + PartialEq> Lattice for RGAList<T> {
405    fn bottom() -> Self {
406        Self::new("")
407    }
408
409    fn join(&self, other: &Self) -> Self {
410        let mut result = self.clone();
411
412        // Merge all nodes from other
413        for (id, node) in &other.nodes {
414            if let Some(existing) = result.nodes.get_mut(id) {
415                // If deleted in either, mark as deleted
416                if node.deleted {
417                    existing.deleted = true;
418                    existing.value = None;
419                }
420            } else {
421                // Add new node
422                result.integrate_node(node.clone());
423            }
424        }
425
426        result
427    }
428}
429
430impl<T: Clone + PartialEq> Default for RGAList<T> {
431    fn default() -> Self {
432        Self::new("")
433    }
434}
435
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a list owned by replica "r1" by appending `values` in order.
    fn filled(values: &[i32]) -> RGAList<i32> {
        let mut list = RGAList::new("r1");
        for &value in values {
            list.push_back(value);
        }
        list
    }

    /// Appended values are counted and retrievable by index.
    #[test]
    fn test_basic_operations() {
        let expected = ["a", "b", "c"];
        let mut list: RGAList<String> = RGAList::new("r1");
        for text in expected.iter() {
            list.push_back(text.to_string());
        }

        assert_eq!(list.len(), 3);
        for (index, text) in expected.iter().enumerate() {
            assert_eq!(list.get(index), Some(&text.to_string()));
        }
    }

    /// Inserting in the middle places the value between its neighbours.
    #[test]
    fn test_insert_at_index() {
        let mut list = filled(&[1, 3]);
        list.insert(1, 2);

        assert_eq!(list.to_vec(), vec![1, 2, 3]);
    }

    /// Deleting returns the removed value and hides it from the list.
    #[test]
    fn test_delete() {
        let mut list = filled(&[1, 2, 3]);

        assert_eq!(list.delete(1), Some(2));
        assert_eq!(list.to_vec(), vec![1, 3]);
    }

    /// Two replicas inserting concurrently converge after exchanging deltas.
    #[test]
    fn test_concurrent_inserts() {
        let mut left: RGAList<&str> = RGAList::new("r1");
        let mut right: RGAList<&str> = RGAList::new("r2");

        // Both start with "a"
        left.push_back("a");
        let seed = left.take_delta().unwrap();
        right.apply_delta(&seed);

        // Concurrent inserts after "a"
        left.push_back("b");
        right.push_back("c");

        // Exchange deltas
        let from_left = left.take_delta().unwrap();
        let from_right = right.take_delta().unwrap();
        left.apply_delta(&from_right);
        right.apply_delta(&from_left);

        // Should converge to same order
        assert_eq!(left.to_vec(), right.to_vec());
    }

    /// Moving the first element forward re-inserts it before the target slot.
    #[test]
    fn test_move_element() {
        let mut list = filled(&[1, 2, 3]);

        list.move_element(0, 2);
        assert_eq!(list.to_vec(), vec![2, 1, 3]);
    }

    /// Joining two independent replicas keeps the elements of both.
    #[test]
    fn test_lattice_join() {
        let mut first: RGAList<i32> = RGAList::new("r1");
        let mut second: RGAList<i32> = RGAList::new("r2");
        first.push_back(1);
        second.push_back(2);

        let merged = first.join(&second);

        // Both elements should be present
        assert_eq!(merged.len(), 2);
    }

    /// `iter` yields the values in insertion order.
    #[test]
    fn test_iter() {
        let list = filled(&[1, 2, 3]);

        let collected = list.iter().copied().collect::<Vec<i32>>();
        assert_eq!(collected, vec![1, 2, 3]);
    }
}
539}