1use crate::hash::Hash;
7use crate::node::MerkleNode;
8use serde::{Deserialize, Serialize};
9use std::collections::{HashMap, HashSet, VecDeque};
10use std::sync::RwLock;
11
/// Errors produced by `DAGStore` operations.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DAGError {
    /// No node with the given CID is present in the store.
    NotFound(Hash),

    /// The node failed `MerkleNode::verify` (content does not match its CID).
    VerificationFailed(Hash),

    /// `put` requires all parents to be present; these CIDs were absent.
    MissingParents(Vec<Hash>),

    /// A node with this CID already exists.
    /// NOTE(review): currently unused — `put`/`put_unchecked` treat a
    /// duplicate insert as an idempotent `Ok` instead of returning this.
    Duplicate(Hash),
}
27
28impl std::fmt::Display for DAGError {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 match self {
31 DAGError::NotFound(h) => write!(f, "Node not found: {}", h.short()),
32 DAGError::VerificationFailed(h) => write!(f, "Verification failed for: {}", h.short()),
33 DAGError::MissingParents(parents) => {
34 write!(
35 f,
36 "Missing parents: {:?}",
37 parents.iter().map(|h| h.short()).collect::<Vec<_>>()
38 )
39 }
40 DAGError::Duplicate(h) => write!(f, "Duplicate node: {}", h.short()),
41 }
42 }
43}
44
// Marker impl: `Debug` + `Display` above satisfy the default `Error` methods.
impl std::error::Error for DAGError {}
46
/// Storage abstraction for a content-addressed Merkle DAG.
pub trait DAGStore {
    /// Look up a node by its content identifier (CID).
    fn get(&self, cid: &Hash) -> Option<&MerkleNode>;

    /// Insert a node, requiring every parent to already be stored.
    fn put(&mut self, node: MerkleNode) -> Result<Hash, DAGError>;

    /// Insert a node even when some parents are absent; absent parents are
    /// recorded so they can be fetched later (see `missing_nodes`).
    fn put_unchecked(&mut self, node: MerkleNode) -> Result<Hash, DAGError>;

    /// Current frontier: stored nodes that no stored node lists as a parent.
    fn heads(&self) -> Vec<Hash>;

    /// Whether a node with this CID is stored.
    fn contains(&self, cid: &Hash) -> bool;

    /// All transitive ancestors of `cid` (parents, grandparents, ...).
    fn ancestors(&self, cid: &Hash) -> HashSet<Hash>;

    /// Direct children of `cid` (nodes that list it as a parent).
    fn children(&self, cid: &Hash) -> Vec<Hash>;

    /// A parents-before-children ordering of all stored nodes.
    fn topological_order(&self) -> Vec<Hash>;

    /// CIDs referenced as parents but not yet present in the store.
    fn missing_nodes(&self) -> HashSet<Hash>;

    /// Number of stored nodes.
    fn len(&self) -> usize;

    /// True when the store holds no nodes.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
89
/// In-memory `DAGStore` backed by hash maps.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct MemoryDAGStore {
    /// All stored nodes, keyed by CID.
    nodes: HashMap<Hash, MerkleNode>,

    /// Stored nodes with no known children (the DAG frontier).
    heads: HashSet<Hash>,

    /// Reverse edges: parent CID -> set of child CIDs.
    children_index: HashMap<Hash, HashSet<Hash>>,

    /// Parent CIDs referenced by stored nodes but not yet present.
    missing: HashSet<Hash>,

