mdcs_merkle/
broadcaster.rs

1//! Gossip-based broadcasting for head dissemination.
2//!
3//! The Broadcaster announces new DAG heads to peers, triggering
4//! the pull-based sync process via DAGSyncer.
5
6use crate::hash::Hash;
7use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
8
9/// Configuration for the broadcaster.
10#[derive(Clone, Debug)]
11pub struct BroadcastConfig {
12    /// Number of peers to send each message to (fanout).
13    pub fanout: usize,
14
15    /// Maximum messages to buffer before dropping old ones.
16    pub buffer_size: usize,
17
18    /// Whether to deduplicate messages we've already seen.
19    pub deduplicate: bool,
20
21    /// Time-to-live: maximum hops a message can travel.
22    pub ttl: u8,
23}
24
25impl Default for BroadcastConfig {
26    fn default() -> Self {
27        BroadcastConfig {
28            fanout: 3,
29            buffer_size: 1000,
30            deduplicate: true,
31            ttl: 6,
32        }
33    }
34}
35
36/// A broadcast message containing head announcements.
37#[derive(Clone, Debug, PartialEq, Eq)]
38pub struct BroadcastMessage {
39    /// Unique message ID (hash of contents).
40    pub id: Hash,
41
42    /// The replica that originated this message.
43    pub origin: String,
44
45    /// Current heads being announced.
46    pub heads: Vec<Hash>,
47
48    /// Remaining hops (time-to-live).
49    pub ttl: u8,
50
51    /// Logical timestamp when the message was created.
52    pub timestamp: u64,
53}
54
55impl BroadcastMessage {
56    /// Create a new broadcast message.
57    pub fn new(origin: impl Into<String>, heads: Vec<Hash>, ttl: u8, timestamp: u64) -> Self {
58        let origin = origin.into();
59
60        // Compute message ID from contents
61        let mut hasher = crate::hash::Hasher::new();
62        hasher.update(origin.as_bytes());
63        for head in &heads {
64            hasher.update(head.as_bytes());
65        }
66        hasher.update(&timestamp.to_le_bytes());
67        let id = hasher.finalize();
68
69        BroadcastMessage {
70            id,
71            origin,
72            heads,
73            ttl,
74            timestamp,
75        }
76    }
77
78    /// Create a forwarded copy with decremented TTL.
79    pub fn forward(&self) -> Option<Self> {
80        if self.ttl == 0 {
81            return None;
82        }
83
84        Some(BroadcastMessage {
85            id: self.id,
86            origin: self.origin.clone(),
87            heads: self.heads.clone(),
88            ttl: self.ttl - 1,
89            timestamp: self.timestamp,
90        })
91    }
92
93    /// Check if this message should still be forwarded.
94    pub fn is_alive(&self) -> bool {
95        self.ttl > 0
96    }
97}
98
99/// Events emitted by the broadcaster.
100#[derive(Clone, Debug)]
101pub enum BroadcastEvent {
102    /// Send this message to a peer.
103    Send {
104        peer: String,
105        message: BroadcastMessage,
106    },
107
108    /// New heads received from a peer.
109    HeadsReceived { from: String, heads: Vec<Hash> },
110
111    /// A message was dropped (buffer full or duplicate).
112    Dropped {
113        message_id: Hash,
114        reason: DropReason,
115    },
116}
117
118/// Reason a message was dropped.
119#[derive(Clone, Debug, PartialEq, Eq)]
120pub enum DropReason {
121    Duplicate,
122    BufferFull,
123    ExpiredTTL,
124}
125
126/// Gossip-based broadcaster for head dissemination.
127///
128/// The broadcaster maintains:
129/// - A set of known peers
130/// - A buffer of seen message IDs (for deduplication)
131/// - Pending outgoing messages
132pub struct Broadcaster {
133    /// Our replica ID.
134    replica_id: String,
135
136    /// Configuration.
137    config: BroadcastConfig,
138
139    /// Known peers (BTreeSet for deterministic iteration order).
140    peers: BTreeSet<String>,
141
142    /// Message IDs we've seen (for deduplication).
143    seen: HashSet<Hash>,
144
145    /// Order of seen messages (for LRU eviction).
146    seen_order: VecDeque<Hash>,
147
148    /// Current logical timestamp.
149    timestamp: u64,
150
151    /// Pending events to be processed.
152    pending_events: VecDeque<BroadcastEvent>,
153
154    /// Track which peers have which heads (optimization).
155    peer_heads: HashMap<String, HashSet<Hash>>,
156}
157
158impl Broadcaster {
159    /// Create a new broadcaster.
160    pub fn new(replica_id: impl Into<String>) -> Self {
161        Broadcaster {
162            replica_id: replica_id.into(),
163            config: BroadcastConfig::default(),
164            peers: BTreeSet::new(),
165            seen: HashSet::new(),
166            seen_order: VecDeque::new(),
167            timestamp: 0,
168            pending_events: VecDeque::new(),
169            peer_heads: HashMap::new(),
170        }
171    }
172
173    /// Create a broadcaster with custom configuration.
174    pub fn with_config(replica_id: impl Into<String>, config: BroadcastConfig) -> Self {
175        Broadcaster {
176            replica_id: replica_id.into(),
177            config,
178            peers: BTreeSet::new(),
179            seen: HashSet::new(),
180            seen_order: VecDeque::new(),
181            timestamp: 0,
182            pending_events: VecDeque::new(),
183            peer_heads: HashMap::new(),
184        }
185    }
186
187    /// Get our replica ID.
188    pub fn replica_id(&self) -> &str {
189        &self.replica_id
190    }
191
192    /// Add a peer.
193    pub fn add_peer(&mut self, peer: impl Into<String>) {
194        self.peers.insert(peer.into());
195    }
196
197    /// Remove a peer.
198    pub fn remove_peer(&mut self, peer: &str) {
199        self.peers.remove(peer);
200        self.peer_heads.remove(peer);
201    }
202
203    /// Get all known peers.
204    pub fn peers(&self) -> impl Iterator<Item = &String> {
205        self.peers.iter()
206    }
207
208    /// Broadcast new heads to peers.
209    pub fn broadcast(&mut self, heads: Vec<Hash>) {
210        self.timestamp += 1;
211
212        let message =
213            BroadcastMessage::new(&self.replica_id, heads, self.config.ttl, self.timestamp);
214
215        // Mark as seen
216        self.mark_seen(message.id);
217
218        // Select peers to send to
219        let targets = self.select_peers(self.config.fanout);
220
221        for peer in targets {
222            self.pending_events.push_back(BroadcastEvent::Send {
223                peer,
224                message: message.clone(),
225            });
226        }
227    }
228
229    /// Receive a message from a peer.
230    pub fn receive(&mut self, from: impl Into<String>, message: BroadcastMessage) {
231        let from = from.into();
232
233        // Check for duplicate
234        if self.config.deduplicate && self.seen.contains(&message.id) {
235            self.pending_events.push_back(BroadcastEvent::Dropped {
236                message_id: message.id,
237                reason: DropReason::Duplicate,
238            });
239            return;
240        }
241
242        // Check TTL
243        if !message.is_alive() {
244            self.pending_events.push_back(BroadcastEvent::Dropped {
245                message_id: message.id,
246                reason: DropReason::ExpiredTTL,
247            });
248            return;
249        }
250
251        // Mark as seen
252        self.mark_seen(message.id);
253
254        // Update peer's known heads
255        self.peer_heads
256            .entry(from.clone())
257            .or_default()
258            .extend(message.heads.iter().copied());
259
260        // Emit event for heads received
261        self.pending_events
262            .push_back(BroadcastEvent::HeadsReceived {
263                from: from.clone(),
264                heads: message.heads.clone(),
265            });
266
267        // Forward to other peers (excluding sender and origin)
268        if let Some(forwarded) = message.forward() {
269            let targets =
270                self.select_peers_excluding(self.config.fanout, &[&from, &message.origin]);
271
272            for peer in targets {
273                self.pending_events.push_back(BroadcastEvent::Send {
274                    peer,
275                    message: forwarded.clone(),
276                });
277            }
278        }
279    }
280
281    /// Get the next pending event.
282    pub fn poll_event(&mut self) -> Option<BroadcastEvent> {
283        self.pending_events.pop_front()
284    }
285
286    /// Check if there are pending events.
287    pub fn has_pending_events(&self) -> bool {
288        !self.pending_events.is_empty()
289    }
290
291    /// Get all pending events.
292    pub fn drain_events(&mut self) -> Vec<BroadcastEvent> {
293        self.pending_events.drain(..).collect()
294    }
295
296    /// Mark a message as seen.
297    fn mark_seen(&mut self, id: Hash) {
298        if self.seen.insert(id) {
299            self.seen_order.push_back(id);
300
301            // Evict old entries if buffer is full
302            while self.seen_order.len() > self.config.buffer_size {
303                if let Some(old_id) = self.seen_order.pop_front() {
304                    self.seen.remove(&old_id);
305                }
306            }
307        }
308    }
309
310    /// Select n random peers.
311    fn select_peers(&self, n: usize) -> Vec<String> {
312        // In a real implementation, this would use random selection
313        // For determinism in tests, we just take the first n
314        self.peers.iter().take(n).cloned().collect()
315    }
316
317    /// Select n random peers, excluding some.
318    fn select_peers_excluding(&self, n: usize, exclude: &[&str]) -> Vec<String> {
319        self.peers
320            .iter()
321            .filter(|p| !exclude.contains(&p.as_str()))
322            .take(n)
323            .cloned()
324            .collect()
325    }
326
327    /// Get statistics about the broadcaster.
328    pub fn stats(&self) -> BroadcastStats {
329        BroadcastStats {
330            peer_count: self.peers.len(),
331            seen_messages: self.seen.len(),
332            pending_events: self.pending_events.len(),
333            timestamp: self.timestamp,
334        }
335    }
336}
337
338/// Statistics about the broadcaster.
339#[derive(Clone, Debug)]
340pub struct BroadcastStats {
341    pub peer_count: usize,
342    pub seen_messages: usize,
343    pub pending_events: usize,
344    pub timestamp: u64,
345}
346
347/// Simulates a network of broadcasters for testing.
348pub struct BroadcastNetwork {
349    /// Broadcasters indexed by replica ID.
350    broadcasters: HashMap<String, Broadcaster>,
351
352    /// Message queue: (from, to, message).
353    message_queue: VecDeque<(String, String, BroadcastMessage)>,
354}
355
356impl BroadcastNetwork {
357    /// Create a fully connected network of n replicas.
358    pub fn fully_connected(n: usize) -> Self {
359        let mut broadcasters = HashMap::new();
360
361        // Create broadcasters
362        for i in 0..n {
363            let id = format!("replica_{}", i);
364            let mut broadcaster = Broadcaster::new(&id);
365
366            // Add all other replicas as peers
367            for j in 0..n {
368                if i != j {
369                    broadcaster.add_peer(format!("replica_{}", j));
370                }
371            }
372
373            broadcasters.insert(id, broadcaster);
374        }
375
376        BroadcastNetwork {
377            broadcasters,
378            message_queue: VecDeque::new(),
379        }
380    }
381
382    /// Broadcast heads from a replica.
383    pub fn broadcast(&mut self, from: &str, heads: Vec<Hash>) {
384        if let Some(broadcaster) = self.broadcasters.get_mut(from) {
385            broadcaster.broadcast(heads);
386            self.collect_send_events(from);
387        }
388    }
389
390    /// Collect send events and add to message queue.
391    /// Only extracts Send events, leaving HeadsReceived events in place.
392    fn collect_send_events(&mut self, from: &str) {
393        if let Some(broadcaster) = self.broadcasters.get_mut(from) {
394            let events: Vec<_> = broadcaster.drain_events();
395            for event in events {
396                match event {
397                    BroadcastEvent::Send { peer, message } => {
398                        self.message_queue
399                            .push_back((from.to_string(), peer, message));
400                    }
401                    // Put non-Send events back for later retrieval
402                    other => broadcaster.pending_events.push_back(other),
403                }
404            }
405        }
406    }
407
408    /// Deliver the next message in the queue.
409    pub fn deliver_one(&mut self) -> bool {
410        if let Some((from, to, message)) = self.message_queue.pop_front() {
411            if let Some(broadcaster) = self.broadcasters.get_mut(&to) {
412                broadcaster.receive(&from, message);
413                self.collect_send_events(&to);
414            }
415            true
416        } else {
417            false
418        }
419    }
420
421    /// Deliver all pending messages.
422    pub fn deliver_all(&mut self) {
423        while self.deliver_one() {}
424    }
425
426    /// Get a broadcaster by replica ID.
427    pub fn broadcaster(&self, id: &str) -> Option<&Broadcaster> {
428        self.broadcasters.get(id)
429    }
430
431    /// Get a mutable broadcaster by replica ID.
432    pub fn broadcaster_mut(&mut self, id: &str) -> Option<&mut Broadcaster> {
433        self.broadcasters.get_mut(id)
434    }
435
436    /// Get all received heads for a replica.
437    pub fn received_heads(&mut self, id: &str) -> Vec<Hash> {
438        let mut heads = Vec::new();
439
440        if let Some(broadcaster) = self.broadcasters.get_mut(id) {
441            for event in broadcaster.drain_events() {
442                if let BroadcastEvent::HeadsReceived { heads: h, .. } = event {
443                    heads.extend(h);
444                }
445            }
446        }
447
448        heads
449    }
450
451    /// Check how many messages are pending.
452    pub fn pending_messages(&self) -> usize {
453        self.message_queue.len()
454    }
455}
456
457#[cfg(test)]
458mod tests {
459    use super::*;
460    use crate::hash::Hasher;
461
462    #[test]
463    fn test_basic_broadcast() {
464        let mut network = BroadcastNetwork::fully_connected(3);
465
466        // Broadcast from replica_0
467        let head = Hasher::hash(b"test_head");
468        network.broadcast("replica_0", vec![head]);
469
470        // Should have messages queued for 2 peers
471        assert!(network.pending_messages() > 0);
472
473        // Deliver all
474        network.deliver_all();
475
476        // All replicas should have received the heads
477        let heads_1 = network.received_heads("replica_1");
478        let heads_2 = network.received_heads("replica_2");
479
480        assert!(heads_1.contains(&head) || heads_2.contains(&head));
481    }
482
483    #[test]
484    fn test_message_forwarding() {
485        let mut broadcaster = Broadcaster::new("origin");
486        broadcaster.add_peer("peer_1");
487        broadcaster.add_peer("peer_2");
488        broadcaster.add_peer("peer_3");
489
490        let head = Hasher::hash(b"test");
491        broadcaster.broadcast(vec![head]);
492
493        // Should have send events
494        let events = broadcaster.drain_events();
495        assert!(!events.is_empty());
496
497        for event in events {
498            if let BroadcastEvent::Send { message, .. } = event {
499                assert!(message.ttl <= broadcaster.config.ttl);
500                assert!(message.heads.contains(&head));
501            }
502        }
503    }
504
505    #[test]
506    fn test_deduplication() {
507        let mut broadcaster = Broadcaster::new("receiver");
508        broadcaster.add_peer("sender");
509
510        let head = Hasher::hash(b"test");
511        let message = BroadcastMessage::new("origin", vec![head], 5, 1);
512
513        // Receive twice
514        broadcaster.receive("sender", message.clone());
515        broadcaster.receive("sender", message.clone());
516
517        // Second should be dropped
518        let events = broadcaster.drain_events();
519        let dropped_count = events
520            .iter()
521            .filter(|e| {
522                matches!(
523                    e,
524                    BroadcastEvent::Dropped {
525                        reason: DropReason::Duplicate,
526                        ..
527                    }
528                )
529            })
530            .count();
531
532        assert_eq!(dropped_count, 1);
533    }
534
535    #[test]
536    fn test_ttl_expiry() {
537        let mut broadcaster = Broadcaster::new("receiver");
538
539        let head = Hasher::hash(b"test");
540        let message = BroadcastMessage::new("origin", vec![head], 0, 1);
541
542        broadcaster.receive("sender", message);
543
544        let events = broadcaster.drain_events();
545        let expired = events.iter().any(|e| {
546            matches!(
547                e,
548                BroadcastEvent::Dropped {
549                    reason: DropReason::ExpiredTTL,
550                    ..
551                }
552            )
553        });
554
555        assert!(expired);
556    }
557
558    #[test]
559    fn test_forward_decrements_ttl() {
560        let head = Hasher::hash(b"test");
561        let message = BroadcastMessage::new("origin", vec![head], 5, 1);
562
563        let forwarded = message.forward().unwrap();
564        assert_eq!(forwarded.ttl, 4);
565
566        // ID should be the same
567        assert_eq!(forwarded.id, message.id);
568    }
569
570    #[test]
571    fn test_buffer_eviction() {
572        let config = BroadcastConfig {
573            buffer_size: 2,
574            ..Default::default()
575        };
576        let mut broadcaster = Broadcaster::with_config("test", config);
577        broadcaster.add_peer("peer");
578
579        // Broadcast 3 messages
580        for i in 0..3 {
581            broadcaster.broadcast(vec![Hasher::hash(&[i])]);
582        }
583
584        // Only 2 should be in seen set
585        assert_eq!(broadcaster.seen.len(), 2);
586    }
587
588    #[test]
589    fn test_peer_management() {
590        let mut broadcaster = Broadcaster::new("test");
591
592        broadcaster.add_peer("peer_1");
593        broadcaster.add_peer("peer_2");
594        assert_eq!(broadcaster.peers().count(), 2);
595
596        broadcaster.remove_peer("peer_1");
597        assert_eq!(broadcaster.peers().count(), 1);
598    }
599
600    #[test]
601    fn test_network_convergence() {
602        let mut network = BroadcastNetwork::fully_connected(5);
603
604        // Each replica broadcasts different heads
605        for i in 0..5 {
606            let head = Hasher::hash(&[i as u8]);
607            network.broadcast(&format!("replica_{}", i), vec![head]);
608        }
609
610        // Deliver all messages
611        network.deliver_all();
612
613        // All replicas should have received heads from others
614        // (checking that the gossip propagated)
615        assert_eq!(network.pending_messages(), 0);
616    }
617}