mdcs_merkle/
syncer.rs

//! DAG synchronization with gap-repair logic.
//!
//! The DAGSyncer handles reconciliation between replicas by:
//! 1. Discovering missing nodes via head comparison
//! 2. Fetching missing nodes from peers recursively
//! 3. Handling concurrent heads (multi-root scenarios)

use crate::hash::Hash;
use crate::node::MerkleNode;
use crate::store::{DAGError, DAGStore};
use std::collections::{HashSet, VecDeque};

/// Errors that can occur during synchronization.
///
/// Converted from [`DAGError`] via `From`, so store failures can be
/// propagated with `?` inside sync routines.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SyncError {
    /// Failed to fetch the node with this CID from peers.
    FetchFailed(Hash),

    /// The node with this CID failed integrity verification.
    VerificationFailed(Hash),

    /// The underlying DAG store reported an error.
    StoreError(DAGError),

    /// No peers are available for sync.
    NoPeers,

    /// The sync operation timed out.
    Timeout,

    /// Maximum traversal depth was exceeded during ancestor walking.
    MaxDepthExceeded,
}

35impl std::fmt::Display for SyncError {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        match self {
38            SyncError::FetchFailed(h) => write!(f, "Failed to fetch node: {}", h.short()),
39            SyncError::VerificationFailed(h) => write!(f, "Verification failed: {}", h.short()),
40            SyncError::StoreError(e) => write!(f, "Store error: {}", e),
41            SyncError::NoPeers => write!(f, "No peers available"),
42            SyncError::Timeout => write!(f, "Sync timeout"),
43            SyncError::MaxDepthExceeded => write!(f, "Maximum traversal depth exceeded"),
44        }
45    }
46}
47
48impl std::error::Error for SyncError {}
49
50impl From<DAGError> for SyncError {
51    fn from(e: DAGError) -> Self {
52        SyncError::StoreError(e)
53    }
54}
55
/// Request to fetch nodes from a peer.
///
/// Built via [`SyncRequest::want`] and the `with_*` builder methods.
#[derive(Clone, Debug)]
pub struct SyncRequest {
    /// CIDs of nodes we need from the peer.
    pub want: Vec<Hash>,

    /// Our current heads (lets the peer compute which nodes we lack).
    pub have: Vec<Hash>,

    /// Maximum number of nodes the peer should return in one response.
    pub limit: Option<usize>,
}

69impl SyncRequest {
70    /// Create a new sync request for specific nodes.
71    pub fn want(cids: Vec<Hash>) -> Self {
72        SyncRequest {
73            want: cids,
74            have: Vec::new(),
75            limit: None,
76        }
77    }
78
79    /// Create a sync request with our current heads.
80    pub fn with_heads(mut self, heads: Vec<Hash>) -> Self {
81        self.have = heads;
82        self
83    }
84
85    /// Limit the response size.
86    pub fn with_limit(mut self, limit: usize) -> Self {
87        self.limit = Some(limit);
88        self
89    }
90}
91
/// Response containing nodes from a peer.
#[derive(Clone, Debug)]
pub struct SyncResponse {
    /// Nodes being sent in this batch.
    pub nodes: Vec<MerkleNode>,

    /// CIDs of additional nodes that did not fit the batch limit
    /// (pagination hint for a follow-up request).
    pub more: Vec<Hash>,

    /// The responding peer's current heads.
    pub heads: Vec<Hash>,
}

105impl SyncResponse {
106    /// Create an empty response.
107    pub fn empty() -> Self {
108        SyncResponse {
109            nodes: Vec::new(),
110            more: Vec::new(),
111            heads: Vec::new(),
112        }
113    }
114
115    /// Create a response with nodes.
116    pub fn with_nodes(nodes: Vec<MerkleNode>) -> Self {
117        SyncResponse {
118            nodes,
119            more: Vec::new(),
120            heads: Vec::new(),
121        }
122    }
123}
124
/// Configuration for the DAG syncer.
///
/// See [`Default`] for the stock values (depth 1000, batch 100,
/// verification on).
#[derive(Clone, Debug)]
pub struct SyncConfig {
    /// Maximum depth to traverse when walking back to missing ancestors.
    pub max_depth: usize,

    /// Maximum number of nodes to include in a single request/response.
    pub batch_size: usize,

