1use crate::buffer::{DeltaReplica, ReplicaId, SeqNo};
27use mdcs_core::lattice::Lattice;
28use std::collections::VecDeque;
29
30#[derive(Debug, Clone)]
32pub enum AntiEntropyMessage<D> {
33 Delta {
35 from: ReplicaId,
36 to: ReplicaId,
37 delta: D,
38 seq: SeqNo,
39 },
40 Ack {
42 from: ReplicaId,
43 to: ReplicaId,
44 seq: SeqNo,
45 },
46}
47
48#[derive(Debug)]
50pub struct NetworkSimulator<D> {
51 in_flight: VecDeque<AntiEntropyMessage<D>>,
53 lost: Vec<AntiEntropyMessage<D>>,
55 config: NetworkConfig,
57 rng_state: u64,
59}
60
/// Fault-injection knobs for the simulated network.
///
/// Each rate is compared against a pseudo-random sample in `[0, 1)` when a
/// message is sent: the fault fires when the sample is strictly below the rate.
#[derive(Debug, Clone)]
pub struct NetworkConfig {
    /// Chance that a sent message is dropped instead of queued.
    pub loss_rate: f64,
    /// Chance that a sent message is queued twice.
    pub dup_rate: f64,
    /// Chance that a sent message trades places with a message already queued.
    pub reorder_rate: f64,
}

impl Default for NetworkConfig {
    /// A network with every fault rate at zero.
    fn default() -> Self {
        Self::RELIABLE
    }
}

impl NetworkConfig {
    /// Baseline with all fault rates at zero; the presets below override
    /// individual fields of it.
    const RELIABLE: Self = Self {
        loss_rate: 0.0,
        dup_rate: 0.0,
        reorder_rate: 0.0,
    };

    /// Configuration that only loses messages, at the given rate.
    pub fn lossy(loss_rate: f64) -> Self {
        Self {
            loss_rate,
            ..Self::RELIABLE
        }
    }

    /// Configuration that only duplicates messages, at the given rate.
    pub fn with_dups(dup_rate: f64) -> Self {
        Self {
            dup_rate,
            ..Self::RELIABLE
        }
    }

