1use crate::hash::Hash;
9use crate::node::MerkleNode;
10use crate::store::{DAGError, DAGStore};
11use std::collections::{HashSet, VecDeque};
12
/// Errors that can occur while synchronizing a DAG with a peer.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum SyncError {
    /// The node with this CID could not be fetched.
    FetchFailed(Hash),

    /// A received node failed integrity verification (see `MerkleNode::verify`).
    VerificationFailed(Hash),

    /// The underlying store rejected an operation.
    StoreError(DAGError),

    /// No peers were available to sync with.
    NoPeers,

    /// The sync operation timed out.
    Timeout,

    /// Ancestor traversal exceeded the configured `max_depth`.
    MaxDepthExceeded,
}
34
35impl std::fmt::Display for SyncError {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 match self {
38 SyncError::FetchFailed(h) => write!(f, "Failed to fetch node: {}", h.short()),
39 SyncError::VerificationFailed(h) => write!(f, "Verification failed: {}", h.short()),
40 SyncError::StoreError(e) => write!(f, "Store error: {}", e),
41 SyncError::NoPeers => write!(f, "No peers available"),
42 SyncError::Timeout => write!(f, "Sync timeout"),
43 SyncError::MaxDepthExceeded => write!(f, "Maximum traversal depth exceeded"),
44 }
45 }
46}
47
// Marker impl: all diagnostic detail lives in the Display text above.
impl std::error::Error for SyncError {}
49
50impl From<DAGError> for SyncError {
51 fn from(e: DAGError) -> Self {
52 SyncError::StoreError(e)
53 }
54}
55
/// A request sent to a peer asking for DAG nodes.
#[derive(Clone, Debug)]
pub struct SyncRequest {
    /// CIDs the requester explicitly wants.
    pub want: Vec<Hash>,

    /// Head CIDs the requester already has; lets the responder compute
    /// what else the requester is missing.
    pub have: Vec<Hash>,

    /// Optional cap on the number of nodes returned in one response.
    pub limit: Option<usize>,
}
68
69impl SyncRequest {
70 pub fn want(cids: Vec<Hash>) -> Self {
72 SyncRequest {
73 want: cids,
74 have: Vec::new(),
75 limit: None,
76 }
77 }
78
79 pub fn with_heads(mut self, heads: Vec<Hash>) -> Self {
81 self.have = heads;
82 self
83 }
84
85 pub fn with_limit(mut self, limit: usize) -> Self {
87 self.limit = Some(limit);
88 self
89 }
90}
91
/// A peer's answer to a [`SyncRequest`].
#[derive(Clone, Debug)]
pub struct SyncResponse {
    /// The nodes being delivered in this batch.
    pub nodes: Vec<MerkleNode>,

    /// CIDs the responder has but could not fit into this batch.
    pub more: Vec<Hash>,

    /// The responder's current head CIDs.
    pub heads: Vec<Hash>,
}
104
105impl SyncResponse {
106 pub fn empty() -> Self {
108 SyncResponse {
109 nodes: Vec::new(),
110 more: Vec::new(),
111 heads: Vec::new(),
112 }
113 }
114
115 pub fn with_nodes(nodes: Vec<MerkleNode>) -> Self {
117 SyncResponse {
118 nodes,
119 more: Vec::new(),
120 heads: Vec::new(),
121 }
122 }
123}
124
/// Tuning knobs for a [`DAGSyncer`].
#[derive(Clone, Debug)]
pub struct SyncConfig {
    /// Maximum ancestor-traversal depth in `find_missing_ancestors`.
    pub max_depth: usize,

    /// Default batch size used for requests and responses.
    pub batch_size: usize,

    /// Whether received nodes are verified before being stored.
    pub verify_nodes: bool,
}
137
138impl Default for SyncConfig {
139 fn default() -> Self {
140 SyncConfig {
141 max_depth: 1000,
142 batch_size: 100,
143 verify_nodes: true,
144 }
145 }
146}
147
/// Drives synchronization of a local DAG store with remote peers via
/// request/response exchanges.
pub struct DAGSyncer<S: DAGStore> {
    // The local node store being synchronized.
    store: S,

