1use crate::lattice::{DeltaCRDT, Lattice};
7use serde::{Deserialize, Serialize};
8use std::collections::{BTreeMap, BTreeSet};
9use ulid::Ulid;
10
11#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
13pub struct Tag {
14 pub replica_id: String,
16 pub unique_id: Ulid,
18}
19
20impl Tag {
21 pub fn new(replica_id: impl Into<String>) -> Self {
22 Self {
23 replica_id: replica_id.into(),
24 unique_id: Ulid::new(),
25 }
26 }
27}
28
29#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
37pub struct ORSet<T: Ord + Clone> {
38 entries: BTreeMap<T, BTreeSet<Tag>>,
40 tombstones: BTreeSet<Tag>,
43 #[serde(skip)]
45 pending_delta: Option<ORSetDelta<T>>,
46}
47
48#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
50pub struct ORSetDelta<T: Ord + Clone> {
51 pub additions: BTreeMap<T, BTreeSet<Tag>>,
53 pub removals: BTreeSet<Tag>,
55}
56
57impl<T: Ord + Clone> ORSet<T> {
58 pub fn new() -> Self {
60 Self {
61 entries: BTreeMap::new(),
62 tombstones: BTreeSet::new(),
63 pending_delta: None,
64 }
65 }
66
67 pub fn add(&mut self, replica_id: &str, value: T) {
69 let tag = Tag::new(replica_id);
70
71 self.entries
72 .entry(value.clone())
73 .or_default()
74 .insert(tag.clone());
75
76 let delta = self.pending_delta.get_or_insert_with(|| ORSetDelta {
78 additions: BTreeMap::new(),
79 removals: BTreeSet::new(),
80 });
81 delta.additions.entry(value).or_default().insert(tag);
82 }
83
84 pub fn remove(&mut self, value: &T) {
86 if let Some(tags) = self.entries.remove(value) {
87 for tag in tags.iter() {
89 self.tombstones.insert(tag.clone());
90 }
91
92 let delta = self.pending_delta.get_or_insert_with(|| ORSetDelta {
94 additions: BTreeMap::new(),
95 removals: BTreeSet::new(),
96 });
97 delta.removals.extend(tags);
98 }
99 }
100
101 pub fn contains(&self, value: &T) -> bool {
103 self.entries.get(value).is_some_and(|tags| !tags.is_empty())
104 }
105
106 pub fn iter(&self) -> impl Iterator<Item = &T> {
108 self.entries.keys()
109 }
110
111 pub fn len(&self) -> usize {
113 self.entries.len()
114 }
115
116 pub fn is_empty(&self) -> bool {
118 self.entries.is_empty()
119 }
120}
121
122impl<T: Ord + Clone> Default for ORSet<T> {
123 fn default() -> Self {
124 Self::new()
125 }
126}
127
128impl<T: Ord + Clone> Lattice for ORSet<T> {
129 fn bottom() -> Self {
130 Self::new()
131 }
132
133 fn join(&self, other: &Self) -> Self {
134 let mut result = Self::new();
135
136 result.tombstones = self.tombstones.union(&other.tombstones).cloned().collect();
138
139 let all_keys: BTreeSet<_> = self
141 .entries
142 .keys()
143 .chain(other.entries.keys())
144 .cloned()
145 .collect();
146
147 for key in all_keys {
148 let self_tags = self.entries.get(&key).cloned().unwrap_or_default();
149 let other_tags = other.entries.get(&key).cloned().unwrap_or_default();
150
151 let merged_tags: BTreeSet<Tag> = self_tags
152 .union(&other_tags)
153 .filter(|tag| !result.tombstones.contains(tag))
154 .cloned()
155 .collect();
156
157 if !merged_tags.is_empty() {
158 result.entries.insert(key, merged_tags);
159 }
160 }
161
162 result
163 }
164}
165
166impl<T: Ord + Clone> Lattice for ORSetDelta<T> {
167 fn bottom() -> Self {
168 Self {
169 additions: BTreeMap::new(),
170 removals: BTreeSet::new(),
171 }
172 }
173
174 fn join(&self, other: &Self) -> Self {
175 let mut additions = self.additions.clone();
176 for (k, v) in &other.additions {
177 additions
178 .entry(k.clone())
179 .or_default()
180 .extend(v.iter().cloned());
181 }
182
183 let mut removals = self.removals.clone();
185 removals.extend(other.removals.iter().cloned());
186
187 Self {
188 additions,
189 removals,
190 }
191 }
192}
193
194impl<T: Ord + Clone> DeltaCRDT for ORSet<T> {
195 type Delta = ORSetDelta<T>;
196
197 fn split_delta(&mut self) -> Option<Self::Delta> {
198 self.pending_delta.take()
199 }
200
201 fn apply_delta(&mut self, delta: &Self::Delta) {
202 self.tombstones.extend(delta.removals.iter().cloned());
204
205 for (value, tags) in &delta.additions {
207 let entry = self.entries.entry(value.clone()).or_default();
208 for tag in tags {
209 if !self.tombstones.contains(tag) {
210 entry.insert(tag.clone());
211 }
212 }
213 }
214
215 self.entries.retain(|_, tags| !tags.is_empty());
217 }
218}