mdcs_compaction/
version_vector.rs

//! Version vector for compact causal context representation.
//!
//! A version vector summarizes the causal context by tracking the highest
//! sequence number seen from each replica. This is more compact than storing
//! all individual dots.
6
7use serde::{Deserialize, Serialize};
8use std::collections::BTreeMap;
9
10/// A single entry in a version vector.
11#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
12pub struct VectorEntry {
13    /// The replica ID.
14    pub replica_id: String,
15    /// The highest sequence number seen from this replica.
16    pub sequence: u64,
17}
18
19/// A version vector tracking the frontier of seen updates per replica.
20///
21/// Version vectors provide a compact summary of causal history when
22/// updates from each replica are contiguous (no gaps).
23#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
24pub struct VersionVector {
25    /// Map from replica ID to highest seen sequence number.
26    entries: BTreeMap<String, u64>,
27}
28
29impl VersionVector {
30    /// Create an empty version vector.
31    pub fn new() -> Self {
32        VersionVector {
33            entries: BTreeMap::new(),
34        }
35    }
36
37    /// Create a version vector from entries.
38    pub fn from_entries(entries: impl IntoIterator<Item = (String, u64)>) -> Self {
39        VersionVector {
40            entries: entries.into_iter().collect(),
41        }
42    }
43
44    /// Get the sequence number for a replica.
45    pub fn get(&self, replica_id: &str) -> u64 {
46        self.entries.get(replica_id).copied().unwrap_or(0)
47    }
48
49    /// Set the sequence number for a replica.
50    pub fn set(&mut self, replica_id: impl Into<String>, sequence: u64) {
51        let replica_id = replica_id.into();
52        if sequence > 0 {
53            self.entries.insert(replica_id, sequence);
54        }
55    }
56
57    /// Increment the sequence number for a replica, returning the new value.
58    pub fn increment(&mut self, replica_id: impl Into<String>) -> u64 {
59        let replica_id = replica_id.into();
60        let entry = self.entries.entry(replica_id).or_insert(0);
61        *entry += 1;
62        *entry
63    }
64
65    /// Check if this vector dominates another (is causally after or concurrent).
66    /// Returns true if for all replicas, self\[r\] >= other\[r\].
67    pub fn dominates(&self, other: &VersionVector) -> bool {
68        // Check all entries in other
69        for (replica_id, &seq) in &other.entries {
70            if self.get(replica_id) < seq {
71                return false;
72            }
73        }
74        true
75    }
76
77    /// Check if this vector is strictly greater than another.
78    /// Returns true if dominates(other) AND self != other.
79    pub fn strictly_dominates(&self, other: &VersionVector) -> bool {
80        self.dominates(other) && self != other
81    }
82
83    /// Check if two vectors are concurrent (neither dominates the other).
84    pub fn is_concurrent_with(&self, other: &VersionVector) -> bool {
85        !self.dominates(other) && !other.dominates(self)
86    }
87
88    /// Merge with another version vector (component-wise max).
89    pub fn merge(&mut self, other: &VersionVector) {
90        for (replica_id, &seq) in &other.entries {
91            let current = self.entries.entry(replica_id.clone()).or_insert(0);
92            *current = (*current).max(seq);
93        }
94    }
95
96    /// Create a merged version vector without modifying self.
97    pub fn merged_with(&self, other: &VersionVector) -> VersionVector {
98        let mut result = self.clone();
99        result.merge(other);
100        result
101    }
102
103    /// Get the minimum across all replicas in both vectors.
104    /// This represents the "stable" point that all replicas have seen.
105    pub fn min_with(&self, other: &VersionVector) -> VersionVector {
106        let mut result = VersionVector::new();
107
108        // Get all replica IDs from both vectors
109        let all_replicas: std::collections::BTreeSet<_> = self
110            .entries
111            .keys()
112            .chain(other.entries.keys())
113            .cloned()
114            .collect();
115
116        for replica_id in all_replicas {
117            let self_seq = self.get(&replica_id);
118            let other_seq = other.get(&replica_id);
119            let min_seq = self_seq.min(other_seq);
120            if min_seq > 0 {
121                result.set(replica_id, min_seq);
122            }
123        }
124
125        result
126    }
127
128    /// Iterate over all entries.
129    pub fn iter(&self) -> impl Iterator<Item = (&String, &u64)> {
130        self.entries.iter()
131    }
132
133    /// Get the number of replicas tracked.
134    pub fn len(&self) -> usize {
135        self.entries.len()
136    }
137
138    /// Check if the version vector is empty.
139    pub fn is_empty(&self) -> bool {
140        self.entries.is_empty()
141    }
142
143    /// Get the sum of all sequence numbers (total operations seen).
144    pub fn total_operations(&self) -> u64 {
145        self.entries.values().sum()
146    }
147
148    /// Convert to a list of entries.
149    pub fn to_entries(&self) -> Vec<VectorEntry> {
150        self.entries
151            .iter()
152            .map(|(replica_id, &sequence)| VectorEntry {
153                replica_id: replica_id.clone(),
154                sequence,
155            })
156            .collect()
157    }
158
159    /// Create from a list of entries.
160    pub fn from_entry_list(entries: Vec<VectorEntry>) -> Self {
161        VersionVector {
162            entries: entries
163                .into_iter()
164                .map(|e| (e.replica_id, e.sequence))
165                .collect(),
166        }
167    }
168
169    /// Check if a specific (replica_id, sequence) pair is included.
170    pub fn contains(&self, replica_id: &str, sequence: u64) -> bool {
171        self.get(replica_id) >= sequence
172    }
173
174    /// Get the difference: operations in self but not in other.
175    /// Returns (replica_id, start_seq, end_seq) ranges.
176    pub fn diff(&self, other: &VersionVector) -> Vec<(String, u64, u64)> {
177        let mut diffs = Vec::new();
178
179        for (replica_id, &self_seq) in &self.entries {
180            let other_seq = other.get(replica_id);
181            if self_seq > other_seq {
182                diffs.push((replica_id.clone(), other_seq + 1, self_seq));
183            }
184        }
185
186        diffs
187    }
188}
189
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a version vector from borrowed `(replica, sequence)` pairs.
    fn vv(pairs: &[(&str, u64)]) -> VersionVector {
        VersionVector::from_entries(pairs.iter().map(|&(id, seq)| (id.to_string(), seq)))
    }

    #[test]
    fn test_version_vector_basic() {
        let mut clock = VersionVector::new();
        // Untracked replicas read as zero.
        assert_eq!(clock.get("r1"), 0);

        clock.set("r1", 5);
        assert_eq!(clock.get("r1"), 5);

        // Incrementing returns the new value and stores it.
        let bumped = clock.increment("r1");
        assert_eq!(bumped, 6);
        assert_eq!(clock.get("r1"), 6);
    }

    #[test]
    fn test_version_vector_dominates() {
        let middle = vv(&[("r1", 5), ("r2", 3)]);
        let lower = vv(&[("r1", 3), ("r2", 3)]);
        let upper = vv(&[("r1", 5), ("r2", 5)]);

        assert!(middle.dominates(&lower));
        assert!(!lower.dominates(&middle));
        assert!(upper.dominates(&middle));
        assert!(!middle.dominates(&upper));
    }

    #[test]
    fn test_version_vector_concurrent() {
        // Each side is ahead on a different replica.
        let left = vv(&[("r1", 5), ("r2", 3)]);
        let right = vv(&[("r1", 3), ("r2", 5)]);

        assert!(left.is_concurrent_with(&right));
        assert!(right.is_concurrent_with(&left));
    }

    #[test]
    fn test_version_vector_merge() {
        let left = vv(&[("r1", 5), ("r2", 3)]);
        let right = vv(&[("r1", 3), ("r2", 7)]);

        let joined = left.merged_with(&right);
        assert_eq!(joined.get("r1"), 5);
        assert_eq!(joined.get("r2"), 7);
    }

    #[test]
    fn test_version_vector_min() {
        let left = vv(&[("r1", 5), ("r2", 3)]);
        let right = vv(&[("r1", 3), ("r2", 7)]);

        let floor = left.min_with(&right);
        assert_eq!(floor.get("r1"), 3);
        assert_eq!(floor.get("r2"), 3);
    }

    #[test]
    fn test_version_vector_diff() {
        let ahead = vv(&[("r1", 10), ("r2", 5)]);
        let behind = vv(&[("r1", 7), ("r2", 5)]);

        // Only r1 differs, and the missing range is 8..=10.
        let missing = ahead.diff(&behind);
        assert_eq!(missing.len(), 1);
        assert_eq!(missing[0], ("r1".to_string(), 8, 10));
    }

    #[test]
    fn test_version_vector_serialization() {
        let original = vv(&[("r1", 5), ("r2", 10)]);

        // A JSON round trip must reproduce an equal vector.
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: VersionVector = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    #[test]
    fn test_version_vector_contains() {
        let clock = vv(&[("r1", 5)]);

        assert!(clock.contains("r1", 1));
        assert!(clock.contains("r1", 5));
        assert!(!clock.contains("r1", 6));
        assert!(!clock.contains("r2", 1));
    }
}