mdcs_core/
orset.rs

1//!  Observed-Remove Set (OR-Set / Add-Wins Set)
2//!
3//! Each add generates a unique tag.  Remove only removes currently observed tags.
4//!  Concurrent add and remove of the same element:  add wins.
5
6use crate::lattice::{DeltaCRDT, Lattice};
7use serde::{Deserialize, Serialize};
8use std::collections::{BTreeMap, BTreeSet};
9use ulid::Ulid;
10
11/// A unique tag for each add operation
12#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
13pub struct Tag {
14    /// The replica that created this tag
15    pub replica_id: String,
16    /// Unique identifier for this specific add
17    pub unique_id: Ulid,
18}
19
20impl Tag {
21    pub fn new(replica_id: impl Into<String>) -> Self {
22        Self {
23            replica_id: replica_id.into(),
24            unique_id: Ulid::new(),
25        }
26    }
27}
28
29/// An Observed-Remove Set (OR-Set) CRDT with add-wins semantics.
30///
31/// Each insertion is tagged with a globally unique [`Tag`]. A remove operation
32/// only removes the tags that were *observed* at the time of removal. This means
33/// a concurrent add and remove results in the element being present (add wins).
34///
35/// Supports delta-state replication via the [`DeltaCRDT`] trait.
36#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
37pub struct ORSet<T: Ord + Clone> {
38    /// Maps elements to their active tags
39    entries: BTreeMap<T, BTreeSet<Tag>>,
40    /// Tombstones:  tags that have been removed
41    /// (Required for distributed consistency)
42    tombstones: BTreeSet<Tag>,
43    /// Pending delta for delta-state replication
44    #[serde(skip)]
45    pending_delta: Option<ORSetDelta<T>>,
46}
47
48/// Delta payload for [`ORSet`] replication.
49#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
50pub struct ORSetDelta<T: Ord + Clone> {
51    /// New element additions with their tags.
52    pub additions: BTreeMap<T, BTreeSet<Tag>>,
53    /// Tags that have been removed.
54    pub removals: BTreeSet<Tag>,
55}
56
57impl<T: Ord + Clone> ORSet<T> {
58    /// Create a new empty OR-Set.
59    pub fn new() -> Self {
60        Self {
61            entries: BTreeMap::new(),
62            tombstones: BTreeSet::new(),
63            pending_delta: None,
64        }
65    }
66
67    /// Add an element with a new unique tag
68    pub fn add(&mut self, replica_id: &str, value: T) {
69        let tag = Tag::new(replica_id);
70
71        self.entries
72            .entry(value.clone())
73            .or_default()
74            .insert(tag.clone());
75
76        // Record in delta
77        let delta = self.pending_delta.get_or_insert_with(|| ORSetDelta {
78            additions: BTreeMap::new(),
79            removals: BTreeSet::new(),
80        });
81        delta.additions.entry(value).or_default().insert(tag);
82    }
83
84    /// Remove all observed instances of an element
85    pub fn remove(&mut self, value: &T) {
86        if let Some(tags) = self.entries.remove(value) {
87            // Move tags to tombstones
88            for tag in tags.iter() {
89                self.tombstones.insert(tag.clone());
90            }
91
92            // Record in delta
93            let delta = self.pending_delta.get_or_insert_with(|| ORSetDelta {
94                additions: BTreeMap::new(),
95                removals: BTreeSet::new(),
96            });
97            delta.removals.extend(tags);
98        }
99    }
100
101    /// Check whether `value` is present in the set (has at least one live tag).
102    pub fn contains(&self, value: &T) -> bool {
103        self.entries.get(value).is_some_and(|tags| !tags.is_empty())
104    }
105
106    /// Iterate over all elements currently in the set.
107    pub fn iter(&self) -> impl Iterator<Item = &T> {
108        self.entries.keys()
109    }
110
111    /// Return the number of distinct elements in the set.
112    pub fn len(&self) -> usize {
113        self.entries.len()
114    }
115
116    /// Return `true` if the set contains no elements.
117    pub fn is_empty(&self) -> bool {
118        self.entries.is_empty()
119    }
120}
121
122impl<T: Ord + Clone> Default for ORSet<T> {
123    fn default() -> Self {
124        Self::new()
125    }
126}
127
128impl<T: Ord + Clone> Lattice for ORSet<T> {
129    fn bottom() -> Self {
130        Self::new()
131    }
132
133    fn join(&self, other: &Self) -> Self {
134        let mut result = Self::new();
135
136        // Merge tombstones first
137        result.tombstones = self.tombstones.union(&other.tombstones).cloned().collect();
138
139        // Merge entries, filtering out tombstoned tags
140        let all_keys: BTreeSet<_> = self
141            .entries
142            .keys()
143            .chain(other.entries.keys())
144            .cloned()
145            .collect();
146
147        for key in all_keys {
148            let self_tags = self.entries.get(&key).cloned().unwrap_or_default();
149            let other_tags = other.entries.get(&key).cloned().unwrap_or_default();
150
151            let merged_tags: BTreeSet<Tag> = self_tags
152                .union(&other_tags)
153                .filter(|tag| !result.tombstones.contains(tag))
154                .cloned()
155                .collect();
156
157            if !merged_tags.is_empty() {
158                result.entries.insert(key, merged_tags);
159            }
160        }
161
162        result
163    }
164}
165
166impl<T: Ord + Clone> Lattice for ORSetDelta<T> {
167    fn bottom() -> Self {
168        Self {
169            additions: BTreeMap::new(),
170            removals: BTreeSet::new(),
171        }
172    }
173
174    fn join(&self, other: &Self) -> Self {
175        let mut additions = self.additions.clone();
176        for (k, v) in &other.additions {
177            additions
178                .entry(k.clone())
179                .or_default()
180                .extend(v.iter().cloned());
181        }
182
183        // Build removals by merging sets directly
184        let mut removals = self.removals.clone();
185        removals.extend(other.removals.iter().cloned());
186
187        Self {
188            additions,
189            removals,
190        }
191    }
192}
193
194impl<T: Ord + Clone> DeltaCRDT for ORSet<T> {
195    type Delta = ORSetDelta<T>;
196
197    fn split_delta(&mut self) -> Option<Self::Delta> {
198        self.pending_delta.take()
199    }
200
201    fn apply_delta(&mut self, delta: &Self::Delta) {
202        // Apply removals to tombstones
203        self.tombstones.extend(delta.removals.iter().cloned());
204
205        // Apply additions, filtering tombstones
206        for (value, tags) in &delta.additions {
207            let entry = self.entries.entry(value.clone()).or_default();
208            for tag in tags {
209                if !self.tombstones.contains(tag) {
210                    entry.insert(tag.clone());
211                }
212            }
213        }
214
215        // Clean up empty entries
216        self.entries.retain(|_, tags| !tags.is_empty());
217    }
218}