use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
hash::Hash,
};
use bytes::Bytes;
use derive_more::{Add, From, Sub};
use n0_future::time::{Duration, Instant};
use postcard::experimental::max_size::MaxSize;
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
use super::{
util::{idbytes_impls, TimeBoundCache},
PeerIdentity, IO,
};
/// Identifier of a gossip message: a 32-byte digest of the message content, computed by
/// [`MessageId::from_content`].
#[derive(Clone, Hash, Copy, PartialEq, Eq)]
pub struct MessageId([u8; 32]);
impl MaxSize for MessageId {
const POSTCARD_MAX_SIZE: usize = 64;
}
impl Serialize for MessageId {
fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
use serde::ser::SerializeTuple;
let mut seq = serializer.serialize_tuple(64)?;
for byte in &self.0 {
seq.serialize_element(byte)?;
}
seq.end()
}
}
impl<'de> Deserialize<'de> for MessageId {
fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
struct Visitor;
impl<'de> serde::de::Visitor<'de> for Visitor {
type Value = MessageId;
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "32 bytes")
}
fn visit_seq<A: serde::de::SeqAccess<'de>>(self, mut seq: A) -> Result<MessageId, A::Error> {
let mut bytes = [0u8; 32];
for (i, byte) in bytes.iter_mut().enumerate() {
*byte = seq.next_element()?.ok_or_else(|| serde::de::Error::invalid_length(i, &self))?;
}
Ok(MessageId(bytes))
}
}
deserializer.deserialize_tuple(32, Visitor)
}
}
// Shared helper macro providing the byte-array conversion (`From<[u8; 32]>`) and the
// `Display`/`Debug` impls that this module relies on for `MessageId`.
idbytes_impls!(MessageId, "MessageId", 32);
impl MessageId {
    /// Derives the id of a message from its payload.
    ///
    /// The id is the `hemera` hash of `message`. Any peer can recompute it to check that an
    /// id really belongs to the content it came with (see `Gossip::validate`).
    pub fn from_content(message: &[u8]) -> Self {
        let digest = hemera::hash(message);
        let bytes = *digest.as_bytes();
        Self::from(bytes)
    }
}
/// Input events fed into [`State::handle`].
#[derive(Debug)]
pub enum InEvent<PI> {
    /// A protocol message was received from the given peer.
    RecvMessage(PI, Message),
    /// Broadcast new content originating from this node, with the given scope.
    Broadcast(Bytes, Scope),
    /// A timer previously requested through [`OutEvent::ScheduleTimer`] has expired.
    TimerExpired(Timer),
    /// A peer became a neighbor; it is added to the eager push set.
    NeighborUp(PI),
    /// A peer is no longer a neighbor; it is removed from all peer sets.
    NeighborDown(PI),
}
/// Output events produced by [`State::handle`] for the caller to act upon.
#[derive(Debug, PartialEq, Eq)]
pub enum OutEvent<PI> {
    /// Send a protocol message to the given peer.
    SendMessage(PI, Message),
    /// Request a timer. The caller is expected to feed it back as
    /// [`InEvent::TimerExpired`] once the duration has elapsed.
    ScheduleTimer(Duration, Timer),
    /// Emit an event to the application.
    EmitEvent(Event<PI>),
}
/// Timers the protocol asks the caller to schedule.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Timer {
    /// Request a message that was announced via `IHave` but has not been received yet.
    SendGraft(MessageId),
    /// Flush the queued `IHave` announcements to lazy peers.
    DispatchLazyPush,
    /// Evict expired entries from the message cache and the received-id cache.
    EvictCache,
}
/// Events emitted to the application.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Event<PI> {
    /// A new gossip message was received (swarm-scoped or neighbor-scoped).
    Received(GossipEvent<PI>),
}
/// A gossip message delivered to the application.
#[derive(Clone, derive_more::Debug, PartialEq, Eq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct GossipEvent<PI> {
    /// The message payload (only its length is shown in `Debug` output).
    #[debug("<{}b>", content.len())]
    pub content: Bytes,
    /// The peer that we received the gossip message from. Note that this is not the peer that
    /// originally broadcasted the message, but the peer before us in the gossiping path.
    pub delivered_from: PI,
    /// The broadcast scope of the message.
    pub scope: DeliveryScope,
}
impl<PI> GossipEvent<PI> {
fn from_message(message: &Gossip, from: PI) -> Self {
Self {
content: message.content.clone(),
scope: message.scope,
delivered_from: from,
}
}
}
/// Number of delivery hops a message has taken.
///
/// A message broadcast by this node starts at `Round(0)`. Each node that forwards it advances
/// the round with [`Round::next`].
#[derive(
    From,
    Add,
    Sub,
    Serialize,
    Deserialize,
    Eq,
    PartialEq,
    PartialOrd,
    Ord,
    Clone,
    Copy,
    Debug,
    Hash,
    MaxSize,
)]
pub struct Round(u16);
impl Round {
    /// Returns the round after this one, i.e. one more delivery hop.
    ///
    /// Saturates at `u16::MAX`. Round values arrive in messages from remote peers, so a plain
    /// `+ 1` would let a peer trigger an overflow panic (debug builds). In release builds the
    /// counter would wrap back to `Round(0)`, making a forwarded message look as if it came
    /// directly from its publisher.
    pub fn next(&self) -> Round {
        Round(self.0.saturating_add(1))
    }
}
/// Messages that we can send and receive from peers within the topic.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum Message {
    /// When receiving Gossip, emit as event and forward the full message to eager peers and
    /// (after a delay) the message ID to lazy peers.
    Gossip(Gossip),
    /// When receiving Prune, move the peer from the eager to the lazy set.
    Prune,
    /// When receiving Graft, move the peer to the eager set and, if a message ID is included,
    /// send back the full content for it.
    Graft(Graft),
    /// When receiving IHave, do nothing initially, and request the messages for the included
    /// message IDs after some time if they aren't pushed eagerly to us.
    IHave(Vec<IHave>),
}
/// Payload messages transmitted by the protocol.
#[derive(Serialize, Deserialize, Clone, derive_more::Debug, PartialEq, Eq)]
pub struct Gossip {
    /// Id of the message. Must equal `MessageId::from_content` of `content`; received
    /// messages that fail this check are dropped.
    id: MessageId,
    /// Message contents (only the length is shown in `Debug` output).
    #[debug("<{}b>", content.len())]
    content: Bytes,
    /// Scope to broadcast to. For swarm messages this also carries the delivery round.
    scope: DeliveryScope,
}
impl Gossip {
    /// Returns the delivery round for swarm-scoped messages.
    ///
    /// Neighbor-scoped messages are not hop-counted, so they have no round.
    fn round(&self) -> Option<Round> {
        match self.scope {
            DeliveryScope::Neighbors => None,
            DeliveryScope::Swarm(hops) => Some(hops),
        }
    }
}
/// The scope to deliver the message to.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Copy)]
pub enum DeliveryScope {
    /// This message was received from the swarm, with a distance (in hops) travelled from the
    /// original broadcaster.
    Swarm(Round),
    /// This message was received from a direct neighbor that broadcasted the message to neighbors
    /// only.
    Neighbors,
}
impl DeliveryScope {
/// Whether this message was directly received from its publisher.
pub fn is_direct(&self) -> bool {
matches!(self, Self::Neighbors | Self::Swarm(Round(0)))
}
}
/// The broadcast scope of a gossip message, as requested by the local application.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Ord, PartialOrd, Copy)]
pub enum Scope {
    /// The message is broadcast to all peers in the swarm.
    Swarm,
    /// The message is broadcast only to the immediate neighbors of a peer.
    Neighbors,
}
impl Gossip {
    /// Returns a copy of this swarm message advanced by one delivery round.
    ///
    /// Neighbor-scoped messages are never forwarded, so they yield `None`.
    pub fn next_round(&self) -> Option<Gossip> {
        let round = self.round()?;
        Some(Gossip {
            scope: DeliveryScope::Swarm(round.next()),
            content: self.content.clone(),
            id: self.id,
        })
    }

