use std::{
collections::{btree_map::Entry, BTreeMap},
sync::{Arc, RwLock},
time::Duration,
};
use iroh::address_lookup::{self, AddressLookup, EndpointData, EndpointInfo};
use iroh_base::EndpointId;
use n0_future::{
boxed::BoxStream,
stream::{self, StreamExt},
task::AbortOnDropHandle,
time::SystemTime,
};
/// Retention settings for a [`GossipAddressLookup`].
///
/// Derives `Debug` and `Clone` so the settings can be logged and reused, in
/// line with the other types in this module.
#[derive(Debug, Clone)]
pub(crate) struct RetentionOpts {
    /// Maximum age, measured from an entry's last update, that an entry may
    /// have and still survive an eviction pass.
    retention: Duration,
    /// Period between two eviction passes of the background task.
    evict_interval: Duration,
}
impl Default for RetentionOpts {
    /// Returns the default settings: entries are retained for five minutes and
    /// the eviction pass runs every 30 seconds.
    fn default() -> Self {
        let retention = Duration::from_secs(5 * 60);
        let evict_interval = Duration::from_secs(30);
        Self {
            retention,
            evict_interval,
        }
    }
}
/// An in-memory store of endpoint information that implements [`AddressLookup`].
///
/// Entries are inserted through `add` and removed by a background task once
/// they are older than the configured retention. Both fields are `Arc`s, so
/// clones share the same map and the same task handle.
#[derive(Debug, Clone)]
pub(crate) struct GossipAddressLookup {
/// Shared map from endpoint id to the information stored for it.
endpoints: NodeMap,
/// Handle of the eviction task spawned in `with_opts`.
///
/// NOTE(review): judging by the type name, the task is aborted once the last
/// clone of this handle is dropped; `AbortOnDropHandle` is defined outside
/// this file, so confirm against its documentation.
_task_handle: Arc<AbortOnDropHandle<()>>,
}
/// Shared, `RwLock`-protected ordered map from endpoint id to its stored information.
type NodeMap = Arc<RwLock<BTreeMap<EndpointId, StoredEndpointInfo>>>;
/// Endpoint data together with the time it was last written.
#[derive(Debug)]
struct StoredEndpointInfo {
/// Data recorded for the endpoint (its addresses and user data are merged in `add`).
data: EndpointData,
/// Time of the most recent `add` for this endpoint; drives eviction and is
/// reported to callers of `resolve`.
last_updated: SystemTime,
}
impl Default for GossipAddressLookup {
fn default() -> Self {
Self::new()
}
}
impl GossipAddressLookup {
const PROVENANCE: &'static str = "gossip";
pub(crate) fn new() -> Self {
Self::with_opts(Default::default())
}
pub(crate) fn with_opts(opts: RetentionOpts) -> Self {
let endpoints: NodeMap = Default::default();
let task = {
let endpoints = Arc::downgrade(&endpoints);
n0_future::task::spawn(async move {
let mut interval = n0_future::time::interval(opts.evict_interval);
loop {
interval.tick().await;
let Some(endpoints) = endpoints.upgrade() else {
break;
};
let now = SystemTime::now();
endpoints.write().expect("poisoned").retain(|_k, v| {
let age = now.duration_since(v.last_updated).unwrap_or(Duration::MAX);
age <= opts.retention
});
}
})
};
Self {
endpoints,
_task_handle: Arc::new(AbortOnDropHandle::new(task)),
}
}
pub(crate) fn add(&self, endpoint_info: impl Into<EndpointInfo>) {
let last_updated = SystemTime::now();
let EndpointInfo { endpoint_id, data } = endpoint_info.into();
let mut guard = self.endpoints.write().expect("poisoned");
match guard.entry(endpoint_id) {
Entry::Occupied(mut entry) => {
let existing = entry.get_mut();
existing.data.add_addrs(data.addrs().cloned());
existing.data.set_user_data(data.user_data().cloned());
existing.last_updated = last_updated;
}
Entry::Vacant(entry) => {
entry.insert(StoredEndpointInfo { data, last_updated });
}
}
}
}
impl AddressLookup for GossipAddressLookup {
    /// Looks up `endpoint_id` in the in-memory map.
    ///
    /// Returns `None` when nothing is stored for the endpoint. Otherwise
    /// returns a stream that yields a single item built from a clone of the
    /// stored data, tagged with [`Self::PROVENANCE`] and the entry's
    /// last-update time in microseconds since the UNIX epoch.
    ///
    /// The timestamp is reported as `None` when it cannot be expressed that
    /// way (the stored time lies before the UNIX epoch, or the microsecond
    /// count does not fit into a `u64`); a misconfigured system clock must not
    /// make a lookup panic.
    ///
    /// # Panics
    ///
    /// Panics if the map's lock is poisoned.
    fn resolve(
        &self,
        endpoint_id: EndpointId,
    ) -> Option<BoxStream<Result<address_lookup::Item, address_lookup::Error>>> {
        let guard = self.endpoints.read().expect("poisoned");
        let info = guard.get(&endpoint_id)?;
        // Checked conversion: no panic on a pre-epoch clock, no silent
        // truncation of the `u128` microsecond count.
        let last_updated = info
            .last_updated
            .duration_since(SystemTime::UNIX_EPOCH)
            .ok()
            .and_then(|since_epoch| u64::try_from(since_epoch.as_micros()).ok());
        let item = address_lookup::Item::new(
            EndpointInfo::from_parts(endpoint_id, info.data.clone()),
            Self::PROVENANCE,
            last_updated,
        );
        Some(stream::iter(Some(Ok(item))).boxed())
    }
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use iroh::{address_lookup::AddressLookup, EndpointAddr, SecretKey};
    use n0_future::StreamExt;
    use rand::SeedableRng;

    use super::{GossipAddressLookup, RetentionOpts};

    /// An added entry must stay resolvable while it is younger than the
    /// retention and must be gone after the retention has elapsed.
    #[tokio::test]
    async fn test_retention() {
        let lookup = GossipAddressLookup::with_opts(RetentionOpts {
            retention: Duration::from_millis(500),
            evict_interval: Duration::from_millis(100),
        });

        let mut rng = rand_chacha::ChaCha12Rng::seed_from_u64(1);
        let secret = SecretKey::generate(&mut rng);
        lookup.add(EndpointAddr::new(secret.public()));

        // Resolvable right after insertion.
        let fresh = lookup.resolve(secret.public()).unwrap().next().await;
        assert!(matches!(fresh, Some(Ok(_))));

        // Still resolvable at ~200ms, well below the 500ms retention.
        tokio::time::sleep(Duration::from_millis(200)).await;
        let retained = lookup.resolve(secret.public()).unwrap().next().await;
        assert!(matches!(retained, Some(Ok(_))));

        // At ~900ms the entry is past its retention and has been evicted.
        tokio::time::sleep(Duration::from_millis(700)).await;
        assert!(lookup.resolve(secret.public()).is_none());
    }
}