    /// Whether to verify node integrity before storing received nodes.
    pub verify_nodes: bool,
}

138impl Default for SyncConfig {
139    fn default() -> Self {
140        SyncConfig {
141            max_depth: 1000,
142            batch_size: 100,
143            verify_nodes: true,
144        }
145    }
146}
147
/// DAG synchronizer for gap-repair and reconciliation.
///
/// The syncer uses a pull-based approach:
/// 1. Compare heads with a peer
/// 2. Identify missing nodes
/// 3. Fetch missing nodes recursively until reaching common ancestors
pub struct DAGSyncer<S: DAGStore> {
    /// The local DAG store this syncer reads from and writes into.
    store: S,

    /// Traversal/batching/verification configuration.
    config: SyncConfig,
}

162impl<S: DAGStore> DAGSyncer<S> {
163    /// Create a new syncer with a store.
164    pub fn new(store: S) -> Self {
165        DAGSyncer {
166            store,
167            config: SyncConfig::default(),
168        }
169    }
170
171    /// Create a syncer with custom configuration.
172    pub fn with_config(store: S, config: SyncConfig) -> Self {
173        DAGSyncer { store, config }
174    }
175
176    /// Get a reference to the store.
177    pub fn store(&self) -> &S {
178        &self.store
179    }
180
181    /// Get a mutable reference to the store.
182    pub fn store_mut(&mut self) -> &mut S {
183        &mut self.store
184    }
185
186    /// Get our current heads.
187    pub fn heads(&self) -> Vec<Hash> {
188        self.store.heads()
189    }
190
191    /// Determine which of the given CIDs we need (don't have locally).
192    pub fn need(&self, cids: &[Hash]) -> Vec<Hash> {
193        cids.iter()
194            .filter(|cid| !self.store.contains(cid))
195            .copied()
196            .collect()
197    }
198
199    /// Create a sync request for reconciliation with a peer.
200    pub fn create_request(&self, peer_heads: &[Hash]) -> SyncRequest {
201        let need = self.need(peer_heads);
202        SyncRequest::want(need)
203            .with_heads(self.heads())
204            .with_limit(self.config.batch_size)
205    }
206
207    /// Handle an incoming sync request from a peer.
208    pub fn handle_request(&self, request: &SyncRequest) -> SyncResponse {
209        let mut nodes = Vec::new();
210        let mut more = Vec::new();
211        let limit = request.limit.unwrap_or(self.config.batch_size);
212
213        // Collect requested nodes
214        for cid in &request.want {
215            if let Some(node) = self.store.get(cid) {
216                if nodes.len() < limit {
217                    nodes.push(node.clone());
218                } else {
219                    more.push(*cid);
220                }
221            }
222        }
223
224        // If peer provided their heads, we can proactively send nodes they're missing
225        if !request.have.is_empty() && nodes.len() < limit {
226            let peer_has: HashSet<_> = self.collect_known(&request.have);
227
228            // Find nodes we have that the peer doesn't
229            for cid in self.store.topological_order() {
230                if !peer_has.contains(&cid) {
231                    if let Some(node) = self.store.get(&cid) {
232                        if nodes.len() < limit {
233                            // Check if peer has the parents
234                            let has_parents = node
235                                .parents
236                                .iter()
237                                .all(|p| peer_has.contains(p) || nodes.iter().any(|n| n.cid == *p));
238
239                            if has_parents && !nodes.iter().any(|n| n.cid == cid) {
240                                nodes.push(node.clone());
241                            }
242                        } else {
243                            more.push(cid);
244                        }
245                    }
246                }
247            }
248        }
249
250        SyncResponse {
251            nodes,
252            more,
253            heads: self.heads(),
254        }
255    }
256
257    /// Apply a sync response, storing received nodes.
258    ///
259    /// Returns the CIDs of successfully stored nodes.
260    pub fn apply_response(&mut self, response: SyncResponse) -> Result<Vec<Hash>, SyncError> {
261        let mut stored = Vec::new();
262        let mut pending: VecDeque<MerkleNode> = response.nodes.into_iter().collect();
263        let mut attempts = 0;
264        let max_attempts = pending.len() * 2;
265
266        // Process nodes, retrying if parents aren't available yet
267        while let Some(node) = pending.pop_front() {
268            attempts += 1;
269            if attempts > max_attempts {
270                break;
271            }
272
273            if self.store.contains(&node.cid) {
274                stored.push(node.cid);
275                continue;
276            }
277
278            if self.config.verify_nodes && !node.verify() {
279                return Err(SyncError::VerificationFailed(node.cid));
280            }
281
282            // Try to store with parent check
283            match self.store.put(node.clone()) {
284                Ok(cid) => stored.push(cid),
285                Err(DAGError::MissingParents(_)) => {
286                    // Parents not yet available, retry later
287                    pending.push_back(node);
288                }
289                Err(e) => return Err(e.into()),
290            }
291        }
292
293        Ok(stored)
294    }
295
296    /// Apply nodes without strict parent checking.
297    ///
298    /// Used when nodes may arrive out of order.
299    pub fn apply_nodes_unchecked(
300        &mut self,
301        nodes: Vec<MerkleNode>,
302    ) -> Result<Vec<Hash>, SyncError> {
303        let mut stored = Vec::new();
304
305        for node in nodes {
306            if self.config.verify_nodes && !node.verify() {
307                return Err(SyncError::VerificationFailed(node.cid));
308            }
309
310            let cid = self.store.put_unchecked(node)?;
311            stored.push(cid);
312        }
313
314        Ok(stored)
315    }
316
317    /// Collect all CIDs reachable from the given heads (including the heads).
318    fn collect_known(&self, heads: &[Hash]) -> HashSet<Hash> {
319        let mut known = HashSet::new();
320        let mut queue: VecDeque<Hash> = heads.iter().copied().collect();
321
322        while let Some(cid) = queue.pop_front() {
323            if known.insert(cid) {
324                if let Some(node) = self.store.get(&cid) {
325                    queue.extend(node.parents.iter().copied());
326                }
327            }
328        }
329
330        known
331    }
332
333    /// Find the missing ancestors of the given CIDs.
334    ///
335    /// This performs gap detection by traversing backwards from the given CIDs
336    /// and identifying nodes that aren't in our store.
337    pub fn find_missing_ancestors(&self, cids: &[Hash]) -> Vec<Hash> {
338        let mut missing = Vec::new();
339        let mut visited = HashSet::new();
340        let mut queue: VecDeque<(Hash, usize)> = cids.iter().map(|cid| (*cid, 0)).collect();
341
342        while let Some((cid, depth)) = queue.pop_front() {
343            if depth > self.config.max_depth {
344                continue;
345            }
346
347            if !visited.insert(cid) {
348                continue;
349            }
350
351            if !self.store.contains(&cid) {
352                missing.push(cid);
353            } else if let Some(node) = self.store.get(&cid) {
354                // Traverse to parents
355                for parent in &node.parents {
356                    if !visited.contains(parent) {
357                        queue.push_back((*parent, depth + 1));
358                    }
359                }
360            }
361        }
362
363        missing
364    }
365
366    /// Check if we're synchronized with a peer (have all their nodes).
367    pub fn is_synced_with(&self, peer_heads: &[Hash]) -> bool {
368        // We're synced if we have all peer heads and their ancestors
369        for head in peer_heads {
370            if !self.store.contains(head) {
371                return false;
372            }
373        }
374
375        // Check we have no missing nodes
376        self.store.missing_nodes().is_empty()
377    }
378
379    /// Get statistics about sync status.
380    pub fn sync_status(&self) -> SyncStatus {
381        SyncStatus {
382            local_heads: self.heads().len(),
383            missing_nodes: self.store.missing_nodes().len(),
384            total_nodes: self.store.len(),
385        }
386    }
387}
388
/// Status information about synchronization, as produced by
/// `DAGSyncer::sync_status`.
#[derive(Clone, Debug)]
pub struct SyncStatus {
    // Number of current local heads.
    pub local_heads: usize,
    // Number of nodes the store knows are referenced but absent.
    pub missing_nodes: usize,
    // Total number of nodes in the local store.
    pub total_nodes: usize,
}