    /// Checks that the message id is the Poseidon2 hash of the content, as computed by
    /// `MessageId::from_content`, i.e. that the sender did not spoof the id.
    pub fn validate(&self) -> bool {
        self.id == MessageId::from_content(&self.content)
    }
}
/// Control message to inform peers we have a message without transmitting the whole payload.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, MaxSize)]
pub struct IHave {
    /// Id of the message.
    pub(crate) id: MessageId,
    /// Delivery round of the message at the announcing peer, i.e. its hop distance from the
    /// original broadcaster.
    pub(crate) round: Round,
}
/// Control message to signal a peer that they have been moved to the eager set, and to ask the
/// peer to do the same with this node.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct Graft {
    /// Message id that triggers the graft, if any.
    /// On receiving a graft, the payload message must be sent in reply if a message id is set.
    /// Grafts sent by the tree optimization carry no id.
    id: Option<MessageId>,
    /// Delivery round of the [`Message::IHave`] that triggered this Graft message.
    round: Round,
}
/// Configuration for the gossip broadcast layer.
///
/// Currently, the expectation is that the configuration is the same for all peers in the
/// network (as recommended in the paper).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct Config {
    /// When receiving an `IHave` message, this timeout is registered. If the message for the
    /// `IHave` was not received once the timeout is expired, a `Graft` message is sent to the
    /// first peer that sent us an `IHave` for it, to request the message payload.
    ///
    /// The plumtree paper notes:
    /// > The timeout value is a protocol parameter that should be configured considering the
    /// > diameter of the overlay and a target maximum recovery latency, defined by the application
    /// > requirements. (p.8)
    pub graft_timeout_1: Duration,
    /// This timeout is registered when sending a `Graft` message. If a reply has not been
    /// received once the timeout expires, we send another `Graft` message to the next peer that
    /// sent us an `IHave` for this message.
    ///
    /// The plumtree paper notes:
    /// > This second timeout value should be smaller that the first, in the order of an average
    /// > round trip time to a neighbor.
    pub graft_timeout_2: Duration,
    /// Delay after which queued `IHave` messages are pushed to lazy peers.
    ///
    /// Announcements produced while the timer is pending are batched into the same dispatch.
    pub dispatch_timeout: Duration,
    /// The protocol performs a tree optimization, which promotes lazy peers to eager peers if the
    /// `Message::IHave` messages received from them report a lower number of hops from the
    /// message's origin than the `Message::Gossip` messages received from our eager peers. This
    /// parameter is the minimum number of hops by which a lazy peer must be closer to the origin
    /// than our eager peer to be promoted to become an eager peer.
    pub optimization_threshold: Round,
    /// Duration for which to keep gossip messages in the internal message cache.
    ///
    /// Messages broadcast from this node or received from other nodes are kept in an internal
    /// cache for this duration before being evicted. If this is too low, other nodes will not be
    /// able to retrieve messages once they need them. If this is high, the cache will grow.
    ///
    /// Should be at least around several round trip times to peers.
    pub message_cache_retention: Duration,
    /// Duration for which to keep the `MessageId`s for received messages.
    ///
    /// Should be at least as long as [`Self::message_cache_retention`], usually will be longer to
    /// not accidentally receive messages multiple times.
    pub message_id_retention: Duration,
    /// How often the internal caches will be checked for expired items.
    pub cache_evict_interval: Duration,
}
impl Default for Config {
    /// Returns the default plumtree configuration.
    ///
    /// The three protocol timeouts (`graft_timeout_1`, `graft_timeout_2` and
    /// `dispatch_timeout`) are educated guesses that still need validation. The plumtree paper
    /// gives no concrete numbers for them.
    //
    // TODO: Find good defaults for the timeouts. The paper only says the first graft timeout
    // "should be configured considering the diameter of the overlay and a target maximum
    // recovery latency, defined by the application requirements", and that the second one
    // should be smaller, in the order of an average round trip time to a neighbor.
    fn default() -> Self {
        // Lazy repair: wait before requesting an announced message, then retry faster.
        let graft_timeout_1 = Duration::from_millis(80);
        let graft_timeout_2 = Duration::from_millis(40);
        // Batching window for outgoing `IHave` announcements.
        let dispatch_timeout = Duration::from_millis(5);
        // Hop advantage a lazy peer needs before it is promoted over an eager one.
        let optimization_threshold = Round(7);
        // Cache lifetimes, and how often expired entries are purged.
        let message_cache_retention = Duration::from_secs(30);
        let message_id_retention = Duration::from_secs(90);
        let cache_evict_interval = Duration::from_secs(1);
        Self {
            graft_timeout_1,
            graft_timeout_2,
            dispatch_timeout,
            optimization_threshold,
            message_cache_retention,
            message_id_retention,
            cache_evict_interval,
        }
    }
}
/// Counters describing the traffic seen by a [`State`].
#[derive(Debug, Default, Clone)]
pub struct Stats {
    /// Number of `Message::Gossip` messages received (including duplicates and spoofed ones).
    pub payload_messages_received: u64,
    /// Number of control messages (`Prune`, `Graft`, `IHave`) received.
    pub control_messages_received: u64,
    /// Highest round seen on a newly received swarm-scoped message, as sent by the delivering
    /// peer (i.e. before this node's own hop is added).
    pub max_last_delivery_hop: u16,
}
/// State of the plumtree broadcast protocol for one local peer.
#[derive(Debug)]
pub struct State<PI> {
    /// Identity of the local peer.
    me: PI,
    /// Protocol configuration.
    config: Config,
    /// Peers that receive full `Gossip` messages immediately.
    pub(crate) eager_push_peers: BTreeSet<PI>,
    /// Peers that only receive `IHave` announcements.
    pub(crate) lazy_push_peers: BTreeSet<PI>,
    /// `IHave` announcements queued per peer until the dispatch timer fires.
    lazy_push_queue: BTreeMap<PI, Vec<IHave>>,
    /// For each announced but not yet received message: the peers that announced it, with the
    /// round they announced, in arrival order.
    missing_messages: HashMap<MessageId, VecDeque<(PI, Round)>>,
    /// Ids of swarm messages already seen, kept for `message_id_retention`.
    received_messages: TimeBoundCache<MessageId, ()>,
    /// Full swarm messages kept for `message_cache_retention`, used to answer grafts.
    cache: TimeBoundCache<MessageId, Gossip>,
    /// Messages with a pending graft timer; prevents `on_ihave` from scheduling another one.
    graft_timer_scheduled: HashSet<MessageId>,
    /// Whether a `Timer::DispatchLazyPush` is currently pending.
    dispatch_timer_scheduled: bool,
    /// Whether the first event has been handled (which starts the cache eviction timer).
    init: bool,
    /// Traffic counters.
    pub(crate) stats: Stats,
    /// Size budget for outgoing `Message::IHave` batches.
    max_message_size: usize,
}
impl<PI: PeerIdentity> State<PI> {
    /// Creates the broadcast state for the local peer `me`.
    ///
    /// All peer sets, caches and timers start out empty. `max_message_size` is the byte budget
    /// used when splitting queued `IHave` announcements into `Message::IHave` batches.
    pub fn new(me: PI, config: Config, max_message_size: usize) -> Self {
        Self {
            me,
            eager_push_peers: Default::default(),
            lazy_push_peers: Default::default(),
            lazy_push_queue: Default::default(),
            config,
            missing_messages: Default::default(),
            received_messages: Default::default(),
            graft_timer_scheduled: Default::default(),
            dispatch_timer_scheduled: false,
            cache: Default::default(),
            init: false,
            stats: Default::default(),
            max_message_size,
        }
    }

