mdcs_merkle/
store.rs

1//! DAG storage trait and implementations.
2//!
3//! The DAGStore provides content-addressed storage for Merkle nodes,
4//! tracking heads (nodes without children) automatically.
5
6use crate::hash::Hash;
7use crate::node::MerkleNode;
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, HashSet, VecDeque};
10use std::sync::RwLock;
11
12/// Errors that can occur during DAG operations.
13#[derive(Clone, Debug, PartialEq, Eq)]
14pub enum DAGError {
15    /// Node not found in the store.
16    NotFound(Hash),
17
18    /// Node failed verification (CID doesn't match contents).
19    VerificationFailed(Hash),
20
21    /// Missing parent nodes (gap in the DAG).
22    MissingParents(Vec<Hash>),
23
24    /// Duplicate node (already exists).
25    Duplicate(Hash),
26}
27
28impl std::fmt::Display for DAGError {
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30        match self {
31            DAGError::NotFound(h) => write!(f, "Node not found: {}", h.short()),
32            DAGError::VerificationFailed(h) => write!(f, "Verification failed for: {}", h.short()),
33            DAGError::MissingParents(parents) => {
34                write!(
35                    f,
36                    "Missing parents: {:?}",
37                    parents.iter().map(|h| h.short()).collect::<Vec<_>>()
38                )
39            }
40            DAGError::Duplicate(h) => write!(f, "Duplicate node: {}", h.short()),
41        }
42    }
43}
44
45impl std::error::Error for DAGError {}
46
47/// Trait for content-addressed DAG storage.
48pub trait DAGStore {
49    /// Get a node by its CID.
50    fn get(&self, cid: &Hash) -> Option<&MerkleNode>;
51
52    /// Store a node, returning its CID.
53    ///
54    /// The node's CID is verified before storage.
55    /// Returns an error if verification fails or parents are missing.
56    fn put(&mut self, node: MerkleNode) -> Result<Hash, DAGError>;
57
58    /// Store a node without checking for missing parents.
59    ///
60    /// Used during sync when parents may arrive out of order.
61    fn put_unchecked(&mut self, node: MerkleNode) -> Result<Hash, DAGError>;
62
63    /// Get the current heads (nodes without children).
64    fn heads(&self) -> Vec<Hash>;
65
66    /// Check if a node exists in the store.
67    fn contains(&self, cid: &Hash) -> bool;
68
69    /// Get all ancestors of a node (transitive closure).
70    fn ancestors(&self, cid: &Hash) -> HashSet<Hash>;
71
72    /// Get immediate children of a node.
73    fn children(&self, cid: &Hash) -> Vec<Hash>;
74
75    /// Get all nodes in topological order (parents before children).
76    fn topological_order(&self) -> Vec<Hash>;
77
78    /// Get nodes that are missing (referenced but not present).
79    fn missing_nodes(&self) -> HashSet<Hash>;
80
81    /// Get the total number of nodes.
82    fn len(&self) -> usize;
83
84    /// Check if the store is empty.
85    fn is_empty(&self) -> bool {
86        self.len() == 0
87    }
88}
89
90/// In-memory implementation of DAGStore.
91#[derive(Debug, Default, Serialize, Deserialize)]
92pub struct MemoryDAGStore {
93    /// All nodes indexed by CID.
94    nodes: HashMap<Hash, MerkleNode>,
95
96    /// Current heads (nodes without children).
97    heads: HashSet<Hash>,
98
99    /// Reverse index: parent -> children.
100    children_index: HashMap<Hash, HashSet<Hash>>,
101
102    /// Referenced but missing nodes.
103    missing: HashSet<Hash>,
104
105    /// Cached topological order (invalidated on put/put_unchecked).
106    #[serde(skip)]
107    cached_topo_order: RwLock<Option<Vec<Hash>>>,
108}
109
110impl Clone for MemoryDAGStore {
111    fn clone(&self) -> Self {
112        let cached_topo_order = self
113            .cached_topo_order
114            .read()
115            .unwrap_or_else(|e| e.into_inner())
116            .clone();
117
118        Self {
119            nodes: self.nodes.clone(),
120            heads: self.heads.clone(),
121            children_index: self.children_index.clone(),
122            missing: self.missing.clone(),
123            cached_topo_order: RwLock::new(cached_topo_order),
124        }
125    }
126}
127
128impl MemoryDAGStore {
129    /// Create a new empty DAG store.
130    pub fn new() -> Self {
131        MemoryDAGStore {
132            nodes: HashMap::new(),
133            heads: HashSet::new(),
134            children_index: HashMap::new(),
135            missing: HashSet::new(),
136            cached_topo_order: RwLock::new(None),
137        }
138    }
139
140    /// Create a store with a genesis node.
141    pub fn with_genesis(creator: impl Into<String>) -> (Self, Hash) {
142        let mut store = Self::new();
143        let genesis = crate::node::NodeBuilder::genesis(creator);
144        let cid = store.put(genesis).expect("Genesis node should be valid");
145        (store, cid)
146    }
147
148    /// Update the heads set after adding a node.
149    fn update_heads(&mut self, node: &MerkleNode) {
150        // The new node becomes a head
151        self.heads.insert(node.cid);
152
153        // Its parents are no longer heads
154        for parent in &node.parents {
155            self.heads.remove(parent);
156        }
157    }
158
159    /// Update the children index after adding a node.
160    fn update_children_index(&mut self, node: &MerkleNode) {
161        for parent in &node.parents {
162            self.children_index
163                .entry(*parent)
164                .or_default()
165                .insert(node.cid);
166        }
167    }
168
169    /// Get statistics about the DAG.
170    pub fn stats(&self) -> DAGStats {
171        let max_depth = self.compute_max_depth();
172        let branching = self.compute_branching_stats();
173
174        DAGStats {
175            total_nodes: self.nodes.len(),
176            head_count: self.heads.len(),
177            missing_count: self.missing.len(),
178            max_depth,
179            avg_branching: branching,
180        }
181    }
182
183    /// Compute the maximum depth of the DAG.
184    fn compute_max_depth(&self) -> usize {
185        let mut depths: HashMap<Hash, usize> = HashMap::new();
186
187        for cid in self.topological_order() {
188            if let Some(node) = self.nodes.get(&cid) {
189                let parent_depth = node
190                    .parents
191                    .iter()
192                    .filter_map(|p| depths.get(p))
193                    .max()
194                    .copied()
195                    .unwrap_or(0);
196                depths.insert(cid, parent_depth + 1);
197            }
198        }
199
200        depths.values().max().copied().unwrap_or(0)
201    }
202
203    /// Compute average branching factor.
204    fn compute_branching_stats(&self) -> f64 {
205        if self.children_index.is_empty() {
206            return 0.0;
207        }
208
209        let total_children: usize = self.children_index.values().map(|c| c.len()).sum();
210
211        total_children as f64 / self.children_index.len() as f64
212    }
213}
214
215impl DAGStore for MemoryDAGStore {
216    fn get(&self, cid: &Hash) -> Option<&MerkleNode> {
217        self.nodes.get(cid)
218    }
219
220    fn put(&mut self, node: MerkleNode) -> Result<Hash, DAGError> {
221        // Verify the node's CID
222        if !node.verify() {
223            return Err(DAGError::VerificationFailed(node.cid));
224        }
225
226        // Check if already exists
227        if self.nodes.contains_key(&node.cid) {
228            return Ok(node.cid);
229        }
230
231        // Check for missing parents (unless this is a genesis node)
232        if !node.is_genesis() {
233            let missing: Vec<Hash> = node
234                .parents
235                .iter()
236                .filter(|p| !self.nodes.contains_key(p))
237                .copied()
238                .collect();
239
240            if !missing.is_empty() {
241                return Err(DAGError::MissingParents(missing));
242            }
243        }
244
245        let cid = node.cid;
246
247        // Update indices
248        self.update_heads(&node);
249        self.update_children_index(&node);
250
251        // Remove from missing if it was there
252        self.missing.remove(&cid);
253
254        // Store the node
255        self.nodes.insert(cid, node);
256
257        // Invalidate topological order cache
258        self.cached_topo_order
259            .write()
260            .unwrap_or_else(|e| e.into_inner())
261            .take();
262
263        Ok(cid)
264    }
265
266    fn put_unchecked(&mut self, node: MerkleNode) -> Result<Hash, DAGError> {
267        // Verify the node's CID
268        if !node.verify() {
269            return Err(DAGError::VerificationFailed(node.cid));
270        }
271
272        // Check if already exists
273        if self.nodes.contains_key(&node.cid) {
274            return Ok(node.cid);
275        }
276
277        let cid = node.cid;
278
279        // Track missing parents
280        for parent in &node.parents {
281            if !self.nodes.contains_key(parent) {
282                self.missing.insert(*parent);
283            }
284        }
285
286        // Update children index FIRST (before heads update)
287        self.update_children_index(&node);
288
289        // Update heads - but only add this node as a head if it has no children
290        // (This handles out-of-order insertion where children arrive before parents)
291        if !self.children_index.contains_key(&cid) {
292            self.heads.insert(cid);
293        }
294        // Its parents are no longer heads (if they were)
295        for parent in &node.parents {
296            self.heads.remove(parent);
297        }
298
299        // Remove from missing if it was there
300        self.missing.remove(&cid);
301
302        // Store the node
303        self.nodes.insert(cid, node);
304
305        // Invalidate topological order cache
306        self.cached_topo_order
307            .write()
308            .unwrap_or_else(|e| e.into_inner())
309            .take();
310
311        Ok(cid)
312    }
313
314    fn heads(&self) -> Vec<Hash> {
315        let mut heads: Vec<_> = self.heads.iter().copied().collect();
316        heads.sort();
317        heads
318    }
319
320    fn contains(&self, cid: &Hash) -> bool {
321        self.nodes.contains_key(cid)
322    }
323
324    fn ancestors(&self, cid: &Hash) -> HashSet<Hash> {
325        let mut result = HashSet::new();
326        let mut queue = VecDeque::new();
327
328        if let Some(node) = self.nodes.get(cid) {
329            queue.extend(node.parents.iter().copied());
330        }
331
332        while let Some(current) = queue.pop_front() {
333            if result.insert(current) {
334                if let Some(node) = self.nodes.get(&current) {
335                    queue.extend(node.parents.iter().copied());
336                }
337            }
338        }
339
340        result
341    }
342
343    fn children(&self, cid: &Hash) -> Vec<Hash> {
344        self.children_index
345            .get(cid)
346            .map(|c| c.iter().copied().collect())
347            .unwrap_or_default()
348    }
349
350    fn topological_order(&self) -> Vec<Hash> {
351        // Check cache first
352        {
353            let cached = self
354                .cached_topo_order
355                .read()
356                .unwrap_or_else(|e| e.into_inner());
357            if let Some(order) = cached.as_ref() {
358                return order.clone();
359            }
360        }
361
362        // Kahn's algorithm for topological sort
363        let mut in_degree: HashMap<Hash, usize> = HashMap::new();
364        let mut result = Vec::new();
365        let mut queue = VecDeque::new();
366
367        // Calculate in-degrees (number of parents in the store)
368        for (cid, node) in &self.nodes {
369            let degree = node
370                .parents
371                .iter()
372                .filter(|p| self.nodes.contains_key(p))
373                .count();
374            in_degree.insert(*cid, degree);
375
376            if degree == 0 {
377                queue.push_back(*cid);
378            }
379        }
380
381        // Process nodes with no dependencies
382        while let Some(cid) = queue.pop_front() {
383            result.push(cid);
384
385            if let Some(children) = self.children_index.get(&cid) {
386                for child in children {
387                    if let Some(degree) = in_degree.get_mut(child) {
388                        *degree = degree.saturating_sub(1);
389                        if *degree == 0 {
390                            queue.push_back(*child);
391                        }
392                    }
393                }
394            }
395        }
396
397        // Cache the result
398        self.cached_topo_order
399            .write()
400            .unwrap_or_else(|e| e.into_inner())
401            .replace(result.clone());
402        result
403    }
404
405    fn missing_nodes(&self) -> HashSet<Hash> {
406        self.missing.clone()
407    }
408
409    fn len(&self) -> usize {
410        self.nodes.len()
411    }
412}
413
/// Summary figures describing a DAG, as produced by `MemoryDAGStore::stats`.
#[derive(Debug, Clone)]
pub struct DAGStats {
    /// Number of nodes stored.
    pub total_nodes: usize,
    /// Number of current heads.
    pub head_count: usize,
    /// Number of CIDs referenced but not stored.
    pub missing_count: usize,
    /// Maximum depth of the DAG (0 for an empty store).
    pub max_depth: usize,
    /// Average branching factor.
    pub avg_branching: f64,
}
423
#[cfg(test)]
mod tests {
    use super::*;
    use crate::node::{NodeBuilder, Payload};

