//! This module contains the implementation of the gossiping protocol for an individual topic.
use std::collections::VecDeque;
use bytes::Bytes;
use derive_more::From;
use n0_future::time::{Duration, Instant};
use rand::Rng;
use serde::{Deserialize, Serialize};
use super::{
hyparview::{self, InEvent as SwarmIn},
plumtree::{self, GossipEvent, InEvent as GossipIn, Scope},
state::MessageKind,
PeerData, PeerIdentity, DEFAULT_MAX_MESSAGE_SIZE,
};
use crate::proto::MIN_MAX_MESSAGE_SIZE;
/// Input event to the topic state handler.
///
/// Events are passed to [`State::handle`], which routes each variant to the swarm membership
/// layer, the gossip broadcast layer, or both.
#[derive(Clone, Debug)]
pub enum InEvent<PI> {
/// Message received from the network.
///
/// The first field is the peer the message was received from.
RecvMessage(PI, Message<PI>),
/// Execute a command from the application.
Command(Command<PI>),
/// Trigger a previously scheduled timer.
///
/// The timer is the value that was handed out earlier through [`OutEvent::ScheduleTimer`].
TimerExpired(Timer<PI>),
/// Peer disconnected on the network level.
PeerDisconnected(PI),
/// Update the opaque peer data about yourself.
UpdatePeerData(PeerData),
}
/// An output event from the state handler.
///
/// Out events of both protocol layers are converted into this type through the `From`
/// implementations below.
#[derive(Debug, PartialEq, Eq)]
pub enum OutEvent<PI> {
/// Send a message on the network to the given peer.
SendMessage(PI, Message<PI>),
/// Emit an event to the application.
EmitEvent(Event<PI>),
/// Schedule a timer. The runtime is responsible for sending an [`InEvent::TimerExpired`]
/// after the duration.
ScheduleTimer(Duration, Timer<PI>),
/// Close the connection to a peer on the network level.
DisconnectPeer(PI),
/// Emitted when new [`PeerData`] was received for a peer.
PeerData(PI, PeerData),
}
impl<PI> From<hyparview::OutEvent<PI>> for OutEvent<PI> {
fn from(event: hyparview::OutEvent<PI>) -> Self {
use hyparview::OutEvent::*;
match event {
SendMessage(to, message) => Self::SendMessage(to, message.into()),
ScheduleTimer(delay, timer) => Self::ScheduleTimer(delay, timer.into()),
DisconnectPeer(peer) => Self::DisconnectPeer(peer),
EmitEvent(event) => Self::EmitEvent(event.into()),
PeerData(peer, data) => Self::PeerData(peer, data),
}
}
}
impl<PI> From<plumtree::OutEvent<PI>> for OutEvent<PI> {
fn from(event: plumtree::OutEvent<PI>) -> Self {
use plumtree::OutEvent::*;
match event {
SendMessage(to, message) => Self::SendMessage(to, message.into()),
ScheduleTimer(delay, timer) => Self::ScheduleTimer(delay, timer.into()),
EmitEvent(event) => Self::EmitEvent(event.into()),
}
}
}
/// A sink that the protocol state pushes its `OutEvent`s into.
///
/// The protocol implementation is generic over this trait, so the upper layer is free to pick
/// whichever container it wants to collect the emitted `OutEvent`s in.
pub trait IO<PI: Clone> {
    /// Push a single event into the IO container.
    fn push(&mut self, event: impl Into<OutEvent<PI>>);