    /// Processes a single input `event` observed at `now`, pushing the resulting output events
    /// into `io`.
    ///
    /// The very first call also runs cache eviction once, which schedules the recurring
    /// `Timer::EvictCache` timer.
    pub fn handle(&mut self, event: InEvent<PI>, now: Instant, io: &mut impl IO<PI>) {
        if !self.init {
            self.init = true;
            self.on_evict_cache_timer(now, io)
        }
        match event {
            InEvent::RecvMessage(from, message) => self.handle_message(from, message, now, io),
            InEvent::Broadcast(data, scope) => self.broadcast(data, scope, now, io),
            InEvent::NeighborUp(peer) => self.on_neighbor_up(peer),
            InEvent::NeighborDown(peer) => self.on_neighbor_down(peer),
            InEvent::TimerExpired(timer) => match timer {
                Timer::DispatchLazyPush => self.on_dispatch_timer(io),
                Timer::SendGraft(id) => {
                    self.on_send_graft_timer(id, io);
                }
                Timer::EvictCache => self.on_evict_cache_timer(now, io),
            },
        }
    }

    /// Returns the traffic counters collected so far.
    pub fn stats(&self) -> &Stats {
        &self.stats
    }

    /// Updates the receive counters and dispatches a message from `sender` to its handler.
    fn handle_message(&mut self, sender: PI, message: Message, now: Instant, io: &mut impl IO<PI>) {
        if matches!(message, Message::Gossip(_)) {
            self.stats.payload_messages_received += 1;
        } else {
            self.stats.control_messages_received += 1;
        }
        match message {
            Message::Gossip(details) => self.on_gossip(sender, details, now, io),
            Message::Prune => self.on_prune(sender),
            Message::IHave(details) => self.on_ihave(sender, details, io),
            Message::Graft(details) => self.on_graft(sender, details, io),
        }
    }