    // Tuning parameters; see SyncConfig.
    config: SyncConfig,
}
161
162impl<S: DAGStore> DAGSyncer<S> {
163 pub fn new(store: S) -> Self {
165 DAGSyncer {
166 store,
167 config: SyncConfig::default(),
168 }
169 }
170
171 pub fn with_config(store: S, config: SyncConfig) -> Self {
173 DAGSyncer { store, config }
174 }
175
176 pub fn store(&self) -> &S {
178 &self.store
179 }
180
181 pub fn store_mut(&mut self) -> &mut S {
183 &mut self.store
184 }
185
186 pub fn heads(&self) -> Vec<Hash> {
188 self.store.heads()
189 }
190
191 pub fn need(&self, cids: &[Hash]) -> Vec<Hash> {
193 cids.iter()
194 .filter(|cid| !self.store.contains(cid))
195 .copied()
196 .collect()
197 }
198
199 pub fn create_request(&self, peer_heads: &[Hash]) -> SyncRequest {
201 let need = self.need(peer_heads);
202 SyncRequest::want(need)
203 .with_heads(self.heads())
204 .with_limit(self.config.batch_size)
205 }
206
207 pub fn handle_request(&self, request: &SyncRequest) -> SyncResponse {
209 let mut nodes = Vec::new();
210 let mut more = Vec::new();
211 let limit = request.limit.unwrap_or(self.config.batch_size);
212
213 for cid in &request.want {
215 if let Some(node) = self.store.get(cid) {
216 if nodes.len() < limit {
217 nodes.push(node.clone());
218 } else {
219 more.push(*cid);
220 }
221 }
222 }
223
224 if !request.have.is_empty() && nodes.len() < limit {
226 let peer_has: HashSet<_> = self.collect_known(&request.have);
227
228 for cid in self.store.topological_order() {
230 if !peer_has.contains(&cid) {
231 if let Some(node) = self.store.get(&cid) {
232 if nodes.len() < limit {
233 let has_parents = node
235 .parents
236 .iter()
237 .all(|p| peer_has.contains(p) || nodes.iter().any(|n| n.cid == *p));
238
239 if has_parents && !nodes.iter().any(|n| n.cid == cid) {
240 nodes.push(node.clone());
241 }
242 } else {
243 more.push(cid);
244 }
245 }
246 }
247 }
248 }
249
250 SyncResponse {
251 nodes,
252 more,
253 heads: self.heads(),
254 }
255 }
256
257 pub fn apply_response(&mut self, response: SyncResponse) -> Result<Vec<Hash>, SyncError> {
261 let mut stored = Vec::new();
262 let mut pending: VecDeque<MerkleNode> = response.nodes.into_iter().collect();
263 let mut attempts = 0;
264 let max_attempts = pending.len() * 2;
265
266 while let Some(node) = pending.pop_front() {
268 attempts += 1;
269 if attempts > max_attempts {
270 break;
271 }
272
273 if self.store.contains(&node.cid) {
274 stored.push(node.cid);
275 continue;
276 }
277
278 if self.config.verify_nodes && !node.verify() {
279 return Err(SyncError::VerificationFailed(node.cid));
280 }
281
282 match self.store.put(node.clone()) {
284 Ok(cid) => stored.push(cid),
285 Err(DAGError::MissingParents(_)) => {
286 pending.push_back(node);
288 }
289 Err(e) => return Err(e.into()),
290 }
291 }
292
293 Ok(stored)
294 }
295
296 pub fn apply_nodes_unchecked(
300 &mut self,
301 nodes: Vec<MerkleNode>,
302 ) -> Result<Vec<Hash>, SyncError> {
303 let mut stored = Vec::new();
304
305 for node in nodes {
306 if self.config.verify_nodes && !node.verify() {
307 return Err(SyncError::VerificationFailed(node.cid));
308 }
309
310 let cid = self.store.put_unchecked(node)?;
311 stored.push(cid);
312 }
313
314 Ok(stored)
315 }
316
317 fn collect_known(&self, heads: &[Hash]) -> HashSet<Hash> {
319 let mut known = HashSet::new();
320 let mut queue: VecDeque<Hash> = heads.iter().copied().collect();
321
322 while let Some(cid) = queue.pop_front() {
323 if known.insert(cid) {
324 if let Some(node) = self.store.get(&cid) {
325 queue.extend(node.parents.iter().copied());
326 }
327 }
328 }
329
330 known
331 }
332
333 pub fn find_missing_ancestors(&self, cids: &[Hash]) -> Vec<Hash> {
338 let mut missing = Vec::new();
339 let mut visited = HashSet::new();
340 let mut queue: VecDeque<(Hash, usize)> = cids.iter().map(|cid| (*cid, 0)).collect();
341
342 while let Some((cid, depth)) = queue.pop_front() {
343 if depth > self.config.max_depth {
344 continue;
345 }
346
347 if !visited.insert(cid) {
348 continue;
349 }
350
351 if !self.store.contains(&cid) {
352 missing.push(cid);
353 } else if let Some(node) = self.store.get(&cid) {
354 for parent in &node.parents {
356 if !visited.contains(parent) {
357 queue.push_back((*parent, depth + 1));
358 }
359 }
360 }
361 }
362
363 missing
364 }
365
366 pub fn is_synced_with(&self, peer_heads: &[Hash]) -> bool {
368 for head in peer_heads {
370 if !self.store.contains(head) {
371 return false;
372 }
373 }
374
375 self.store.missing_nodes().is_empty()
377 }
378
379 pub fn sync_status(&self) -> SyncStatus {
381 SyncStatus {
382 local_heads: self.heads().len(),
383 missing_nodes: self.store.missing_nodes().len(),
384 total_nodes: self.store.len(),
385 }
386 }
387}
388
/// Snapshot of a syncer's local progress counters.
#[derive(Clone, Debug)]
pub struct SyncStatus {
    /// Number of local DAG heads.
    pub local_heads: usize,
    /// Number of nodes the store knows it is missing.
    pub missing_nodes: usize,
    /// Total nodes in the local store.
    pub total_nodes: usize,
}
396
/// In-memory multi-replica harness for exercising sync logic in tests.
pub struct SyncSimulator {
    // One in-memory syncer per simulated replica.
    syncers: Vec<DAGSyncer<crate::store::MemoryDAGStore>>,
}
402
403impl SyncSimulator {
404 pub fn new(n: usize) -> Self {
406 let syncers = (0..n)
407 .map(|i| {
408 let (store, _) =
409 crate::store::MemoryDAGStore::with_genesis(format!("replica_{}", i));
410 DAGSyncer::new(store)
411 })
412 .collect();
413
414 SyncSimulator { syncers }
415 }
416
417 pub fn with_shared_genesis(n: usize) -> Self {
419 let genesis = crate::node::NodeBuilder::genesis("shared");
420 let genesis_cid = genesis.cid;
421
422 let syncers = (0..n)
423 .map(|_| {
424 let mut store = crate::store::MemoryDAGStore::new();
425 store.put(genesis.clone()).unwrap();
426 DAGSyncer::new(store)
427 })
428 .collect();
429
430 let _ = genesis_cid; SyncSimulator { syncers }
432 }
433
434 pub fn syncer(&self, idx: usize) -> &DAGSyncer<crate::store::MemoryDAGStore> {
436 &self.syncers[idx]
437 }
438
439 pub fn syncer_mut(&mut self, idx: usize) -> &mut DAGSyncer<crate::store::MemoryDAGStore> {
441 &mut self.syncers[idx]
442 }
443
444 pub fn sync_pair(&mut self, from: usize, to: usize) {
446 let from_heads = self.syncers[from].heads();
447 let request = self.syncers[to].create_request(&from_heads);
448 let response = self.syncers[from].handle_request(&request);
449 let _ = self.syncers[to].apply_response(response);
450 }
451
452 pub fn full_sync_round(&mut self) {
454 let n = self.syncers.len();
455 for i in 0..n {
456 for j in 0..n {
457 if i != j {
458 self.sync_pair(i, j);
459 }
460 }
461 }
462 }
463
464 pub fn is_converged(&self) -> bool {
466 if self.syncers.is_empty() {
467 return true;
468 }
469
470 let reference_heads: HashSet<_> = self.syncers[0].heads().into_iter().collect();
471
472 self.syncers.iter().skip(1).all(|s| {
473 let heads: HashSet<_> = s.heads().into_iter().collect();
474 heads == reference_heads
475 })
476 }
477
478 pub fn replica_count(&self) -> usize {
480 self.syncers.len()
481 }
482}
483
#[cfg(test)]
mod tests {
    use super::*;
    use crate::node::{NodeBuilder, Payload};
    use crate::store::MemoryDAGStore;

