1use crate::hash::Hash;
7use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
8
/// Tunable parameters for a [`Broadcaster`].
///
/// The type is a plain value, so it derives `PartialEq`/`Eq` in addition to
/// `Clone`/`Debug`; this lets callers compare configurations directly.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BroadcastConfig {
    /// Maximum number of peers a message is sent (or relayed) to.
    pub fanout: usize,

    /// Maximum number of message ids remembered for duplicate detection.
    /// Once exceeded, the oldest ids are forgotten first.
    pub buffer_size: usize,

    /// When `true`, a received message whose id is still remembered is dropped.
    pub deduplicate: bool,

    /// Time-to-live given to locally originated messages; each forward
    /// decrements it by one.
    pub ttl: u8,
}
24
25impl Default for BroadcastConfig {
26 fn default() -> Self {
27 BroadcastConfig {
28 fanout: 3,
29 buffer_size: 1000,
30 deduplicate: true,
31 ttl: 6,
32 }
33 }
34}
35
36#[derive(Clone, Debug, PartialEq, Eq)]
38pub struct BroadcastMessage {
39 pub id: Hash,
41
42 pub origin: String,
44
45 pub heads: Vec<Hash>,
47
48 pub ttl: u8,
50
51 pub timestamp: u64,
53}
54
55impl BroadcastMessage {
56 pub fn new(origin: impl Into<String>, heads: Vec<Hash>, ttl: u8, timestamp: u64) -> Self {
58 let origin = origin.into();
59
60 let mut hasher = crate::hash::Hasher::new();
62 hasher.update(origin.as_bytes());
63 for head in &heads {
64 hasher.update(head.as_bytes());
65 }
66 hasher.update(×tamp.to_le_bytes());
67 let id = hasher.finalize();
68
69 BroadcastMessage {
70 id,
71 origin,
72 heads,
73 ttl,
74 timestamp,
75 }
76 }
77
78 pub fn forward(&self) -> Option<Self> {
80 if self.ttl == 0 {
81 return None;
82 }
83
84 Some(BroadcastMessage {
85 id: self.id,
86 origin: self.origin.clone(),
87 heads: self.heads.clone(),
88 ttl: self.ttl - 1,
89 timestamp: self.timestamp,
90 })
91 }
92
93 pub fn is_alive(&self) -> bool {
95 self.ttl > 0
96 }
97}
98
99#[derive(Clone, Debug)]
101pub enum BroadcastEvent {
102 Send {
104 peer: String,
105 message: BroadcastMessage,
106 },
107
108 HeadsReceived { from: String, heads: Vec<Hash> },
110
111 Dropped {
113 message_id: Hash,
114 reason: DropReason,
115 },
116}
117
/// Why a received message was discarded.
///
/// The enum carries no data, so it is `Copy` and can be passed around by value
/// without explicit clones.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DropReason {
    /// The message id was already remembered as seen.
    Duplicate,
    /// Reserved for buffer exhaustion; nothing in this file emits it.
    BufferFull,
    /// The message arrived with a TTL of zero.
    ExpiredTTL,
}
125
126pub struct Broadcaster {
133 replica_id: String,
135
136 config: BroadcastConfig,
138
139 peers: BTreeSet<String>,
141
142 seen: HashSet<Hash>,
144
145 seen_order: VecDeque<Hash>,
147
148 timestamp: u64,
150
151 pending_events: VecDeque<BroadcastEvent>,
153
154 peer_heads: HashMap<String, HashSet<Hash>>,
156}
157
158impl Broadcaster {
159 pub fn new(replica_id: impl Into<String>) -> Self {
161 Broadcaster {
162 replica_id: replica_id.into(),
163 config: BroadcastConfig::default(),
164 peers: BTreeSet::new(),
165 seen: HashSet::new(),
166 seen_order: VecDeque::new(),
167 timestamp: 0,
168 pending_events: VecDeque::new(),
169 peer_heads: HashMap::new(),
170 }
171 }
172
173 pub fn with_config(replica_id: impl Into<String>, config: BroadcastConfig) -> Self {
175 Broadcaster {
176 replica_id: replica_id.into(),
177 config,
178 peers: BTreeSet::new(),
179 seen: HashSet::new(),
180 seen_order: VecDeque::new(),
181 timestamp: 0,
182 pending_events: VecDeque::new(),
183 peer_heads: HashMap::new(),
184 }
185 }
186
187 pub fn replica_id(&self) -> &str {
189 &self.replica_id
190 }
191
192 pub fn add_peer(&mut self, peer: impl Into<String>) {
194 self.peers.insert(peer.into());
195 }
196
197 pub fn remove_peer(&mut self, peer: &str) {
199 self.peers.remove(peer);
200 self.peer_heads.remove(peer);
201 }
202
203 pub fn peers(&self) -> impl Iterator<Item = &String> {
205 self.peers.iter()
206 }
207
208 pub fn broadcast(&mut self, heads: Vec<Hash>) {
210 self.timestamp += 1;
211
212 let message =
213 BroadcastMessage::new(&self.replica_id, heads, self.config.ttl, self.timestamp);
214
215 self.mark_seen(message.id);
217
218 let targets = self.select_peers(self.config.fanout);
220
221 for peer in targets {
222 self.pending_events.push_back(BroadcastEvent::Send {
223 peer,
224 message: message.clone(),
225 });
226 }
227 }
228
229 pub fn receive(&mut self, from: impl Into<String>, message: BroadcastMessage) {
231 let from = from.into();
232
233 if self.config.deduplicate && self.seen.contains(&message.id) {
235 self.pending_events.push_back(BroadcastEvent::Dropped {
236 message_id: message.id,
237 reason: DropReason::Duplicate,
238 });
239 return;
240 }
241
242 if !message.is_alive() {
244 self.pending_events.push_back(BroadcastEvent::Dropped {
245 message_id: message.id,
246 reason: DropReason::ExpiredTTL,
247 });
248 return;
249 }
250
251 self.mark_seen(message.id);
253
254 self.peer_heads
256 .entry(from.clone())
257 .or_default()
258 .extend(message.heads.iter().copied());
259
260 self.pending_events
262 .push_back(BroadcastEvent::HeadsReceived {
263 from: from.clone(),
264 heads: message.heads.clone(),
265 });
266
267 if let Some(forwarded) = message.forward() {
269 let targets =
270 self.select_peers_excluding(self.config.fanout, &[&from, &message.origin]);
271
272 for peer in targets {
273 self.pending_events.push_back(BroadcastEvent::Send {
274 peer,
275 message: forwarded.clone(),
276 });
277 }
278 }
279 }
280
281 pub fn poll_event(&mut self) -> Option<BroadcastEvent> {
283 self.pending_events.pop_front()
284 }
285
286 pub fn has_pending_events(&self) -> bool {
288 !self.pending_events.is_empty()
289 }
290
291 pub fn drain_events(&mut self) -> Vec<BroadcastEvent> {
293 self.pending_events.drain(..).collect()
294 }
295
296 fn mark_seen(&mut self, id: Hash) {
298 if self.seen.insert(id) {
299 self.seen_order.push_back(id);
300
301 while self.seen_order.len() > self.config.buffer_size {
303 if let Some(old_id) = self.seen_order.pop_front() {
304 self.seen.remove(&old_id);
305 }
306 }
307 }
308 }
309
310 fn select_peers(&self, n: usize) -> Vec<String> {
312 self.peers.iter().take(n).cloned().collect()
315 }
316
317 fn select_peers_excluding(&self, n: usize, exclude: &[&str]) -> Vec<String> {
319 self.peers
320 .iter()
321 .filter(|p| !exclude.contains(&p.as_str()))
322 .take(n)
323 .cloned()
324 .collect()
325 }
326
327 pub fn stats(&self) -> BroadcastStats {
329 BroadcastStats {
330 peer_count: self.peers.len(),
331 seen_messages: self.seen.len(),
332 pending_events: self.pending_events.len(),
333 timestamp: self.timestamp,
334 }
335 }
336}
337
/// Point-in-time counters describing a [`Broadcaster`].
///
/// Derives `PartialEq`/`Eq` so two snapshots (or a snapshot and an expected
/// value) can be compared as a whole.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BroadcastStats {
    /// Number of known peers.
    pub peer_count: usize,
    /// Number of message ids currently remembered as seen.
    pub seen_messages: usize,
    /// Number of events waiting to be polled.
    pub pending_events: usize,
    /// Current value of the broadcaster's local timestamp counter.
    pub timestamp: u64,
}
346
347pub struct BroadcastNetwork {
349 broadcasters: HashMap<String, Broadcaster>,
351
352 message_queue: VecDeque<(String, String, BroadcastMessage)>,
354}
355
356impl BroadcastNetwork {
357 pub fn fully_connected(n: usize) -> Self {
359 let mut broadcasters = HashMap::new();
360
361 for i in 0..n {
363 let id = format!("replica_{}", i);
364 let mut broadcaster = Broadcaster::new(&id);
365
366 for j in 0..n {
368 if i != j {
369 broadcaster.add_peer(format!("replica_{}", j));
370 }
371 }
372
373 broadcasters.insert(id, broadcaster);
374 }
375
376 BroadcastNetwork {
377 broadcasters,
378 message_queue: VecDeque::new(),
379 }
380 }
381
382 pub fn broadcast(&mut self, from: &str, heads: Vec<Hash>) {
384 if let Some(broadcaster) = self.broadcasters.get_mut(from) {
385 broadcaster.broadcast(heads);
386 self.collect_send_events(from);
387 }
388 }
389
390 fn collect_send_events(&mut self, from: &str) {
393 if let Some(broadcaster) = self.broadcasters.get_mut(from) {
394 let events: Vec<_> = broadcaster.drain_events();
395 for event in events {
396 match event {
397 BroadcastEvent::Send { peer, message } => {
398 self.message_queue
399 .push_back((from.to_string(), peer, message));
400 }
401 other => broadcaster.pending_events.push_back(other),
403 }
404 }
405 }
406 }
407
408 pub fn deliver_one(&mut self) -> bool {
410 if let Some((from, to, message)) = self.message_queue.pop_front() {
411 if let Some(broadcaster) = self.broadcasters.get_mut(&to) {
412 broadcaster.receive(&from, message);
413 self.collect_send_events(&to);
414 }
415 true
416 } else {
417 false
418 }
419 }
420
421 pub fn deliver_all(&mut self) {
423 while self.deliver_one() {}
424 }
425
426 pub fn broadcaster(&self, id: &str) -> Option<&Broadcaster> {
428 self.broadcasters.get(id)
429 }
430
431 pub fn broadcaster_mut(&mut self, id: &str) -> Option<&mut Broadcaster> {
433 self.broadcasters.get_mut(id)
434 }
435
436 pub fn received_heads(&mut self, id: &str) -> Vec<Hash> {
438 let mut heads = Vec::new();
439
440 if let Some(broadcaster) = self.broadcasters.get_mut(id) {
441 for event in broadcaster.drain_events() {
442 if let BroadcastEvent::HeadsReceived { heads: h, .. } = event {
443 heads.extend(h);
444 }
445 }
446 }
447
448 heads
449 }
450
451 pub fn pending_messages(&self) -> usize {
453 self.message_queue.len()
454 }
455}
456
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hash::Hasher;

    /// True when `event` reports a drop for exactly the `expected` reason.
    fn dropped_for(event: &BroadcastEvent, expected: DropReason) -> bool {
        match event {
            BroadcastEvent::Dropped { reason, .. } => *reason == expected,
            _ => false,
        }
    }

    /// A broadcast in a three-node network reaches at least one other node.
    #[test]
    fn test_basic_broadcast() {
        let head = Hasher::hash(b"test_head");
        let mut network = BroadcastNetwork::fully_connected(3);

        network.broadcast("replica_0", vec![head]);
        assert!(network.pending_messages() > 0);

        network.deliver_all();

        let reached_first = network.received_heads("replica_1").contains(&head);
        let reached_second = network.received_heads("replica_2").contains(&head);
        assert!(reached_first || reached_second);
    }

    /// Originating a broadcast queues sends that carry the advertised head
    /// and a TTL no larger than the configured one.
    #[test]
    fn test_message_forwarding() {
        let mut broadcaster = Broadcaster::new("origin");
        for peer in ["peer_1", "peer_2", "peer_3"] {
            broadcaster.add_peer(peer);
        }

        let head = Hasher::hash(b"test");
        broadcaster.broadcast(vec![head]);

        let events = broadcaster.drain_events();
        assert!(!events.is_empty());

        let sent = events.iter().filter_map(|event| match event {
            BroadcastEvent::Send { message, .. } => Some(message),
            _ => None,
        });
        for message in sent {
            assert!(message.ttl <= broadcaster.config.ttl);
            assert!(message.heads.contains(&head));
        }
    }

    /// Receiving the same message twice reports exactly one duplicate drop.
    #[test]
    fn test_deduplication() {
        let mut broadcaster = Broadcaster::new("receiver");
        broadcaster.add_peer("sender");

        let message = BroadcastMessage::new("origin", vec![Hasher::hash(b"test")], 5, 1);
        for _ in 0..2 {
            broadcaster.receive("sender", message.clone());
        }

        let dropped_count = broadcaster
            .drain_events()
            .iter()
            .filter(|event| dropped_for(event, DropReason::Duplicate))
            .count();

        assert_eq!(dropped_count, 1);
    }

    /// A message arriving with a TTL of zero is reported as expired.
    #[test]
    fn test_ttl_expiry() {
        let mut broadcaster = Broadcaster::new("receiver");
        let message = BroadcastMessage::new("origin", vec![Hasher::hash(b"test")], 0, 1);

        broadcaster.receive("sender", message);

        let expired = broadcaster
            .drain_events()
            .iter()
            .any(|event| dropped_for(event, DropReason::ExpiredTTL));

        assert!(expired);
    }

    /// Forwarding lowers the TTL by one and leaves the id untouched.
    #[test]
    fn test_forward_decrements_ttl() {
        let message = BroadcastMessage::new("origin", vec![Hasher::hash(b"test")], 5, 1);

        let forwarded = message.forward().unwrap();

        assert_eq!(forwarded.ttl, 4);
        assert_eq!(forwarded.id, message.id);
    }

    /// The seen buffer never holds more ids than `buffer_size`.
    #[test]
    fn test_buffer_eviction() {
        let mut broadcaster = Broadcaster::with_config(
            "test",
            BroadcastConfig {
                buffer_size: 2,
                ..Default::default()
            },
        );
        broadcaster.add_peer("peer");

        for byte in 0u8..3 {
            broadcaster.broadcast(vec![Hasher::hash(&[byte])]);
        }

        assert_eq!(broadcaster.seen.len(), 2);
    }

    /// Peers can be added and removed again.
    #[test]
    fn test_peer_management() {
        let mut broadcaster = Broadcaster::new("test");

        for peer in ["peer_1", "peer_2"] {
            broadcaster.add_peer(peer);
        }
        assert_eq!(broadcaster.peers().count(), 2);

        broadcaster.remove_peer("peer_1");
        assert_eq!(broadcaster.peers().count(), 1);
    }

    /// After every node has broadcast, delivery drains the queue completely.
    #[test]
    fn test_network_convergence() {
        let mut network = BroadcastNetwork::fully_connected(5);

        for index in 0u8..5 {
            let sender = format!("replica_{}", index);
            network.broadcast(&sender, vec![Hasher::hash(&[index])]);
        }

        network.deliver_all();

        assert_eq!(network.pending_messages(), 0);
    }
}