    /// Flushes every queued `IHave` announcement and empties the lazy push queue.
    ///
    /// Each peer's announcements are split into as many `Message::IHave` messages as needed,
    /// so that each one fits into `max_message_size` when postcard-encoded.
    fn on_dispatch_timer(&mut self, io: &mut impl IO<PI>) {
        // Largest element count whose postcard varint length prefix still fits into the two
        // bytes reserved below (varints carry 7 payload bits per byte).
        const MAX_TWO_BYTE_LEN: usize = (1 << 14) - 1;
        let chunk_size = self
            .max_message_size
            // Space for the `Message` enum discriminant.
            .saturating_sub(1)
            // Space for the length prefix of the `IHave` list.
            .saturating_sub(2);
        // `chunks(0)` panics, so always allow at least one announcement per message, even when
        // `max_message_size` is too small to hold one.
        let chunk_len = (chunk_size / IHave::POSTCARD_MAX_SIZE).clamp(1, MAX_TWO_BYTE_LEN);
        while let Some((peer, list)) = self.lazy_push_queue.pop_first() {
            for chunk in list.chunks(chunk_len) {
                io.push(OutEvent::SendMessage(peer, Message::IHave(chunk.to_vec())));
            }
        }
        self.dispatch_timer_scheduled = false;
    }