    // One replica advances past the shared genesis; a single pull by the
    // other replica should restore convergence.
    #[test]
    fn test_basic_sync() {
        let mut sim = SyncSimulator::with_shared_genesis(2);

        // Replica 0 appends a node on top of the shared genesis.
        let heads = sim.syncer(0).heads();
        let node = NodeBuilder::new()
            .with_parent(heads[0])
            .with_payload(Payload::delta(vec![1, 2, 3]))
            .with_timestamp(1)
            .with_creator("replica_0")
            .build();
        sim.syncer_mut(0).store_mut().put(node).unwrap();

        // Heads now differ between replicas.
        assert!(!sim.is_converged());

        // Replica 1 pulls from replica 0.
        sim.sync_pair(0, 1);

        assert!(sim.is_converged());
    }

    // Concurrent appends on both replicas: after a full sync round each
    // store holds all three nodes and both see the same two heads.
    #[test]
    fn test_concurrent_updates_sync() {
        let mut sim = SyncSimulator::with_shared_genesis(2);
        let genesis = sim.syncer(0).heads()[0];

        // Replica 0 appends its own child of genesis.
        let node_a = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(b"from_0".to_vec()))
            .with_timestamp(1)
            .with_creator("replica_0")
            .build();
        sim.syncer_mut(0).store_mut().put(node_a).unwrap();