    /// Memoized topological order; cleared on every insert. Skipped by
    /// serde, so it is rebuilt lazily after deserialization.
    #[serde(skip)]
    cached_topo_order: RwLock<Option<Vec<Hash>>>,
}
109
110impl Clone for MemoryDAGStore {
111 fn clone(&self) -> Self {
112 let cached_topo_order = self
113 .cached_topo_order
114 .read()
115 .unwrap_or_else(|e| e.into_inner())
116 .clone();
117
118 Self {
119 nodes: self.nodes.clone(),
120 heads: self.heads.clone(),
121 children_index: self.children_index.clone(),
122 missing: self.missing.clone(),
123 cached_topo_order: RwLock::new(cached_topo_order),
124 }
125 }
126}
127
128impl MemoryDAGStore {
129 pub fn new() -> Self {
131 MemoryDAGStore {
132 nodes: HashMap::new(),
133 heads: HashSet::new(),
134 children_index: HashMap::new(),
135 missing: HashSet::new(),
136 cached_topo_order: RwLock::new(None),
137 }
138 }
139
140 pub fn with_genesis(creator: impl Into<String>) -> (Self, Hash) {
142 let mut store = Self::new();
143 let genesis = crate::node::NodeBuilder::genesis(creator);
144 let cid = store.put(genesis).expect("Genesis node should be valid");
145 (store, cid)
146 }
147
148 fn update_heads(&mut self, node: &MerkleNode) {
150 self.heads.insert(node.cid);
152
153 for parent in &node.parents {
155 self.heads.remove(parent);
156 }
157 }
158
159 fn update_children_index(&mut self, node: &MerkleNode) {
161 for parent in &node.parents {
162 self.children_index
163 .entry(*parent)
164 .or_default()
165 .insert(node.cid);
166 }
167 }
168
169 pub fn stats(&self) -> DAGStats {
171 let max_depth = self.compute_max_depth();
172 let branching = self.compute_branching_stats();
173
174 DAGStats {
175 total_nodes: self.nodes.len(),
176 head_count: self.heads.len(),
177 missing_count: self.missing.len(),
178 max_depth,
179 avg_branching: branching,
180 }
181 }
182
183 fn compute_max_depth(&self) -> usize {
185 let mut depths: HashMap<Hash, usize> = HashMap::new();
186
187 for cid in self.topological_order() {
188 if let Some(node) = self.nodes.get(&cid) {
189 let parent_depth = node
190 .parents
191 .iter()
192 .filter_map(|p| depths.get(p))
193 .max()
194 .copied()
195 .unwrap_or(0);
196 depths.insert(cid, parent_depth + 1);
197 }
198 }
199
200 depths.values().max().copied().unwrap_or(0)
201 }
202
203 fn compute_branching_stats(&self) -> f64 {
205 if self.children_index.is_empty() {
206 return 0.0;
207 }
208
209 let total_children: usize = self.children_index.values().map(|c| c.len()).sum();
210
211 total_children as f64 / self.children_index.len() as f64
212 }
213}
214
215impl DAGStore for MemoryDAGStore {
216 fn get(&self, cid: &Hash) -> Option<&MerkleNode> {
217 self.nodes.get(cid)
218 }
219
220 fn put(&mut self, node: MerkleNode) -> Result<Hash, DAGError> {
221 if !node.verify() {
223 return Err(DAGError::VerificationFailed(node.cid));
224 }
225
226 if self.nodes.contains_key(&node.cid) {
228 return Ok(node.cid);
229 }
230
231 if !node.is_genesis() {
233 let missing: Vec<Hash> = node
234 .parents
235 .iter()
236 .filter(|p| !self.nodes.contains_key(p))
237 .copied()
238 .collect();
239
240 if !missing.is_empty() {
241 return Err(DAGError::MissingParents(missing));
242 }
243 }
244
245 let cid = node.cid;
246
247 self.update_heads(&node);
249 self.update_children_index(&node);
250
251 self.missing.remove(&cid);
253
254 self.nodes.insert(cid, node);
256
257 self.cached_topo_order
259 .write()
260 .unwrap_or_else(|e| e.into_inner())
261 .take();
262
263 Ok(cid)
264 }
265
266 fn put_unchecked(&mut self, node: MerkleNode) -> Result<Hash, DAGError> {
267 if !node.verify() {
269 return Err(DAGError::VerificationFailed(node.cid));
270 }
271
272 if self.nodes.contains_key(&node.cid) {
274 return Ok(node.cid);
275 }
276
277 let cid = node.cid;
278
279 for parent in &node.parents {
281 if !self.nodes.contains_key(parent) {
282 self.missing.insert(*parent);
283 }
284 }
285
286 self.update_children_index(&node);
288
289 if !self.children_index.contains_key(&cid) {
292 self.heads.insert(cid);
293 }
294 for parent in &node.parents {
296 self.heads.remove(parent);
297 }
298
299 self.missing.remove(&cid);
301
302 self.nodes.insert(cid, node);
304
305 self.cached_topo_order
307 .write()
308 .unwrap_or_else(|e| e.into_inner())
309 .take();
310
311 Ok(cid)
312 }
313
314 fn heads(&self) -> Vec<Hash> {
315 let mut heads: Vec<_> = self.heads.iter().copied().collect();
316 heads.sort();
317 heads
318 }
319
320 fn contains(&self, cid: &Hash) -> bool {
321 self.nodes.contains_key(cid)
322 }
323
324 fn ancestors(&self, cid: &Hash) -> HashSet<Hash> {
325 let mut result = HashSet::new();
326 let mut queue = VecDeque::new();
327
328 if let Some(node) = self.nodes.get(cid) {
329 queue.extend(node.parents.iter().copied());
330 }
331
332 while let Some(current) = queue.pop_front() {
333 if result.insert(current) {
334 if let Some(node) = self.nodes.get(¤t) {
335 queue.extend(node.parents.iter().copied());
336 }
337 }
338 }
339
340 result
341 }
342
343 fn children(&self, cid: &Hash) -> Vec<Hash> {
344 self.children_index
345 .get(cid)
346 .map(|c| c.iter().copied().collect())
347 .unwrap_or_default()
348 }
349
350 fn topological_order(&self) -> Vec<Hash> {
351 {
353 let cached = self
354 .cached_topo_order
355 .read()
356 .unwrap_or_else(|e| e.into_inner());
357 if let Some(order) = cached.as_ref() {
358 return order.clone();
359 }
360 }
361
362 let mut in_degree: HashMap<Hash, usize> = HashMap::new();
364 let mut result = Vec::new();
365 let mut queue = VecDeque::new();
366
367 for (cid, node) in &self.nodes {
369 let degree = node
370 .parents
371 .iter()
372 .filter(|p| self.nodes.contains_key(p))
373 .count();
374 in_degree.insert(*cid, degree);
375
376 if degree == 0 {
377 queue.push_back(*cid);
378 }
379 }
380
381 while let Some(cid) = queue.pop_front() {
383 result.push(cid);
384
385 if let Some(children) = self.children_index.get(&cid) {
386 for child in children {
387 if let Some(degree) = in_degree.get_mut(child) {
388 *degree = degree.saturating_sub(1);
389 if *degree == 0 {
390 queue.push_back(*child);
391 }
392 }
393 }
394 }
395 }
396
397 self.cached_topo_order
399 .write()
400 .unwrap_or_else(|e| e.into_inner())
401 .replace(result.clone());
402 result
403 }
404
405 fn missing_nodes(&self) -> HashSet<Hash> {
406 self.missing.clone()
407 }
408
409 fn len(&self) -> usize {
410 self.nodes.len()
411 }
412}
413
/// Snapshot of store metrics, produced by `MemoryDAGStore::stats`.
#[derive(Clone, Debug)]
pub struct DAGStats {
    /// Total number of stored nodes.
    pub total_nodes: usize,
    /// Number of current heads (frontier nodes).
    pub head_count: usize,
    /// Number of known-but-absent parent CIDs.
    pub missing_count: usize,
    /// Longest root-to-node path, counted in nodes.
    pub max_depth: usize,
    /// Average children per parent in the reverse index.
    pub avg_branching: f64,
}
423
#[cfg(test)]
mod tests {
    use super::*;
    use crate::node::{NodeBuilder, Payload};