    /// Broadcasts new `content` originating from this node.
    ///
    /// Swarm-scoped messages are recorded as received, cached so grafts can be answered,
    /// announced to lazy peers, and sent in full to eager peers. Neighbor-scoped messages are
    /// only sent in full to the current eager peers.
    // NOTE(review): lazy neighbors never receive neighbor-scoped broadcasts — confirm intended.
    fn broadcast(&mut self, content: Bytes, scope: Scope, now: Instant, io: &mut impl IO<PI>) {
        let id = MessageId::from_content(&content);
        let scope = match scope {
            Scope::Neighbors => DeliveryScope::Neighbors,
            Scope::Swarm => DeliveryScope::Swarm(Round(0)),
        };
        let message = Gossip { id, content, scope };
        let me = self.me;
        if let DeliveryScope::Swarm(_) = scope {
            self.received_messages
                .insert(id, (), now + self.config.message_id_retention);
            self.cache.insert(
                id,
                message.clone(),
                now + self.config.message_cache_retention,
            );
            self.lazy_push(message.clone(), &me, io);
        }
        self.eager_push(message, &me, io);
    }

    /// Handles a `Gossip` payload received from `sender`.
    ///
    /// - Messages whose id does not match their content are dropped with a warning.
    /// - A duplicate of an already-received message moves `sender` to the lazy set and
    ///   answers with `Prune`.
    /// - A new swarm-scoped message is recorded, cached and forwarded (eagerly and lazily)
    ///   with its round advanced by one. Graft tracking for it stops, and the tree may be
    ///   optimized.
    /// - Every new (non-duplicate) message is emitted to the application with the scope it
    ///   arrived with.
    fn on_gossip(&mut self, sender: PI, message: Gossip, now: Instant, io: &mut impl IO<PI>) {
        if !message.validate() {
            warn!(
                peer = ?sender,
                "Received a message with spoofed message id ({})", message.id
            );
            return;
        }
        if self.received_messages.contains_key(&message.id) {
            // Duplicate delivery: the path through `sender` is redundant, so demote it.
            self.add_lazy(sender);
            io.push(OutEvent::SendMessage(sender, Message::Prune));
        } else {
            if let DeliveryScope::Swarm(prev_round) = message.scope {
                self.received_messages.insert(
                    message.id,
                    (),
                    now + self.config.message_id_retention,
                );
                // From here on, `message` carries our own hop count.
                let message = message.next_round().expect("just checked");
                self.cache.insert(
                    message.id,
                    message.clone(),
                    now + self.config.message_cache_retention,
                );
                self.eager_push(message.clone(), &sender, io);
                self.lazy_push(message.clone(), &sender, io);
                self.graft_timer_scheduled.remove(&message.id);
                let previous_ihaves = self.missing_messages.remove(&message.id);
                if let Some(previous_ihaves) = previous_ihaves {
                    self.optimize_tree(&sender, &message, previous_ihaves, io);
                }
                self.stats.max_last_delivery_hop =
                    self.stats.max_last_delivery_hop.max(prev_round.0);
            }
            io.push(OutEvent::EmitEvent(Event::Received(
                GossipEvent::from_message(&message, sender),
            )));
        }
    }

