use std::{fmt, hash::Hash};
use bytes::Bytes;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
mod hyparview;
mod plumtree;
pub mod state;
pub mod topic;
pub mod util;
#[cfg(any(test, feature = "test-utils"))]
pub mod sim;
pub use hyparview::Config as HyparviewConfig;
pub use plumtree::{Config as PlumtreeConfig, DeliveryScope, Scope};
pub use state::{InEvent, Message, OutEvent, State, Timer, TopicId};
pub use topic::{Command, Config, Event, IO};
/// Default value for the maximum message size: 4096.
///
// NOTE(review): the unit is presumably bytes and the value presumably seeds a
// size limit in the protocol configuration — neither is visible here; confirm
// against the code that reads this constant.
pub const DEFAULT_MAX_MESSAGE_SIZE: usize = 4096;
/// Smallest value, 512, that a configured maximum message size may take
/// (going by the constant's name).
///
// NOTE(review): where and how this lower bound is enforced is not visible in
// this file — confirm against the code that reads this constant.
pub const MIN_MAX_MESSAGE_SIZE: usize = 512;
/// Bundle of all trait bounds a type must satisfy to be used as a peer identifier.
///
/// The trait declares no items of its own; it only names the combination
/// `Copy + Eq + Ord + Hash + Debug + Serialize + DeserializeOwned` so generic code
/// can spell a single bound.
pub trait PeerIdentity:
    Copy + Eq + Ord + Hash + fmt::Debug + Serialize + DeserializeOwned
{
}

/// Blanket implementation: every type that meets the bounds is a [`PeerIdentity`],
/// so the trait never has to be implemented by hand.
impl<T: Copy + Eq + Ord + Hash + fmt::Debug + Serialize + DeserializeOwned> PeerIdentity for T {}
/// Newtype around a [`Bytes`] buffer holding data attached to a peer.
///
/// This type does not interpret the bytes in any way. Its `Debug` output only
/// shows the length of the buffer (`PeerData(<len>b)`), not the content.
#[derive(derive_more::Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[debug("PeerData({}b)", self.0.len())]
pub struct PeerData(Bytes);

impl PeerData {
    /// Creates a new [`PeerData`] from anything convertible into [`Bytes`].
    pub fn new(data: impl Into<Bytes>) -> Self {
        let buffer: Bytes = data.into();
        PeerData(buffer)
    }

    /// Returns a reference to the wrapped [`Bytes`] buffer.
    pub fn inner(&self) -> &Bytes {
        let PeerData(buffer) = self;
        buffer
    }

    /// Returns the wrapped data as a plain byte slice.
    pub fn as_bytes(&self) -> &[u8] {
        &self.0[..]
    }
}
/// A peer identifier paired with the optional [`PeerData`] known for that peer.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
struct PeerInfo<PI> {
    /// Identifier of the peer.
    pub id: PI,
    /// Data attached to the peer, if any.
    pub data: Option<PeerData>,
}