    // Compile-time proof that the store can be shared across threads.
    #[test]
    fn test_memory_store_is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<MemoryDAGStore>();
    }

    #[test]
    fn test_genesis_store() {
        let (store, genesis_cid) = MemoryDAGStore::with_genesis("replica_1");

        assert_eq!(store.len(), 1);
        assert!(store.contains(&genesis_cid));
        assert_eq!(store.heads(), vec![genesis_cid]);
    }

    // genesis -> node1 -> node2: the single head advances along the chain.
    #[test]
    fn test_linear_chain() {
        let (mut store, genesis) = MemoryDAGStore::with_genesis("r1");

        let node1 = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(vec![1]))
            .with_timestamp(1)
            .with_creator("r1")
            .build();
        let cid1 = store.put(node1).unwrap();

        let node2 = NodeBuilder::new()
            .with_parent(cid1)
            .with_payload(Payload::delta(vec![2]))
            .with_timestamp(2)
            .with_creator("r1")
            .build();
        let cid2 = store.put(node2).unwrap();

        assert_eq!(store.len(), 3);
        assert_eq!(store.heads(), vec![cid2]);
        assert_eq!(store.ancestors(&cid2), HashSet::from([genesis, cid1]));
    }

    // Two children of genesis from different creators: both become heads.
    #[test]
    fn test_concurrent_branches() {
        let (mut store, genesis) = MemoryDAGStore::with_genesis("r1");

        let branch_a = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(b"a".to_vec()))
            .with_timestamp(1)
            .with_creator("r1")
            .build();
        let cid_a = store.put(branch_a).unwrap();

        let branch_b = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(b"b".to_vec()))
            .with_timestamp(1)
            .with_creator("r2")
            .build();
        let cid_b = store.put(branch_b).unwrap();

        let heads = store.heads();
        assert_eq!(heads.len(), 2);
        assert!(heads.contains(&cid_a));
        assert!(heads.contains(&cid_b));
    }

    // A two-parent merge node collapses both branches into a single head.
    #[test]
    fn test_merge_node() {
        let (mut store, genesis) = MemoryDAGStore::with_genesis("r1");

        let branch_a = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(b"a".to_vec()))
            .with_timestamp(1)
            .with_creator("r1")
            .build();
        let cid_a = store.put(branch_a).unwrap();

        let branch_b = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(b"b".to_vec()))
            .with_timestamp(1)
            .with_creator("r2")
            .build();
        let cid_b = store.put(branch_b).unwrap();

        let merge = NodeBuilder::new()
            .with_parents(vec![cid_a, cid_b])
            .with_payload(Payload::delta(b"merge".to_vec()))
            .with_timestamp(2)
            .with_creator("r1")
            .build();
        let merge_cid = store.put(merge).unwrap();

        assert_eq!(store.heads(), vec![merge_cid]);

        let ancestors = store.ancestors(&merge_cid);
        assert!(ancestors.contains(&cid_a));
        assert!(ancestors.contains(&cid_b));
        assert!(ancestors.contains(&genesis));
    }

    // `put` must reject a node whose parent is not stored.
    #[test]
    fn test_missing_parents_error() {
        let mut store = MemoryDAGStore::new();

        let fake_parent = crate::hash::Hasher::hash(b"fake");

        let node = NodeBuilder::new()
            .with_parent(fake_parent)
            .with_payload(Payload::delta(vec![1]))
            .with_timestamp(1)
            .with_creator("r1")
            .build();

        let result = store.put(node);
        assert!(matches!(result, Err(DAGError::MissingParents(_))));
    }

    // `put_unchecked` accepts the same node and records the gap instead.
    #[test]
    fn test_put_unchecked() {
        let mut store = MemoryDAGStore::new();

        let fake_parent = crate::hash::Hasher::hash(b"fake");

        let node = NodeBuilder::new()
            .with_parent(fake_parent)
            .with_payload(Payload::delta(vec![1]))
            .with_timestamp(1)
            .with_creator("r1")
            .build();

        let cid = store.put_unchecked(node).unwrap();

        assert!(store.missing_nodes().contains(&fake_parent));
        assert!(store.contains(&cid));
    }

    // Positions, not exact order, are asserted: parents precede children.
    #[test]
    fn test_topological_order() {
        let (mut store, genesis) = MemoryDAGStore::with_genesis("r1");

        let node1 = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(vec![1]))
            .with_timestamp(1)
            .with_creator("r1")
            .build();
        let cid1 = store.put(node1).unwrap();

        let node2 = NodeBuilder::new()
            .with_parent(cid1)
            .with_payload(Payload::delta(vec![2]))
            .with_timestamp(2)
            .with_creator("r1")
            .build();
        let cid2 = store.put(node2).unwrap();

        let order = store.topological_order();

        let genesis_pos = order.iter().position(|&c| c == genesis).unwrap();
        let cid1_pos = order.iter().position(|&c| c == cid1).unwrap();
        let cid2_pos = order.iter().position(|&c| c == cid2).unwrap();

        assert!(genesis_pos < cid1_pos);
        assert!(cid1_pos < cid2_pos);
    }

    #[test]
    fn test_children_index() {
        let (mut store, genesis) = MemoryDAGStore::with_genesis("r1");

        let child1 = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(vec![1]))
            .with_timestamp(1)
            .with_creator("r1")
            .build();
        let cid1 = store.put(child1).unwrap();

        let child2 = NodeBuilder::new()
            .with_parent(genesis)
            .with_payload(Payload::delta(vec![2]))
            .with_timestamp(1)
            .with_creator("r2")
            .build();
        let cid2 = store.put(child2).unwrap();

        let children = store.children(&genesis);
        assert_eq!(children.len(), 2);
        assert!(children.contains(&cid1));
        assert!(children.contains(&cid2));
    }

    // Linear chain of 5 nodes after genesis: depth counts nodes, so 6.
    #[test]
    fn test_dag_stats() {
        let (mut store, _genesis) = MemoryDAGStore::with_genesis("r1");

        for i in 0..5 {
            let last_head = store.heads()[0];
            let node = NodeBuilder::new()
                .with_parent(last_head)
                .with_payload(Payload::delta(vec![i]))
                .with_timestamp(i as u64 + 1)
                .with_creator("r1")
                .build();
            store.put(node).unwrap();
        }

        let stats = store.stats();
        assert_eq!(stats.total_nodes, 6);
        assert_eq!(stats.head_count, 1);
        assert_eq!(stats.max_depth, 6);
    }
}
651}