        // Replica 1 concurrently appends a different child of genesis.
        let node_b = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(b"from_1".to_vec()))
            .with_timestamp(1)
            .with_creator("replica_1")
            .build();
        sim.syncer_mut(1).store_mut().put(node_b).unwrap();

        // Each replica holds genesis + its own node only.
        assert_eq!(sim.syncer(0).store().len(), 2);
        assert_eq!(sim.syncer(1).store().len(), 2);
        assert!(!sim.is_converged());

        sim.full_sync_round();

        // Both replicas now hold genesis + both concurrent nodes.
        assert_eq!(sim.syncer(0).store().len(), 3);
        assert_eq!(sim.syncer(1).store().len(), 3);

        // The two concurrent nodes remain as two heads (no merge node).
        assert_eq!(sim.syncer(0).heads().len(), 2);
        assert!(sim.is_converged());
    }

    // An empty replica asked about the tip of a chain it lacks should
    // report that tip as missing.
    #[test]
    fn test_find_missing_ancestors() {
        let (mut store, genesis) = MemoryDAGStore::with_genesis("r1");

        // Build a linear chain genesis -> A -> B -> C in the first store.
        let node_a = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(vec![1]))
            .with_timestamp(1)
            .with_creator("r1")
            .build();
        let cid_a = store.put(node_a.clone()).unwrap();

        let node_b = NodeBuilder::new()
            .with_parent(cid_a)
            .with_payload(Payload::delta(vec![2]))
            .with_timestamp(2)
            .with_creator("r1")
            .build();
        let cid_b = store.put(node_b.clone()).unwrap();

        let node_c = NodeBuilder::new()
            .with_parent(cid_b)
            .with_payload(Payload::delta(vec![3]))
            .with_timestamp(3)
            .with_creator("r1")
            .build();
        let cid_c = node_c.cid;
        store.put(node_c).unwrap();

        // A second store with only its own genesis knows nothing of the chain.
        let (store2, _) = MemoryDAGStore::with_genesis("r1");
        let syncer = DAGSyncer::new(store2);

        let missing = syncer.find_missing_ancestors(&[cid_c]);

        // The queried tip itself is absent, so it must be reported.
        assert!(missing.contains(&cid_c));
    }

    // A responder should serve an explicitly wanted CID it holds.
    #[test]
    fn test_sync_request_response() {
        let (mut store, genesis) = MemoryDAGStore::with_genesis("r1");

        let node = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(vec![1]))
            .with_timestamp(1)
            .with_creator("r1")
            .build();
        let cid = store.put(node).unwrap();

        let syncer = DAGSyncer::new(store);

        let request = SyncRequest::want(vec![cid]);
        let response = syncer.handle_request(&request);

        assert_eq!(response.nodes.len(), 1);
        assert_eq!(response.nodes[0].cid, cid);
    }

    // Applying a response whose node's parent (genesis) is already local
    // should store the node.
    #[test]
    fn test_apply_response() {
        let (_store1, genesis) = MemoryDAGStore::with_genesis("r1");

        let node = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(vec![1]))
            .with_timestamp(1)
            .with_creator("r1")
            .build();
        let cid = node.cid;

        // Same creator string — both genesis stores hold the same genesis CID.
        let (store2, _) = MemoryDAGStore::with_genesis("r1");
        let mut syncer2 = DAGSyncer::new(store2);

        let response = SyncResponse::with_nodes(vec![node]);
        let stored = syncer2.apply_response(response).unwrap();

        assert_eq!(stored.len(), 1);
        assert_eq!(stored[0], cid);
        assert!(syncer2.store().contains(&cid));
    }

    // is_synced_with reflects head containment before and after a sync.
    #[test]
    fn test_is_synced_with() {
        let mut sim = SyncSimulator::with_shared_genesis(2);
        let genesis = sim.syncer(0).heads()[0];

        // Fresh shared-genesis replicas start out mutually synced.
        assert!(sim.syncer(0).is_synced_with(&sim.syncer(1).heads()));

        // Replica 0 advances; replica 1 falls behind.
        let node = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(vec![1]))
            .with_timestamp(1)
            .with_creator("r0")
            .build();
        sim.syncer_mut(0).store_mut().put(node).unwrap();

        assert!(!sim.syncer(1).is_synced_with(&sim.syncer(0).heads()));

        // One pull restores sync.
        sim.sync_pair(0, 1);
        assert!(sim.syncer(1).is_synced_with(&sim.syncer(0).heads()));
    }
}