/// Simulator for testing sync between multiple in-memory replicas.
pub struct SyncSimulator {
    /// One syncer (with its own memory-backed store) per replica.
    syncers: Vec<DAGSyncer<crate::store::MemoryDAGStore>>,
}

403impl SyncSimulator {
404    /// Create a simulator with n replicas, each with their own genesis.
405    pub fn new(n: usize) -> Self {
406        let syncers = (0..n)
407            .map(|i| {
408                let (store, _) =
409                    crate::store::MemoryDAGStore::with_genesis(format!("replica_{}", i));
410                DAGSyncer::new(store)
411            })
412            .collect();
413
414        SyncSimulator { syncers }
415    }
416
417    /// Create a simulator where all replicas share the same genesis.
418    pub fn with_shared_genesis(n: usize) -> Self {
419        let genesis = crate::node::NodeBuilder::genesis("shared");
420        let genesis_cid = genesis.cid;
421
422        let syncers = (0..n)
423            .map(|_| {
424                let mut store = crate::store::MemoryDAGStore::new();
425                store.put(genesis.clone()).unwrap();
426                DAGSyncer::new(store)
427            })
428            .collect();
429
430        let _ = genesis_cid; // Used in shared setup
431        SyncSimulator { syncers }
432    }
433
434    /// Get a reference to a syncer.
435    pub fn syncer(&self, idx: usize) -> &DAGSyncer<crate::store::MemoryDAGStore> {
436        &self.syncers[idx]
437    }
438
439    /// Get a mutable reference to a syncer.
440    pub fn syncer_mut(&mut self, idx: usize) -> &mut DAGSyncer<crate::store::MemoryDAGStore> {
441        &mut self.syncers[idx]
442    }
443
444    /// Perform one sync round between two replicas.
445    pub fn sync_pair(&mut self, from: usize, to: usize) {
446        let from_heads = self.syncers[from].heads();
447        let request = self.syncers[to].create_request(&from_heads);
448        let response = self.syncers[from].handle_request(&request);
449        let _ = self.syncers[to].apply_response(response);
450    }
451
452    /// Perform a full sync round (all pairs).
453    pub fn full_sync_round(&mut self) {
454        let n = self.syncers.len();
455        for i in 0..n {
456            for j in 0..n {
457                if i != j {
458                    self.sync_pair(i, j);
459                }
460            }
461        }
462    }
463
464    /// Check if all replicas have converged (same heads).
465    pub fn is_converged(&self) -> bool {
466        if self.syncers.is_empty() {
467            return true;
468        }
469
470        let reference_heads: HashSet<_> = self.syncers[0].heads().into_iter().collect();
471
472        self.syncers.iter().skip(1).all(|s| {
473            let heads: HashSet<_> = s.heads().into_iter().collect();
474            heads == reference_heads
475        })
476    }
477
478    /// Get the number of replicas.
479    pub fn replica_count(&self) -> usize {
480        self.syncers.len()
481    }
482}
483
#[cfg(test)]
mod tests {
    use super::*;
    use crate::node::{NodeBuilder, Payload};
    use crate::store::MemoryDAGStore;