    /// Tree optimization from the plumtree paper.
    ///
    /// `message` is the gossip just received from `gossip_sender`, already advanced to this
    /// node's round. Suppose some peer announced the same message via `IHave` at a round at
    /// least `config.optimization_threshold` hops lower. Then the best such peer is grafted
    /// (promoted to eager, unless it already is) and `gossip_sender` is pruned.
    fn optimize_tree(
        &mut self,
        gossip_sender: &PI,
        message: &Gossip,
        previous_ihaves: VecDeque<(PI, Round)>,
        io: &mut impl IO<PI>,
    ) {
        let round = message.round().expect("only called for swarm messages");
        let best_ihave = previous_ihaves
            .iter()
            .min_by(|(_a_peer, a_round), (_b_peer, b_round)| a_round.cmp(b_round))
            .copied();
        if let Some((ihave_peer, ihave_round)) = best_ihave {
            // The `ihave_round < round` check also guarantees the subtraction cannot underflow.
            if (ihave_round < round) && (round - ihave_round) >= self.config.optimization_threshold
            {
                if !self.eager_push_peers.contains(&ihave_peer) {
                    let message = Message::Graft(Graft {
                        id: None,
                        round: ihave_round,
                    });
                    self.add_eager(ihave_peer);
                    io.push(OutEvent::SendMessage(ihave_peer, message));
                }
                self.add_lazy(*gossip_sender);
                io.push(OutEvent::SendMessage(*gossip_sender, Message::Prune));
            }
        }
    }

    /// Handles a `Prune` from `sender` by moving it to the lazy set.
    fn on_prune(&mut self, sender: PI) {
        self.add_lazy(sender);
    }

    /// Records `IHave` announcements from `sender` for messages not yet received.
    ///
    /// For each such message that has no graft timer pending, a `graft_timeout_1` timer is
    /// started.
    fn on_ihave(&mut self, sender: PI, ihaves: Vec<IHave>, io: &mut impl IO<PI>) {
        for ihave in ihaves {
            if !self.received_messages.contains_key(&ihave.id) {
                self.missing_messages
                    .entry(ihave.id)
                    .or_default()
                    .push_back((sender, ihave.round));
                if !self.graft_timer_scheduled.contains(&ihave.id) {
                    self.graft_timer_scheduled.insert(ihave.id);
                    io.push(OutEvent::ScheduleTimer(
                        self.config.graft_timeout_1,
                        Timer::SendGraft(ihave.id),
                    ));
                }
            }
        }
    }

    /// Handles expiry of a `Timer::SendGraft` for message `id`.
    ///
    /// If the message is still missing, sends a `Graft` to the next peer that announced it
    /// (in announcement order) and promotes that peer to eager. It then re-arms the timer with
    /// `graft_timeout_2`, so the following announcer is asked if this graft goes unanswered.
    /// Once no announcers remain, the bookkeeping entry for `id` is dropped.
    fn on_send_graft_timer(&mut self, id: MessageId, io: &mut impl IO<PI>) {
        self.graft_timer_scheduled.remove(&id);
        // If the message arrived before the timer ran out, there is nothing left to request.
        if self.received_messages.contains_key(&id) {
            self.missing_messages.remove(&id);
            return;
        }
        let next = self
            .missing_messages
            .get_mut(&id)
            .and_then(|entries| entries.pop_front());
        let Some((peer, round)) = next else {
            // Every announcer has been asked already. Drop the (now empty) entry so it does
            // not stay in `missing_messages` forever.
            self.missing_messages.remove(&id);
            return;
        };
        self.add_eager(peer);
        let message = Message::Graft(Graft {
            id: Some(id),
            round,
        });
        io.push(OutEvent::SendMessage(peer, message));
        // Paper: "when a GRAFT message is sent, another timer is started to expire after a
        // certain timeout, to ensure that the message will be requested to another neighbor if
        // it is not received meanwhile." Track it like the first timer, so that `on_ihave`
        // does not start a second, parallel timer chain for the same message.
        self.graft_timer_scheduled.insert(id);
        io.push(OutEvent::ScheduleTimer(
            self.config.graft_timeout_2,
            Timer::SendGraft(id),
        ));
    }

    /// Handles a `Graft` from `sender` by promoting it to eager.
    ///
    /// If the graft names a message, the cached payload is sent back; if the message was
    /// already evicted, this is logged at debug level.
    fn on_graft(&mut self, sender: PI, details: Graft, io: &mut impl IO<PI>) {
        self.add_eager(sender);
        if let Some(id) = details.id {
            if let Some(message) = self.cache.get(&id) {
                io.push(OutEvent::SendMessage(
                    sender,
                    Message::Gossip(message.clone()),
                ));
            } else {
                debug!(?id, peer=?sender, "on_graft failed to graft: message not in cache");
            }
        }
    }