    /// Builds a delta node with exactly one parent.
    fn delta_child(parent: Hash, bytes: Vec<u8>, timestamp: u64, creator: &str) -> MerkleNode {
        NodeBuilder::new()
            .with_parent(parent)
            .with_payload(Payload::delta(bytes))
            .with_timestamp(timestamp)
            .with_creator(creator)
            .build()
    }

    /// The store must be shareable across threads.
    #[test]
    fn test_memory_store_is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<MemoryDAGStore>();
    }

    /// A store created with a genesis node holds exactly that node as its head.
    #[test]
    fn test_genesis_store() {
        let (store, genesis_cid) = MemoryDAGStore::with_genesis("replica_1");

        assert_eq!(store.len(), 1);
        assert!(store.contains(&genesis_cid));
        assert_eq!(store.heads(), vec![genesis_cid]);
    }

    /// genesis <- node1 <- node2: the tip is the only head and sees both ancestors.
    #[test]
    fn test_linear_chain() {
        let (mut store, genesis) = MemoryDAGStore::with_genesis("r1");

        let cid1 = store.put(delta_child(genesis, vec![1], 1, "r1")).unwrap();
        let cid2 = store.put(delta_child(cid1, vec![2], 2, "r1")).unwrap();

        assert_eq!(store.len(), 3);
        assert_eq!(store.heads(), vec![cid2]);
        assert_eq!(store.ancestors(&cid2), HashSet::from([genesis, cid1]));
    }

    /// Two children of genesis written by different replicas are both heads.
    #[test]
    fn test_concurrent_branches() {
        let (mut store, genesis) = MemoryDAGStore::with_genesis("r1");

        let cid_a = store
            .put(delta_child(genesis, b"a".to_vec(), 1, "r1"))
            .unwrap();
        let cid_b = store
            .put(delta_child(genesis, b"b".to_vec(), 1, "r2"))
            .unwrap();

        let heads = store.heads();
        assert_eq!(heads.len(), 2);
        assert!(heads.contains(&cid_a));
        assert!(heads.contains(&cid_b));
    }

    /// A node with two parents replaces both branches as the single head.
    #[test]
    fn test_merge_node() {
        let (mut store, genesis) = MemoryDAGStore::with_genesis("r1");

        let cid_a = store
            .put(delta_child(genesis, b"a".to_vec(), 1, "r1"))
            .unwrap();
        let cid_b = store
            .put(delta_child(genesis, b"b".to_vec(), 1, "r2"))
            .unwrap();

        let merge = NodeBuilder::new()
            .with_parents(vec![cid_a, cid_b])
            .with_payload(Payload::delta(b"merge".to_vec()))
            .with_timestamp(2)
            .with_creator("r1")
            .build();
        let merge_cid = store.put(merge).unwrap();

        // Only the merge node remains a head.
        assert_eq!(store.heads(), vec![merge_cid]);

        // Its ancestors cover both branches and genesis.
        let ancestors = store.ancestors(&merge_cid);
        assert!(ancestors.contains(&cid_a));
        assert!(ancestors.contains(&cid_b));
        assert!(ancestors.contains(&genesis));
    }

    /// `put` rejects a node whose parent is not stored.
    #[test]
    fn test_missing_parents_error() {
        let mut store = MemoryDAGStore::new();
        let fake_parent = crate::hash::Hasher::hash(b"fake");

        let result = store.put(delta_child(fake_parent, vec![1], 1, "r1"));

        assert!(matches!(result, Err(DAGError::MissingParents(_))));
    }

    /// `put_unchecked` accepts the same node and records the absent parent.
    #[test]
    fn test_put_unchecked() {
        let mut store = MemoryDAGStore::new();
        let fake_parent = crate::hash::Hasher::hash(b"fake");

        let cid = store
            .put_unchecked(delta_child(fake_parent, vec![1], 1, "r1"))
            .unwrap();

        assert!(store.missing_nodes().contains(&fake_parent));
        assert!(store.contains(&cid));
    }

    /// Parents appear before children in the topological order.
    #[test]
    fn test_topological_order() {
        let (mut store, genesis) = MemoryDAGStore::with_genesis("r1");

        let cid1 = store.put(delta_child(genesis, vec![1], 1, "r1")).unwrap();
        let cid2 = store.put(delta_child(cid1, vec![2], 2, "r1")).unwrap();

        let order = store.topological_order();
        let index_of = |wanted: Hash| order.iter().position(|&c| c == wanted).unwrap();

        assert!(index_of(genesis) < index_of(cid1));
        assert!(index_of(cid1) < index_of(cid2));
    }

    /// The reverse index lists every direct child of a node.
    #[test]
    fn test_children_index() {
        let (mut store, genesis) = MemoryDAGStore::with_genesis("r1");

        let cid1 = store.put(delta_child(genesis, vec![1], 1, "r1")).unwrap();
        let cid2 = store.put(delta_child(genesis, vec![2], 1, "r2")).unwrap();

        let children = store.children(&genesis);
        assert_eq!(children.len(), 2);
        assert!(children.contains(&cid1));
        assert!(children.contains(&cid2));
    }

    /// A six-node chain reports six nodes, one head and a depth of six.
    #[test]
    fn test_dag_stats() {
        let (mut store, _genesis) = MemoryDAGStore::with_genesis("r1");

        for i in 0..5 {
            let last_head = store.heads()[0];
            store
                .put(delta_child(last_head, vec![i], i as u64 + 1, "r1"))
                .unwrap();
        }

        let stats = store.stats();
        assert_eq!(stats.total_nodes, 6);
        assert_eq!(stats.head_count, 1);
        assert_eq!(stats.max_depth, 6);
    }
}