/// Builds a [`PeerInfo`] from an `(id, data)` tuple.
impl<PI> From<(PI, Option<PeerData>)> for PeerInfo<PI> {
    fn from(pair: (PI, Option<PeerData>)) -> Self {
        let (id, data) = pair;
        PeerInfo { id, data }
    }
}
#[cfg(test)]
mod test {
use std::{collections::HashSet, env, fmt, str::FromStr};
use n0_tracing_test::traced_test;
use rand::SeedableRng;
use rand_chacha::ChaCha12Rng;
use super::{Command, Config, Event};
use crate::proto::{
sim::{LatencyConfig, Network, NetworkConfig},
Scope, TopicId,
};
/// Smoke test for neighbor handling with an active view capacity of 2.
///
/// Peers 0, 1 and 2 join each other and must end up fully connected. Peer 3 then
/// joins via peer 0, after which peer 0 must have dropped exactly one of its two
/// previous neighbors in favor of peer 3.
#[test]
#[traced_test]
fn hyparview_smoke() {
    // The RNG seed can be overridden through the `SEED` environment variable.
    let rng = ChaCha12Rng::seed_from_u64(read_var("SEED", 0));

    let mut proto = Config::default();
    proto.membership.active_view_capacity = 2;
    let mut network = Network::new(
        NetworkConfig {
            proto,
            latency: LatencyConfig::default_static(),
        },
        rng,
    );
    for peer in 0..4 {
        network.insert(peer);
    }

    let t: TopicId = [0u8; 32].into();

    // Phase 1: peers 0, 1 and 2 join the topic, each with its bootstrap list.
    for (peer, bootstrap) in [(0, vec![1, 2]), (1, vec![2]), (2, vec![])] {
        network.command(peer, t, Command::Join(bootstrap));
    }
    network.run_trips(3);

    // Every one of the three peers must have seen both others come up.
    let observed = network.events_sorted();
    let wanted = sort(vec![
        (0, t, Event::NeighborUp(1)),
        (0, t, Event::NeighborUp(2)),
        (1, t, Event::NeighborUp(2)),
        (1, t, Event::NeighborUp(0)),
        (2, t, Event::NeighborUp(0)),
        (2, t, Event::NeighborUp(1)),
    ]);
    assert_eq!(observed, wanted);
    assert_eq!(network.conns(), vec![(0, 1), (0, 2), (1, 2)]);

    // Phase 2: peer 3 joins via peer 0.
    network.command(3, t, Command::Join(vec![0]));
    network.run_trips(2);

    let actual = network.events_sorted();
    eprintln!("actual {actual:#?}");

    // Peer 0 gains peer 3 and loses either peer 1 ...
    let expected1 = sort(vec![
        (3, t, Event::NeighborUp(0)),
        (0, t, Event::NeighborUp(3)),
        (0, t, Event::NeighborDown(1)),
        (1, t, Event::NeighborDown(0)),
    ]);
    // ... or peer 2; both outcomes are accepted.
    let expected2 = sort(vec![
        (3, t, Event::NeighborUp(0)),
        (0, t, Event::NeighborUp(3)),
        (0, t, Event::NeighborDown(2)),
        (2, t, Event::NeighborDown(0)),
    ]);
    assert!((actual == expected1) || (actual == expected2));

    // The remaining connections depend on which neighbor peer 0 dropped.
    let expected_conns = if actual == expected1 {
        vec![(0, 2), (0, 3), (1, 2)]
    } else {
        vec![(0, 1), (0, 3), (1, 2)]
    };
    assert_eq!(network.conns(), expected_conns);

    assert!(network.check_synchronicity());
}
/// Smoke test for broadcasting across the swarm.
///
/// Six peers start as two separate groups (0 with 1 and 2; 3 with 4 and 5). A
/// broadcast from peer 1 must produce two `Received` events. After peer 2 also
/// joins via peer 5, a second broadcast from peer 1 must produce five.
#[test]
#[traced_test]
fn plumtree_smoke() {
    // The RNG seed can be overridden through the `SEED` environment variable.
    let rng = ChaCha12Rng::seed_from_u64(read_var("SEED", 0));
    let mut network = Network::new(
        NetworkConfig {
            proto: Config::default(),
            latency: LatencyConfig::default_static(),
        },
        rng,
    );
    for peer in 0..6 {
        network.insert(peer);
    }

    let t = [0u8; 32].into();

    // First group: peers 1 and 2 join via peer 0.
    network.command(0, t, Command::Join(vec![]));
    for peer in 1..3 {
        network.command(peer, t, Command::Join(vec![0]));
    }
    // Second group: peers 4 and 5 join via peer 3.
    network.command(3, t, Command::Join(vec![]));
    for peer in 4..6 {
        network.command(peer, t, Command::Join(vec![3]));
    }
    network.run_trips(4);

    // The events produced while joining are not inspected by this test.
    let _ = network.events();
    assert!(network.check_synchronicity());

    // First broadcast, sent by peer 1.
    network.command(
        1,
        t,
        Command::Broadcast(b"hi1".to_vec().into(), Scope::Swarm),
    );
    network.run_trips(4);
    let received_count = network
        .events()
        .filter(|event| matches!(event, (_, _, Event::Received(_))))
        .count();
    assert_eq!(received_count, 2);
    assert!(network.check_synchronicity());

    // Peer 2 additionally joins via peer 5.
    network.command(2, t, Command::Join(vec![5]));
    network.run_trips(3);
    let _ = network.events();
    println!("{}", network.report());

    // Second broadcast, again sent by peer 1.
    network.command(
        1,
        t,
        Command::Broadcast(b"hi2".to_vec().into(), Scope::Swarm),
    );
    network.run_trips(5);
    let received_count = network
        .events()
        .filter(|event| matches!(event, (_, _, Event::Received(_))))
        .count();
    assert_eq!(received_count, 5);
    assert!(network.check_synchronicity());
    println!("{}", network.report());
}
/// Checks that a peer which quits a topic disappears from the swarm.
///
/// Four peers join in a chain (each bootstraps from the previous one). After
/// peer 3 issues `Command::Quit`, it must hold no state for the topic and must
/// no longer show up in any peer's neighbor list.
#[test]
#[traced_test]
fn quit() {
    // The RNG seed can be overridden through the `SEED` environment variable.
    let rng = ChaCha12Rng::seed_from_u64(read_var("SEED", 0));
    let mut config = Config::default();
    config.membership.active_view_capacity = 2;
    let mut network = Network::new(config.into(), rng);

    let num = 4;
    for peer in 0..num {
        network.insert(peer);
    }

    let t: TopicId = [0u8; 32].into();

    // Chain join: 0 starts alone, then 1 -> 0, 2 -> 1, 3 -> 2.
    for (peer, bootstrap) in [(0, vec![]), (1, vec![0]), (2, vec![1]), (3, vec![2])] {
        network.command(peer, t, Command::Join(bootstrap));
    }
    network.run_trips(2);

    // The union of all neighbor lists must mention every one of the four peers.
    let all_conns: HashSet<u64> = (0u64..4)
        .flat_map(|peer| network.neighbors(&peer, &t).into_iter().flatten())
        .collect();
    assert_eq!(all_conns, HashSet::from([0, 1, 2, 3]));
    assert!(network.check_synchronicity());

    // Peer 3 leaves the topic.
    network.command(3, t, Command::Quit);
    network.run_trips(4);
    assert!(network.peer(&3).unwrap().state(&t).is_none());

    // Peer 3 must no longer appear in any neighbor list.
    let all_conns: HashSet<u64> = (0..num)
        .flat_map(|peer| network.neighbors(&peer, &t).into_iter().flatten())
        .collect();
    assert_eq!(all_conns, HashSet::from([0, 1, 2]));
    assert!(network.check_synchronicity());
}
/// Reads the environment variable `name` and parses it as a `T`.
///
/// Returns `default` when the variable cannot be read (unset or not valid
/// Unicode).
///
/// # Panics
///
/// Panics when the variable is readable but its value does not parse as a `T`.
fn read_var<T: FromStr<Err: fmt::Display + fmt::Debug>>(name: &str, default: T) -> T {
    match env::var(name) {
        Ok(raw) => match raw.parse() {
            Ok(value) => value,
            Err(_) => panic!("Failed to parse environment variable {name}"),
        },
        // Any read failure falls back to the caller-supplied default.
        Err(_) => default,
    }
}
/// Takes ownership of `items`, sorts them in ascending order (stable sort) and
/// returns the same vector.
fn sort<T: Ord + Clone>(mut items: Vec<T>) -> Vec<T> {
    items.sort();
    items
}
}