    /// New neighbors start out as eager peers.
    fn on_neighbor_up(&mut self, peer: PI) {
        self.add_eager(peer);
    }

    /// Forgets everything related to a peer that is no longer a neighbor.
    ///
    /// This drops its pending announcements (both received and queued for sending) and removes
    /// it from both peer sets.
    fn on_neighbor_down(&mut self, peer: PI) {
        self.missing_messages.retain(|_message_id, ihaves| {
            ihaves.retain(|(ihave_peer, _round)| *ihave_peer != peer);
            !ihaves.is_empty()
        });
        // Queued `IHave`s would otherwise still be sent to the departed peer on dispatch.
        self.lazy_push_queue.remove(&peer);
        self.eager_push_peers.remove(&peer);
        self.lazy_push_peers.remove(&peer);
    }

    /// Drops expired entries from both caches and re-arms the eviction timer.
    fn on_evict_cache_timer(&mut self, now: Instant, io: &mut impl IO<PI>) {
        self.cache.expire_until(now);
        self.received_messages.expire_until(now);
        io.push(OutEvent::ScheduleTimer(
            self.config.cache_evict_interval,
            Timer::EvictCache,
        ));
    }

    /// Moves `peer` into the eager set (removing it from the lazy set).
    fn add_eager(&mut self, peer: PI) {
        self.lazy_push_peers.remove(&peer);
        self.eager_push_peers.insert(peer);
    }

    /// Moves `peer` into the lazy set (removing it from the eager set).
    fn add_lazy(&mut self, peer: PI) {
        self.eager_push_peers.remove(&peer);
        self.lazy_push_peers.insert(peer);
    }

    /// Sends the full message to every eager peer except ourselves and `sender`.
    fn eager_push(&mut self, gossip: Gossip, sender: &PI, io: &mut impl IO<PI>) {
        for peer in self
            .eager_push_peers
            .iter()
            .filter(|peer| **peer != self.me && *peer != sender)
        {
            io.push(OutEvent::SendMessage(
                *peer,
                Message::Gossip(gossip.clone()),
            ));
        }
    }