    /// Push every event yielded by `iter` into the IO container, in iteration order.
    fn push_from_iter(&mut self, iter: impl IntoIterator<Item = impl Into<OutEvent<PI>>>) {
        iter.into_iter().for_each(|event| self.push(event));
    }
}
/// A protocol message for a particular topic
///
/// Wraps the messages of the two protocol layers. The derived `From` implementations allow
/// converting a layer message into this type with `.into()`.
#[derive(From, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum Message<PI> {
/// A message of the swarm membership layer
Swarm(hyparview::Message<PI>),
/// A message of the gossip broadcast layer
Gossip(plumtree::Message),
}
impl<PI> Message<PI> {
    /// Classify this message as data or control traffic.
    ///
    /// Only gossip-layer messages of the `Gossip` variant count as [`MessageKind::Data`]; all
    /// swarm messages and all other gossip-layer messages are [`MessageKind::Control`].
    pub fn kind(&self) -> MessageKind {
        match self {
            Self::Gossip(plumtree::Message::Gossip(_)) => MessageKind::Data,
            Self::Swarm(_) | Self::Gossip(_) => MessageKind::Control,
        }
    }

    /// Returns `true` if this is a disconnect message (which is the last message sent to a peer
    /// per topic).
    pub fn is_disconnect(&self) -> bool {
        matches!(self, Self::Swarm(hyparview::Message::Disconnect(_)))
    }
}
/// An event to be emitted to the application for a particular topic.
///
/// Combines the events of the swarm membership layer (neighbor changes) and of the gossip
/// broadcast layer (received messages).
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
pub enum Event<PI> {
/// We have a new, direct neighbor in the swarm membership layer for this topic
NeighborUp(PI),
/// We dropped a direct neighbor in the swarm membership layer for this topic
NeighborDown(PI),
/// A gossip message was received for this topic
Received(GossipEvent<PI>),
}
impl<PI> From<hyparview::Event<PI>> for Event<PI> {
fn from(value: hyparview::Event<PI>) -> Self {
match value {
hyparview::Event::NeighborUp(peer) => Self::NeighborUp(peer),
hyparview::Event::NeighborDown(peer) => Self::NeighborDown(peer),
}
}
}
/// Maps the events of the gossip broadcast layer onto the topic-level [`Event`].
impl<PI> From<plumtree::Event<PI>> for Event<PI> {
    fn from(value: plumtree::Event<PI>) -> Self {
        // `Received` is the only variant, so the pattern is irrefutable. Adding a variant to
        // `plumtree::Event` turns this into a compile error, just like an exhaustive match.
        let plumtree::Event::Received(received) = value;
        Self::Received(received)
    }
}
/// A timer to be registered for a particular topic.
///
/// This should be treated as an opaque value by the implementer and, once emitted, simply returned
/// to the protocol through [`InEvent::TimerExpired`].
///
/// Timers are emitted through [`OutEvent::ScheduleTimer`]. The derived `From` implementations
/// allow converting a layer timer into this type with `.into()`.
#[derive(Clone, From, Debug, PartialEq, Eq)]
pub enum Timer<PI> {
/// A timer for the swarm layer
Swarm(hyparview::Timer<PI>),
/// A timer for the gossip layer
Gossip(plumtree::Timer),
}
/// A command to the protocol state for a particular topic.
///
/// Commands are passed to the state wrapped in [`InEvent::Command`].
#[derive(Clone, derive_more::Debug)]
pub enum Command<PI> {
/// Join this topic and connect to peers.
///
/// If the list of peers is empty, will prepare the state and accept incoming join requests,
/// but only become operational after the first join request by another peer.
Join(Vec<PI>),
/// Broadcast a message for this topic.
///
/// The fields are the message payload and the scope of the broadcast. The `Debug` output
/// prints only the payload length, not its content.
Broadcast(#[debug("<{}b>", _0.len())] Bytes, Scope),
/// Leave this topic and drop all state.
Quit,
}
/// Queue-backed [`IO`] container: pushed events are appended at the back of the deque.
impl<PI: Clone> IO<PI> for VecDeque<OutEvent<PI>> {
    fn push(&mut self, event: impl Into<OutEvent<PI>>) {
        let out_event: OutEvent<PI> = event.into();
        self.push_back(out_event);
    }
}
/// Protocol configuration
///
/// Because of `#[serde(default)]`, fields missing during deserialization are filled in from
/// [`Config::default`].
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
/// Configuration for the swarm membership layer
pub membership: hyparview::Config,
/// Configuration for the gossip broadcast layer
pub broadcast: plumtree::Config,
/// Max message size in bytes.
///
/// This size should be the same across a network to ensure all nodes can transmit and read
/// large messages.
///
/// At minimum, this size should be large enough to send gossip control messages. This can
/// vary, depending on the size of the [`PeerIdentity`] you use and the size of the
/// [`PeerData`] you transmit in your messages.
///
/// Constructing a [`State`] panics if this value is below [`MIN_MAX_MESSAGE_SIZE`].
///
/// The default is [`DEFAULT_MAX_MESSAGE_SIZE`].
pub max_message_size: usize,
}
impl Default for Config {
fn default() -> Self {
Self {
membership: Default::default(),
broadcast: Default::default(),
max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
}
}
}
/// The topic state maintains the swarm membership and broadcast tree for a particular topic.
///
/// Events are fed in through [`State::handle`], which routes them to the two protocol layers
/// and returns the resulting [`OutEvent`]s.
#[derive(Debug)]
pub struct State<PI, R> {
/// Identity of the local peer, as returned by [`State::endpoint`].
me: PI,
/// State of the swarm membership layer.
pub(crate) swarm: hyparview::State<PI, R>,
/// State of the gossip broadcast layer.
pub(crate) gossip: plumtree::State<PI>,
/// Buffer that collects the out events produced while handling an in event. It is handed to
/// the caller as a draining iterator at the end of [`State::handle`].
outbox: VecDeque<OutEvent<PI>>,
/// Counters of sent and received messages.
stats: Stats,
}
impl<PI: PeerIdentity> State<PI, rand::rngs::ThreadRng> {
    /// Create the local topic state, using the default random number generator.
    ///
    /// This is a convenience wrapper around [`State::with_rng`].
    ///
    /// ## Panics
    ///
    /// Panics if [`Config::max_message_size`] is below [`MIN_MAX_MESSAGE_SIZE`].
    pub fn new(me: PI, me_data: Option<PeerData>, config: Config) -> Self {
        let rng = rand::rng();
        Self::with_rng(me, me_data, config, rng)
    }
}
impl<PI, R> State<PI, R> {
    /// Returns the identity (address) of the local endpoint this state was created for.
    pub fn endpoint(&self) -> &PI {
        let Self { me, .. } = self;
        me
    }
}
impl<PI: PeerIdentity, R: Rng> State<PI, R> {
/// Initialize the local state with a custom random number generator.
///
/// ## Panics
///
/// Panics if [`Config::max_message_size`] is below [`MIN_MAX_MESSAGE_SIZE`].
pub fn with_rng(me: PI, me_data: Option<PeerData>, config: Config, rng: R) -> Self {
assert!(
config.max_message_size >= MIN_MAX_MESSAGE_SIZE,
"max_message_size must be at least {MIN_MAX_MESSAGE_SIZE}"
);
let max_payload_size =
config.max_message_size - super::Message::<PI>::postcard_header_size();
Self {
swarm: hyparview::State::new(me, me_data, config.membership, rng),
gossip: plumtree::State::new(me, config.broadcast, max_payload_size),
me,
outbox: VecDeque::new(),
stats: Stats::default(),
}
}
/// Handle an incoming event.
///
/// Returns an iterator of outgoing events that must be processed by the application.
pub fn handle(
&mut self,
event: InEvent<PI>,
now: Instant,
) -> impl Iterator<Item = OutEvent<PI>> + '_ {
let io = &mut self.outbox;
// Process the event, store out events in outbox.
match event {
InEvent::Command(command) => match command {
Command::Join(peers) => {
for peer in peers {
self.swarm.handle(SwarmIn::RequestJoin(peer), io);
}
}
Command::Broadcast(data, scope) => {
self.gossip
.handle(GossipIn::Broadcast(data, scope), now, io)
}
Command::Quit => self.swarm.handle(SwarmIn::Quit, io),
},
InEvent::RecvMessage(from, message) => {
self.stats.messages_received += 1;
match message {
Message::Swarm(message) => {
self.swarm.handle(SwarmIn::RecvMessage(from, message), io)
}
Message::Gossip(message) => {
self.gossip
.handle(GossipIn::RecvMessage(from, message), now, io)
}
}
}
InEvent::TimerExpired(timer) => match timer {
Timer::Swarm(timer) => self.swarm.handle(SwarmIn::TimerExpired(timer), io),
Timer::Gossip(timer) => self.gossip.handle(GossipIn::TimerExpired(timer), now, io),
},
InEvent::PeerDisconnected(peer) => {
self.swarm.handle(SwarmIn::PeerDisconnected(peer), io);
self.gossip.handle(GossipIn::NeighborDown(peer), now, io);
}
InEvent::UpdatePeerData(data) => self.swarm.handle(SwarmIn::UpdatePeerData(data), io),
}
// Forward NeighborUp and NeighborDown events from hyparview to plumtree
let mut io = VecDeque::new();
for event in self.outbox.iter() {
match event {
OutEvent::EmitEvent(Event::NeighborUp(peer)) => {
self.gossip
.handle(GossipIn::NeighborUp(*peer), now, &mut io)
}
OutEvent::EmitEvent(Event::NeighborDown(peer)) => {
self.gossip
.handle(GossipIn::NeighborDown(*peer), now, &mut io)
}
_ => {}
}
}
// Note that this is a no-op because plumtree::handle(NeighborUp | NeighborDown)
// above does not emit any OutEvents.
self.outbox.extend(io.drain(..));
// Update sent message counter
self.stats.messages_sent += self
.outbox
.iter()
.filter(|event| matches!(event, OutEvent::SendMessage(_, _)))
.count();
self.outbox.drain(..)
}
/// Get stats on how many messages were sent and received.
// TODO: Remove/replace with metrics?
pub fn stats(&self) -> &Stats {
&self.stats
}
/// Reset all statistics.
pub fn reset_stats(&mut self) {
self.gossip.stats = Default::default();
self.swarm.stats = Default::default();
self.stats = Default::default();
}
/// Get statistics for the gossip broadcast state
///
/// TODO: Remove/replace with metrics?
pub fn gossip_stats(&self) -> &plumtree::Stats {
self.gossip.stats()
}
/// Check if this topic has any active (connected) peers.
pub fn has_active_peers(&self) -> bool {
!self.swarm.active_view.is_empty()
}
}
/// Statistics for the protocol state of a topic
///
/// The counters are updated by [`State::handle`] and can be cleared with
/// [`State::reset_stats`].
#[derive(Clone, Debug, Default)]
pub struct Stats {
/// Number of messages sent, counted as the [`OutEvent::SendMessage`] events emitted.
pub messages_sent: usize,
/// Number of messages received, counted as the [`InEvent::RecvMessage`] events handled.
pub messages_received: usize,
}
//! This module contains the implementation of the gossiping protocol for an individual topic
use VecDeque;
use Bytes;
use From;
use ;
use Rng;
use ;
use ;
use crateMIN_MAX_MESSAGE_SIZE;
/// Input event to the topic state handler.
/// An output event from the state handler.
/// A trait for a concrete type to push `OutEvent`s to.
///
/// The implementation is generic over this trait, which allows the upper layer to supply a
/// container of their choice for `OutEvent`s emitted from the protocol state.
/// A protocol message for a particular topic
/// An event to be emitted to the application for a particular topic.
/// A timer to be registered for a particular topic.
///
/// This should be treated as an opaque value by the implementer and, once emitted, simply returned
/// to the protocol through [`InEvent::TimerExpired`].
/// A command to the protocol state for a particular topic.
/// Protocol configuration
/// The topic state maintains the swarm membership and broadcast tree for a particular topic.
/// Statistics for the protocol state of a topic