    /// Configuration with all three faults enabled at fixed rates:
    /// 10% loss, 20% duplication, 30% reordering.
    pub fn chaotic() -> Self {
        Self {
            reorder_rate: 0.3,
            dup_rate: 0.2,
            loss_rate: 0.1,
        }
    }
}
108
109impl<D: Clone> NetworkSimulator<D> {
110 pub fn new(config: NetworkConfig) -> Self {
111 Self {
112 in_flight: VecDeque::new(),
113 lost: Vec::new(),
114 config,
115 rng_state: 12345,
116 }
117 }
118
119 fn next_random(&mut self) -> f64 {
121 self.rng_state = self.rng_state.wrapping_mul(1103515245).wrapping_add(12345);
122 ((self.rng_state >> 16) & 0x7fff) as f64 / 32768.0
123 }
124
125 pub fn send(&mut self, msg: AntiEntropyMessage<D>) {
127 if self.next_random() < self.config.loss_rate {
129 self.lost.push(msg);
130 return;
131 }
132
133 if self.next_random() < self.config.dup_rate {
135 self.in_flight.push_back(msg.clone());
136 }
137
138 if self.next_random() < self.config.reorder_rate && !self.in_flight.is_empty() {
140 let pos = (self.next_random() * self.in_flight.len() as f64) as usize;
142 let pos = pos.min(self.in_flight.len());
143 self.in_flight.push_back(msg);
145 if pos < self.in_flight.len() - 1 {
146 self.in_flight.swap(pos, self.in_flight.len() - 1);
148 }
149 } else {
150 self.in_flight.push_back(msg);
151 }
152 }
153
154 pub fn receive(&mut self) -> Option<AntiEntropyMessage<D>> {
156 self.in_flight.pop_front()
157 }
158
159 pub fn retransmit_lost(&mut self) {
161 for msg in self.lost.drain(..) {
162 self.in_flight.push_back(msg);
163 }
164 }
165
166 pub fn is_empty(&self) -> bool {
168 self.in_flight.is_empty()
169 }
170
171 pub fn in_flight_count(&self) -> usize {
173 self.in_flight.len()
174 }
175
176 pub fn lost_count(&self) -> usize {
178 self.lost.len()
179 }
180}
181
182#[derive(Debug)]
184pub struct AntiEntropyCluster<S: Lattice + Clone> {
185 replicas: Vec<DeltaReplica<S, S>>,
187 network: NetworkSimulator<S>,
189}
190
191impl<S: Lattice + Clone> AntiEntropyCluster<S> {
192 pub fn new(n: usize, config: NetworkConfig) -> Self {
194 let mut replicas = Vec::with_capacity(n);
195
196 for i in 0..n {
198 let mut replica = DeltaReplica::new(format!("replica_{}", i));
199 for j in 0..n {
201 if i != j {
202 replica.register_peer(format!("replica_{}", j));
203 }
204 }
205 replicas.push(replica);
206 }
207
208 Self {
209 replicas,
210 network: NetworkSimulator::new(config),
211 }
212 }
213
214 pub fn replica(&self, idx: usize) -> &DeltaReplica<S, S> {
216 &self.replicas[idx]
217 }
218
219 pub fn replica_mut(&mut self, idx: usize) -> &mut DeltaReplica<S, S> {
221 &mut self.replicas[idx]
222 }
223
224 pub fn mutate<F>(&mut self, replica_idx: usize, mutator: F) -> S
226 where
227 F: FnOnce(&S) -> S,
228 {
229 self.replicas[replica_idx].mutate(mutator)
230 }
231
232 pub fn initiate_sync(&mut self, from_idx: usize, to_idx: usize) {
234 let to_id = self.replicas[to_idx].id.clone();
235 if let Some((delta, seq)) = self.replicas[from_idx].prepare_sync(&to_id) {
236 let msg = AntiEntropyMessage::Delta {
237 from: self.replicas[from_idx].id.clone(),
238 to: to_id.clone(),
239 delta,
240 seq,
241 };
242 self.network.send(msg);
243 }
244 }
245
246 pub fn process_one(&mut self) -> bool {
248 if let Some(msg) = self.network.receive() {
249 match msg {
250 AntiEntropyMessage::Delta {
251 from,
252 to,
253 delta,
254 seq,
255 } => {
256 for replica in &mut self.replicas {
258 if replica.id == to {
259 replica.receive_delta(&delta);
260 let ack = AntiEntropyMessage::Ack {
262 from: replica.id.clone(),
263 to: from.clone(),
264 seq,
265 };
266 self.network.send(ack);
267 break;
268 }
269 }
270 }
271 AntiEntropyMessage::Ack { from, to, seq } => {
272 for replica in &mut self.replicas {
274 if replica.id == to {
275 replica.process_ack(&from, seq);
276 break;
277 }
278 }
279 }
280 }
281 true
282 } else {
283 false
284 }
285 }
286
287 pub fn drain_network(&mut self) {
289 while self.process_one() {}
290 }
291
292 pub fn broadcast(&mut self, from_idx: usize) {
294 let n = self.replicas.len();
295 for to_idx in 0..n {
296 if from_idx != to_idx {
297 self.initiate_sync(from_idx, to_idx);
298 }
299 }
300 }
301
302 pub fn full_sync_round(&mut self) {
304 let n = self.replicas.len();
305 for from_idx in 0..n {
306 for to_idx in 0..n {
307 if from_idx != to_idx {
308 self.initiate_sync(from_idx, to_idx);
309 }
310 }
311 }
312 self.drain_network();
313 }
314
315 pub fn is_converged(&self) -> bool {
317 if self.replicas.len() < 2 {
318 return true;
319 }
320
321 let first = self.replicas[0].state();
322 self.replicas.iter().skip(1).all(|r| r.state() == first)
323 }
324
325 pub fn retransmit_and_process(&mut self) {
327 self.network.retransmit_lost();
328 self.drain_network();
329 }
330
331 pub fn len(&self) -> usize {
333 self.replicas.len()
334 }
335
336 pub fn is_empty(&self) -> bool {
338 self.replicas.is_empty()
339 }
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use mdcs_core::gset::GSet;

    /// Builds a `GSet` containing only `value`; the tests hand this back from
    /// their mutators.
    fn singleton(value: i32) -> GSet<i32> {
        let mut set = GSet::new();
        set.insert(value);
        set
    }

    /// Builds a cluster of `n` `GSet<i32>` replicas over a network with `config`.
    fn cluster_of(n: usize, config: NetworkConfig) -> AntiEntropyCluster<GSet<i32>> {
        AntiEntropyCluster::new(n, config)
    }

    /// A fault-free network delivers exactly what was sent.
    #[test]
    fn test_network_simulator_basic() {
        let mut net: NetworkSimulator<i32> = NetworkSimulator::new(NetworkConfig::default());

        let outgoing = AntiEntropyMessage::Delta {
            from: "r1".to_string(),
            to: "".to_string(),
            delta: 42,
            seq: 1,
        };
        net.send(outgoing);

        assert_eq!(net.in_flight_count(), 1);

        let delivered = net.receive().unwrap();
        if let AntiEntropyMessage::Delta { delta, .. } = delivered {
            assert_eq!(delta, 42);
        } else {
            panic!("Expected delta message");
        }
    }

    /// Three replicas on a fault-free network converge after one full round.
    #[test]
    fn test_cluster_basic_convergence() {
        let mut cluster = cluster_of(3, NetworkConfig::default());

        cluster.mutate(0, |_| singleton(1));
        cluster.mutate(1, |_| singleton(2));

        // Replicas 0 and 1 hold different elements before any sync.
        assert!(!cluster.is_converged());

        cluster.full_sync_round();

        assert!(cluster.is_converged());

        // Every replica must hold both elements.
        for idx in 0..3 {
            let state = cluster.replica(idx).state();
            assert!(state.contains(&1));
            assert!(state.contains(&2));
        }
    }

    /// With half of all messages dropped, repeated rounds plus retransmission
    /// still converge.
    #[test]
    fn test_convergence_under_loss() {
        let mut cluster = cluster_of(3, NetworkConfig::lossy(0.5));

        // Replica i contributes the element i + 1.
        for idx in 0..3 {
            let element = (idx + 1) as i32;
            cluster.mutate(idx, move |_| singleton(element));
        }

        for _round in 0..10 {
            cluster.full_sync_round();
            cluster.retransmit_and_process();
        }

        assert!(cluster.is_converged());

        for idx in 0..3 {
            for element in 1..=3 {
                assert!(cluster.replica(idx).state().contains(&element));
            }
        }
    }

    /// Duplicated messages do not prevent convergence.
    #[test]
    fn test_convergence_with_duplicates() {
        let mut cluster = cluster_of(2, NetworkConfig::with_dups(0.5));

        cluster.mutate(0, |_| singleton(1));
        cluster.mutate(1, |_| singleton(2));

        for _round in 0..5 {
            cluster.full_sync_round();
        }

        assert!(cluster.is_converged());

        let state = cluster.replica(0).state();
        assert!(state.contains(&1));
        assert!(state.contains(&2));
    }

    /// Loss, duplication and reordering together still allow convergence.
    #[test]
    fn test_convergence_chaotic_network() {
        let mut cluster = cluster_of(4, NetworkConfig::chaotic());

        // Replica i contributes the five elements 10*i .. 10*i + 4.
        for idx in 0..4 {
            for offset in 0..5 {
                let element = (idx * 10 + offset) as i32;
                cluster.mutate(idx, move |_| singleton(element));
            }
        }

        for _round in 0..20 {
            cluster.full_sync_round();
            cluster.retransmit_and_process();
        }

        assert!(cluster.is_converged());

        // Every replica must hold every element contributed by any replica.
        for idx in 0..4 {
            for origin in 0..4 {
                for offset in 0..5 {
                    let element = origin * 10 + offset;
                    assert!(
                        cluster.replica(idx).state().contains(&element),
                        "Replica {} missing value {}",
                        idx,
                        element
                    );
                }
            }
        }
    }

    /// Syncing again after the first round leaves the receiver's state unchanged.
    #[test]
    fn test_idempotence_repeated_resends() {
        let mut cluster = cluster_of(2, NetworkConfig::default());

        cluster.mutate(0, |_| singleton(42));

        let before_sync = cluster.replica(1).state().clone();

        cluster.full_sync_round();
        let after_first_round = cluster.replica(1).state().clone();

        for _round in 0..10 {
            cluster.full_sync_round();
        }
        let after_many_rounds = cluster.replica(1).state().clone();

        // Extra rounds change nothing.
        assert_eq!(after_first_round, after_many_rounds);

        // The first round did change something.
        assert_ne!(before_sync, after_first_round);
    }
}