    // One-way sync propagates a single new node and restores convergence.
    #[test]
    fn test_basic_sync() {
        let mut sim = SyncSimulator::with_shared_genesis(2);

        // Add a node to replica 0
        let heads = sim.syncer(0).heads();
        let node = NodeBuilder::new()
            .with_parent(heads[0])
            .with_payload(Payload::delta(vec![1, 2, 3]))
            .with_timestamp(1)
            .with_creator("replica_0")
            .build();
        sim.syncer_mut(0).store_mut().put(node).unwrap();

        // Before sync
        assert!(!sim.is_converged());

        // Sync
        sim.sync_pair(0, 1);

        // After sync
        assert!(sim.is_converged());
    }

    // Concurrent writes on both replicas merge into a two-headed DAG
    // after a full sync round.
    #[test]
    fn test_concurrent_updates_sync() {
        let mut sim = SyncSimulator::with_shared_genesis(2);
        let genesis = sim.syncer(0).heads()[0];

        // Both replicas add concurrent nodes
        let node_a = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(b"from_0".to_vec()))
            .with_timestamp(1)
            .with_creator("replica_0")
            .build();
        sim.syncer_mut(0).store_mut().put(node_a).unwrap();

        let node_b = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(b"from_1".to_vec()))
            .with_timestamp(1)
            .with_creator("replica_1")
            .build();
        sim.syncer_mut(1).store_mut().put(node_b).unwrap();

        // Each has 2 nodes, but different heads
        assert_eq!(sim.syncer(0).store().len(), 2);
        assert_eq!(sim.syncer(1).store().len(), 2);
        assert!(!sim.is_converged());

        // Sync both ways
        sim.full_sync_round();

        // Now both have 3 nodes and same heads
        assert_eq!(sim.syncer(0).store().len(), 3);
        assert_eq!(sim.syncer(1).store().len(), 3);

        // Both should have 2 heads (the concurrent updates)
        assert_eq!(sim.syncer(0).heads().len(), 2);
        assert!(sim.is_converged());
    }

    // Gap detection: a replica holding only genesis reports an unseen
    // tip as missing.
    #[test]
    fn test_find_missing_ancestors() {
        let (mut store, genesis) = MemoryDAGStore::with_genesis("r1");

        // Build a chain: genesis -> a -> b -> c
        let node_a = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(vec![1]))
            .with_timestamp(1)
            .with_creator("r1")
            .build();
        let cid_a = store.put(node_a.clone()).unwrap();

        let node_b = NodeBuilder::new()
            .with_parent(cid_a)
            .with_payload(Payload::delta(vec![2]))
            .with_timestamp(2)
            .with_creator("r1")
            .build();
        let cid_b = store.put(node_b.clone()).unwrap();

        let node_c = NodeBuilder::new()
            .with_parent(cid_b)
            .with_payload(Payload::delta(vec![3]))
            .with_timestamp(3)
            .with_creator("r1")
            .build();
        let cid_c = node_c.cid;
        store.put(node_c).unwrap();

        // Create another store with only genesis
        let (store2, _) = MemoryDAGStore::with_genesis("r1");
        let syncer = DAGSyncer::new(store2);

        // Find missing from perspective of store2
        let missing = syncer.find_missing_ancestors(&[cid_c]);

        // Should find cid_c as missing (we don't have it)
        assert!(missing.contains(&cid_c));
    }

    // A peer serving a want-request returns exactly the requested node.
    #[test]
    fn test_sync_request_response() {
        let (mut store, genesis) = MemoryDAGStore::with_genesis("r1");

        let node = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(vec![1]))
            .with_timestamp(1)
            .with_creator("r1")
            .build();
        let cid = store.put(node).unwrap();

        let syncer = DAGSyncer::new(store);

        // Create a request asking for the node
        let request = SyncRequest::want(vec![cid]);
        let response = syncer.handle_request(&request);

        assert_eq!(response.nodes.len(), 1);
        assert_eq!(response.nodes[0].cid, cid);
    }

    // Applying a response stores the contained node locally.
    #[test]
    fn test_apply_response() {
        let (_store1, genesis) = MemoryDAGStore::with_genesis("r1");

        let node = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(vec![1]))
            .with_timestamp(1)
            .with_creator("r1")
            .build();
        let cid = node.cid;

        // Store2 doesn't have the node
        let (store2, _) = MemoryDAGStore::with_genesis("r1");
        let mut syncer2 = DAGSyncer::new(store2);

        // Apply a response containing the node
        let response = SyncResponse::with_nodes(vec![node]);
        let stored = syncer2.apply_response(response).unwrap();

        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0], cid);
        assert!(syncer2.store().contains(&cid));
    }

    // is_synced_with tracks divergence and recovery across a sync.
    #[test]
    fn test_is_synced_with() {
        let mut sim = SyncSimulator::with_shared_genesis(2);
        let genesis = sim.syncer(0).heads()[0];

        // Initially synced (both have only genesis)
        assert!(sim.syncer(0).is_synced_with(&sim.syncer(1).heads()));

        // Add node to replica 0
        let node = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(vec![1]))
            .with_timestamp(1)
            .with_creator("r0")
            .build();
        sim.syncer_mut(0).store_mut().put(node).unwrap();

        // Replica 1 is not synced with replica 0's heads
        assert!(!sim.syncer(1).is_synced_with(&sim.syncer(0).heads()));

        // After sync, they should be synced
        sim.sync_pair(0, 1);
        assert!(sim.syncer(1).is_synced_with(&sim.syncer(0).heads()));
    }
}