    /// Queues an `IHave` for every lazy peer except `sender`, and arms the dispatch timer if it
    /// is not already pending.
    ///
    /// Neighbor-scoped messages are never announced. For swarm messages the timer is armed
    /// even when no lazy peer exists.
    fn lazy_push(&mut self, gossip: Gossip, sender: &PI, io: &mut impl IO<PI>) {
        let Some(round) = gossip.round() else {
            return;
        };
        for peer in self.lazy_push_peers.iter().filter(|x| *x != sender) {
            self.lazy_push_queue.entry(*peer).or_default().push(IHave {
                id: gossip.id,
                round,
            });
        }
        if !self.dispatch_timer_scheduled {
            io.push(OutEvent::ScheduleTimer(
                self.config.dispatch_timeout,
                Timer::DispatchLazyPush,
            ));
            self.dispatch_timer_scheduled = true;
        }
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// Checks the tree optimization. A lazy peer whose `IHave` announced a message at a round
    /// sufficiently lower than the eager delivery is grafted, and the eager sender is pruned.
    #[test]
    fn optimize_tree() {
        let mut io = VecDeque::new();
        let config: Config = Default::default();
        let mut state = State::new(1, config.clone(), 1024);
        let now = Instant::now();
        let content: Bytes = b"hi".to_vec().into();
        let id = MessageId::from_content(&content);
        // Peer 2 announces the message at round 2.
        let event = InEvent::RecvMessage(
            2u32,
            Message::IHave(vec![IHave {
                id,
                round: Round(2),
            }]),
        );
        state.handle(event, now, &mut io);
        // Discard the graft timer and the initial cache-eviction timer.
        io.clear();
        // Peer 3 delivers the payload at round 6 (round 7 after our hop). The advantage of
        // 7 - 2 = 5 hops is below the default threshold, so no graft/prune happens.
        let event = InEvent::RecvMessage(
            3,
            Message::Gossip(Gossip {
                id,
                content: content.clone(),
                scope: DeliveryScope::Swarm(Round(6)),
            }),
        );
        state.handle(event, now, &mut io);
        let expected = {
            let mut io = VecDeque::new();
            io.push(OutEvent::ScheduleTimer(
                config.dispatch_timeout,
                Timer::DispatchLazyPush,
            ));
            io.push(OutEvent::EmitEvent(Event::Received(GossipEvent {
                content,
                delivered_from: 3,
                scope: DeliveryScope::Swarm(Round(6)),
            })));
            io
        };
        assert_eq!(io, expected);
        io.clear();
        // Second message: peer 2 again announces at round 2.
        let content: Bytes = b"hi2".to_vec().into();
        let id = MessageId::from_content(&content);
        let event = InEvent::RecvMessage(
            2u32,
            Message::IHave(vec![IHave {
                id,
                round: Round(2),
            }]),
        );
        state.handle(event, now, &mut io);
        io.clear();
        // Peer 3 delivers at round 9 (round 10 after our hop). The advantage of 10 - 2 = 8 hops
        // meets the threshold, so peer 2 is grafted and peer 3 is pruned. The dispatch timer is
        // still pending from the first message, so it is not scheduled again.
        let event = InEvent::RecvMessage(
            3,
            Message::Gossip(Gossip {
                id,
                content: content.clone(),
                scope: DeliveryScope::Swarm(Round(9)),
            }),
        );
        state.handle(event, now, &mut io);
        let expected = {
            let mut io = VecDeque::new();
            io.push(OutEvent::SendMessage(
                2,
                Message::Graft(Graft {
                    id: None,
                    round: Round(2),
                }),
            ));
            io.push(OutEvent::SendMessage(3, Message::Prune));
            io.push(OutEvent::EmitEvent(Event::Received(GossipEvent {
                content,
                delivered_from: 3,
                scope: DeliveryScope::Swarm(Round(9)),
            })));
            io
        };
        assert_eq!(io, expected);
    }

    /// A message whose id is not the hash of its content is dropped without any output.
    #[test]
    fn spoofed_messages_are_ignored() {
        let config: Config = Default::default();
        let mut state = State::new(1, config.clone(), 1024);
        let now = Instant::now();
        // A valid message is accepted and emitted. Being the first event, it also starts the
        // cache-eviction timer.
        let content: Bytes = b"hello1".to_vec().into();
        let message = Message::Gossip(Gossip {
            content: content.clone(),
            id: MessageId::from_content(&content),
            scope: DeliveryScope::Swarm(Round(1)),
        });
        let mut io = VecDeque::new();
        state.handle(InEvent::RecvMessage(2, message), now, &mut io);
        let expected = {
            let mut io = VecDeque::new();
            io.push(OutEvent::ScheduleTimer(
                config.cache_evict_interval,
                Timer::EvictCache,
            ));
            io.push(OutEvent::ScheduleTimer(
                config.dispatch_timeout,
                Timer::DispatchLazyPush,
            ));
            io.push(OutEvent::EmitEvent(Event::Received(GossipEvent {
                content,
                delivered_from: 2,
                scope: DeliveryScope::Swarm(Round(1)),
            })));
            io
        };
        assert_eq!(io, expected);
        // The id belongs to different content, so the message must be ignored entirely.
        let content: Bytes = b"hello2".to_vec().into();
        let message = Message::Gossip(Gossip {
            content,
            id: MessageId::from_content(b"foo"),
            scope: DeliveryScope::Swarm(Round(1)),
        });
        let mut io = VecDeque::new();
        state.handle(InEvent::RecvMessage(2, message), now, &mut io);
        let expected = VecDeque::new();
        assert_eq!(io, expected);
    }

    /// Cached messages survive eviction until `message_cache_retention` has elapsed.
    #[test]
    fn cache_is_evicted() {
        let config: Config = Default::default();
        let mut state = State::new(1, config.clone(), 1024);
        let now = Instant::now();
        let content: Bytes = b"hello1".to_vec().into();
        let message = Message::Gossip(Gossip {
            content: content.clone(),
            id: MessageId::from_content(&content),
            scope: DeliveryScope::Swarm(Round(1)),
        });
        let mut io = VecDeque::new();
        state.handle(InEvent::RecvMessage(2, message), now, &mut io);
        assert_eq!(state.cache.len(), 1);
        // Still within the retention window.
        let now = now + Duration::from_secs(1);
        state.handle(InEvent::TimerExpired(Timer::EvictCache), now, &mut io);
        assert_eq!(state.cache.len(), 1);
        // Past the retention window: the entry is evicted.
        let now = now + config.message_cache_retention;
        state.handle(InEvent::TimerExpired(Timer::EvictCache), now, &mut io);
        assert_eq!(state.cache.len(), 0);
    }
}