//! The state kept for each network path to a remote endpoint.
use std::{
collections::{HashMap, HashSet, VecDeque},
sync::Arc,
};
use n0_error::e;
use n0_future::time::Instant;
use rustc_hash::FxHashMap;
use tokio::sync::oneshot;
use tracing::trace;
use super::{Source, TransportAddrInfo, TransportAddrUsage};
use crate::{
address_lookup::Error as AddressLookupError, metrics::SocketMetrics, socket::transports,
};
/// Maximum number of IP paths we keep around per endpoint.
///
/// Pruning of IP paths is only considered once the number of IP paths known
/// for a remote has reached this value; below it `prune_ip_paths` is a no-op.
pub(super) const MAX_IP_PATHS: usize = 30;
/// Maximum number of inactive IP paths we keep around per endpoint.
///
/// These are paths that had been opened at one point and are now closed,
/// i.e. paths whose status is `PathStatus::Inactive`.
pub(super) const MAX_INACTIVE_IP_PATHS: usize = 10;
/// Map of all paths that we are aware of for a remote endpoint.
///
/// Also stores a list of resolve requests which are triggered once at least one path is known,
/// or once this struct is notified of a failed Address Lookup run.
#[derive(Debug)]
pub(super) struct RemotePathState {
/// All possible paths we are aware of.
///
/// These paths might be entirely impossible to use, since they are added by Address Lookup
/// mechanisms. They are only potentially usable.
paths: FxHashMap<transports::Addr, PathState>,
/// Pending resolve requests from [`Self::resolve_remote`].
///
/// Each sender receives `Ok(())` once a path is known, or an [`AddressLookupError`]
/// otherwise.
pending_resolve_requests: VecDeque<oneshot::Sender<Result<(), AddressLookupError>>>,
/// Metrics used to count IP and relay paths that were added or removed.
metrics: Arc<SocketMetrics>,
}
/// Describes the usability of this path, i.e. whether it has ever been opened,
/// when it was closed, or if it has never been usable.
///
/// Newly created path entries start out as [`PathStatus::Unknown`], which is the
/// [`Default`] variant.
#[derive(Debug, Default)]
pub(super) enum PathStatus {
/// This path is open and active.
Open,
/// This path was once opened, but was abandoned at the given [`Instant`].
///
/// The [`Instant`] is used to order inactive paths by how recently they were closed.
Inactive(Instant),
/// This path was never usable (we attempted holepunching and it didn't work).
Unusable,
/// We have not yet attempted holepunching, or holepunching is currently in
/// progress, so we do not know the usability of this path.
#[default]
Unknown,
}
impl RemotePathState {
    /// Creates an empty path state without any known paths or pending resolve requests.
    ///
    /// Added and removed paths are counted on the given `metrics`.
    pub(super) fn new(metrics: Arc<SocketMetrics>) -> Self {
        Self {
            paths: Default::default(),
            pending_resolve_requests: Default::default(),
            metrics,
        }
    }

    /// Returns one [`TransportAddrInfo`] for every known path.
    ///
    /// Paths that are currently open are reported as [`TransportAddrUsage::Active`],
    /// paths with any other status as [`TransportAddrUsage::Inactive`].
    pub(super) fn to_remote_addrs(&self) -> Vec<TransportAddrInfo> {
        self.paths
            .iter()
            .map(|(addr, state)| {
                let usage = match state.status {
                    PathStatus::Open => TransportAddrUsage::Active,
                    PathStatus::Inactive(_) | PathStatus::Unusable | PathStatus::Unknown => {
                        TransportAddrUsage::Inactive
                    }
                };
                TransportAddrInfo {
                    addr: addr.clone().into(),
                    usage,
                }
            })
            .collect()
    }

    /// Insert a new address of an open path into our list of paths.
    ///
    /// The path is marked as [`PathStatus::Open`] regardless of its previous status and
    /// `source` is recorded with the current time.
    ///
    /// This will emit pending resolve requests and trigger pruning paths.
    pub(super) fn insert_open_path(&mut self, addr: transports::Addr, source: Source) {
        match addr {
            transports::Addr::Ip(_) => self.metrics.transport_ip_paths_added.inc(),
            transports::Addr::Relay(_, _) => self.metrics.transport_relay_paths_added.inc(),
        };
        let state = self.paths.entry(addr).or_default();
        state.status = PathStatus::Open;
        state.sources.insert(source, Instant::now());
        // At least one path is known at this point, so pending requests resolve with `Ok`.
        self.emit_pending_resolve_requests(None);
        self.prune_paths();
    }

    /// Mark a path as abandoned.
    ///
    /// Paths that were open (or already inactive) become [`PathStatus::Inactive`] with the
    /// current time, paths that were never opened become [`PathStatus::Unusable`]. The
    /// "removed" metric is only incremented when the path was open.
    ///
    /// If this path does not exist, it does nothing to the
    /// `RemotePathState`
    pub(super) fn abandoned_path(&mut self, addr: &transports::Addr) {
        if let Some(state) = self.paths.get_mut(addr) {
            state.status = match state.status {
                PathStatus::Open => {
                    // Only a path that was actually open counts as removed.
                    match addr {
                        transports::Addr::Ip(_) => self.metrics.transport_ip_paths_removed.inc(),
                        transports::Addr::Relay(_, _) => {
                            self.metrics.transport_relay_paths_removed.inc()
                        }
                    };
                    PathStatus::Inactive(Instant::now())
                }
                PathStatus::Inactive(_) => PathStatus::Inactive(Instant::now()),
                PathStatus::Unusable | PathStatus::Unknown => PathStatus::Unusable,
            };
        }
    }

    /// Inserts multiple addresses of unknown status into our list of potential paths.
    ///
    /// Existing entries keep their status, only the timestamp for `source` is refreshed.
    ///
    /// If at least one path is known afterwards this will emit pending resolve requests.
    /// It always triggers pruning paths.
    pub(super) fn insert_multiple(
        &mut self,
        addrs: impl Iterator<Item = transports::Addr>,
        source: Source,
    ) {
        let now = Instant::now();
        for addr in addrs {
            self.paths
                .entry(addr)
                .or_default()
                .sources
                .insert(source.clone(), now);
        }
        trace!("added addressing information");
        // Inserting zero addresses must not fail pending requests: they are only rejected
        // once Address Lookup has concluded, see `address_lookup_finished`.
        if !self.paths.is_empty() {
            self.emit_pending_resolve_requests(None);
        }
        self.prune_paths();
    }

    /// Triggers `tx` immediately if there are any known paths, or store in the list of pending requests.
    ///
    /// The pending requests will be resolved once a path becomes known, or once Address Lookup
    /// concludes without results, whichever comes first.
    ///
    /// Sends `Ok(())` over `tx` if there are any known paths, and an [`AddressLookupError`] if there are
    /// no known paths by the time an Address Lookup run finished with an error or without results.
    pub(super) fn resolve_remote(&mut self, tx: oneshot::Sender<Result<(), AddressLookupError>>) {
        if self.paths.is_empty() {
            self.pending_resolve_requests.push_back(tx);
        } else {
            // The receiver may already be gone, which is fine.
            tx.send(Ok(())).ok();
        }
    }

    /// Notifies that an Address Lookup run has finished.
    ///
    /// This will emit pending resolve requests.
    pub(super) fn address_lookup_finished(&mut self, result: Result<(), AddressLookupError>) {
        self.emit_pending_resolve_requests(result.err());
    }

    /// Returns an iterator over the addresses of all paths.
    pub(super) fn addrs(&self) -> impl Iterator<Item = &transports::Addr> {
        self.paths.keys()
    }

    /// Returns whether this stores any addresses.
    pub(super) fn is_empty(&self) -> bool {
        self.paths.is_empty()
    }

    /// Replies to all pending resolve requests.
    ///
    /// This is a no-op if no requests are queued. Replies `Ok` if we have any known paths,
    /// otherwise with the provided `address_lookup_error` or with [`AddressLookupError::NoResults`].
    fn emit_pending_resolve_requests(&mut self, address_lookup_error: Option<AddressLookupError>) {
        if self.pending_resolve_requests.is_empty() {
            return;
        }
        let result = match (self.paths.is_empty(), address_lookup_error) {
            (false, _) => Ok(()),
            (true, Some(err)) => Err(err),
            (true, None) => Err(e!(AddressLookupError::NoResults)),
        };
        for tx in self.pending_resolve_requests.drain(..) {
            // A dropped receiver is not an error for us.
            tx.send(result.clone()).ok();
        }
    }

    /// Prune paths.
    ///
    /// Should be invoked any time we insert a new path.
    ///
    /// We currently only prune IP paths. For more information on the criteria
    /// for when and which paths we prune, look at the [`prune_ip_paths`] function.
    pub(super) fn prune_paths(&mut self) {
        // right now we only prune IP paths
        prune_ip_paths(&mut self.paths);
    }
}
/// The state of a single path to the remote endpoint.
///
/// Each path is identified by the destination [`transports::Addr`] and they are stored in
/// the [`RemotePathState`] map at [`RemoteStateActor::paths`].
///
/// The [`Default`] value has no sources and a status of [`PathStatus::Unknown`].
///
/// [`RemoteStateActor::paths`]: super::RemoteStateActor::paths
#[derive(Debug, Default)]
pub(super) struct PathState {
/// How we learned about this path, and when.
///
/// We keep track of only the latest [`Instant`] for each [`Source`], keeping the size
/// of the map of sources down to one entry per type of source.
pub(super) sources: HashMap<Source, Instant>,
/// The usability status of this path.
pub(super) status: PathStatus,
}
/// Prunes the IP paths in the paths HashMap.
///
/// Only prunes once the number of IP paths has reached [`MAX_IP_PATHS`].
///
/// Keeps paths that are open or of unknown status.
///
/// Always prunes paths that have unsuccessfully holepunched, unless every known path
/// has failed, in which case [`MAX_IP_PATHS`] of them are kept.
///
/// Keeps [`MAX_INACTIVE_IP_PATHS`] of the most recently closed paths
/// that are not currently being used but have successfully been
/// holepunched previously. If there are fewer inactive paths than that, all of them
/// are kept.
///
/// This all ensures that:
///
/// - We do not have unbounded growth of paths.
/// - If we have many paths for this remote, we prune the paths that cannot hole punch.
/// - We do not prune holepunched paths that are currently not in use too quickly. For example, if a large number of untested paths are added at once, we will not immediately prune all of the unused, but valid, paths at once.
fn prune_ip_paths(paths: &mut FxHashMap<transports::Addr, PathState>) {
    // if the total number of paths is less than the max, bail early
    if paths.len() < MAX_IP_PATHS {
        return;
    }

    let ip_paths: Vec<_> = paths.iter().filter(|(addr, _)| addr.is_ip()).collect();

    // if the total number of ip paths is less than the max, bail early
    if ip_paths.len() < MAX_IP_PATHS {
        return;
    }

    // paths that were opened at one point but have since been closed
    let mut inactive = Vec::with_capacity(ip_paths.len());
    // paths where we attempted hole punching but it was not successful
    let mut failed = Vec::with_capacity(ip_paths.len());

    for (addr, state) in ip_paths {
        match state.status {
            // holepunching succeeded at one point, but the path was closed
            PathStatus::Inactive(closed_at) => inactive.push((addr.clone(), closed_at)),
            // holepunching has been attempted and failed
            PathStatus::Unusable => failed.push(addr.clone()),
            // paths that are open or whose status is unknown are never pruned
            PathStatus::Open | PathStatus::Unknown => {}
        }
    }

    // All paths are bad, don't prune all of them.
    //
    // This implies that `inactive` is empty. Note that the comparison is against all
    // paths, so it only triggers when there are no relay paths either.
    if failed.len() == paths.len() {
        // leave the max number of IP paths
        failed.truncate(paths.len().saturating_sub(MAX_IP_PATHS));
    }

    // sort the potentially prunable from most recently closed to least recently closed
    inactive.sort_by(|a, b| b.1.cmp(&a.1));

    // Keep the `MAX_INACTIVE_IP_PATHS` most recently closed paths at the front and prune
    // everything after them. The `min` keeps the split index in bounds, so nothing is
    // pruned when there are fewer inactive paths than the limit.
    let old_inactive = inactive.split_off(inactive.len().min(MAX_INACTIVE_IP_PATHS));

    // collect all the paths that should be pruned
    let must_prune: HashSet<_> = failed
        .into_iter()
        .chain(old_inactive.into_iter().map(|(addr, _)| addr))
        .collect();

    paths.retain(|addr, _| !must_prune.contains(addr));
}

#[cfg(test)]
mod tests {
    use std::{
        net::{Ipv4Addr, SocketAddrV4},
        time::Duration,
    };

    use iroh_base::{RelayUrl, SecretKey};
    use rand::SeedableRng;

    use super::*;

    fn ip_addr(port: u16) -> transports::Addr {
        transports::Addr::Ip(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port).into())
    }

    fn path_state_inactive(closed: Instant) -> PathState {
        PathState {
            sources: HashMap::new(),
            status: PathStatus::Inactive(closed),
        }
    }

    fn path_state_unusable() -> PathState {
        PathState {
            sources: HashMap::new(),
            status: PathStatus::Unusable,
        }
    }

    #[test]
    fn test_prune_under_max_paths() {
        let mut paths = FxHashMap::default();
        for i in 0..20 {
            paths.insert(ip_addr(i), PathState::default());
        }

        prune_ip_paths(&mut paths);
        assert_eq!(20, paths.len(), "should not prune when under MAX_IP_PATHS");
    }

    #[test]
    fn test_prune_at_max_paths_no_prunable() {
        let mut paths = FxHashMap::default();
        // All paths are of unknown status (never abandoned), so none should be pruned
        for i in 0..MAX_IP_PATHS {
            paths.insert(ip_addr(i as u16), PathState::default());
        }

        prune_ip_paths(&mut paths);
        assert_eq!(MAX_IP_PATHS, paths.len(), "should not prune active paths");
    }

    #[test]
    fn test_prune_failed_holepunch() {
        let mut paths = FxHashMap::default();

        // Add 20 active paths
        for i in 0..20 {
            paths.insert(ip_addr(i), PathState::default());
        }

        // Add 15 failed holepunch paths (must_prune)
        for i in 20..35 {
            paths.insert(ip_addr(i), path_state_unusable());
        }

        prune_ip_paths(&mut paths);

        // All failed holepunch paths should be pruned
        assert_eq!(20, paths.len());
        for i in 0..20 {
            assert!(paths.contains_key(&ip_addr(i)));
        }
        for i in 20..35 {
            assert!(!paths.contains_key(&ip_addr(i)));
        }
    }

    #[test]
    fn test_prune_keeps_most_recent_inactive() {
        let mut paths = FxHashMap::default();
        let now = Instant::now();

        // Add 15 active paths
        for i in 0..15 {
            paths.insert(ip_addr(i), PathState::default());
        }

        // Add 20 inactive paths with different abandon times
        // Ports 15-34, with port 34 being most recently abandoned
        for i in 0..20 {
            let abandoned_time = now - Duration::from_secs((20 - i) as u64);
            paths.insert(ip_addr(15 + i as u16), path_state_inactive(abandoned_time));
        }

        assert_eq!(35, paths.len());
        prune_ip_paths(&mut paths);

        // Should keep 15 active + 10 most recently abandoned
        assert_eq!(25, paths.len());

        // Active paths should remain
        for i in 0..15 {
            assert!(paths.contains_key(&ip_addr(i)));
        }

        // Most recently abandoned (ports 25-34) should remain
        for i in 25..35 {
            assert!(paths.contains_key(&ip_addr(i)), "port {} should be kept", i);
        }

        // Oldest abandoned (ports 15-24) should be pruned
        for i in 15..25 {
            assert!(
                !paths.contains_key(&ip_addr(i)),
                "port {} should be pruned",
                i
            );
        }
    }

    #[test]
    fn test_prune_keeps_inactive_below_limit() {
        let mut paths = FxHashMap::default();
        let now = Instant::now();

        // Add 25 paths of unknown status
        for i in 0..25 {
            paths.insert(ip_addr(i), PathState::default());
        }

        // Add 5 inactive paths, fewer than MAX_INACTIVE_IP_PATHS
        for i in 0..5 {
            let abandoned_time = now - Duration::from_secs((5 - i) as u64);
            paths.insert(ip_addr(25 + i as u16), path_state_inactive(abandoned_time));
        }

        assert_eq!(MAX_IP_PATHS, paths.len());
        prune_ip_paths(&mut paths);

        // Nothing is prunable: no failed paths and not too many inactive ones
        assert_eq!(MAX_IP_PATHS, paths.len());
        for i in 25..30 {
            assert!(paths.contains_key(&ip_addr(i)), "port {} should be kept", i);
        }
    }

    #[test]
    fn test_prune_many_inactive_keeps_only_limit() {
        let mut paths = FxHashMap::default();
        let now = Instant::now();

        // Add 35 inactive paths, port 34 being most recently abandoned
        for i in 0..35 {
            let abandoned_time = now - Duration::from_secs((35 - i) as u64);
            paths.insert(ip_addr(i as u16), path_state_inactive(abandoned_time));
        }

        prune_ip_paths(&mut paths);

        // Only the MAX_INACTIVE_IP_PATHS most recently abandoned paths remain
        assert_eq!(MAX_INACTIVE_IP_PATHS, paths.len());
        for i in 25..35 {
            assert!(paths.contains_key(&ip_addr(i)), "port {} should be kept", i);
        }
        for i in 0..25 {
            assert!(
                !paths.contains_key(&ip_addr(i)),
                "port {} should be pruned",
                i
            );
        }
    }

    #[test]
    fn test_prune_mixed_must_and_can_prune() {
        let mut paths = FxHashMap::default();
        let now = Instant::now();

        // Add 15 active paths
        for i in 0..15 {
            paths.insert(ip_addr(i), PathState::default());
        }

        // Add 5 failed holepunch paths
        for i in 15..20 {
            paths.insert(ip_addr(i), path_state_unusable());
        }

        // Add 15 usable but abandoned paths
        for i in 0..15 {
            let abandoned_time = now - Duration::from_secs((15 - i) as u64);
            paths.insert(ip_addr(20 + i as u16), path_state_inactive(abandoned_time));
        }

        assert_eq!(35, paths.len());
        prune_ip_paths(&mut paths);

        // Remove all failed paths -> down to 30
        // Keep MAX_INACTIVE_IP_PATHS, i.e. remove the 5 oldest abandoned paths -> down to 25
        assert_eq!(25, paths.len());

        // Active paths should remain
        for i in 0..15 {
            assert!(paths.contains_key(&ip_addr(i)));
        }

        // Failed holepunch should be pruned
        for i in 15..20 {
            assert!(!paths.contains_key(&ip_addr(i)));
        }

        // Oldest abandoned (ports 20-24) should be pruned
        for i in 20..25 {
            assert!(
                !paths.contains_key(&ip_addr(i)),
                "port {} should be pruned",
                i
            );
        }

        // Most recently abandoned (ports 25-34) should remain
        for i in 25..35 {
            assert!(paths.contains_key(&ip_addr(i)), "port {} should be kept", i);
        }
    }

    #[test]
    fn test_prune_non_ip_paths_not_counted() {
        let mut paths = FxHashMap::default();

        // Add 25 IP paths (under MAX_IP_PATHS)
        for i in 0..25 {
            paths.insert(ip_addr(i), path_state_unusable());
        }

        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);

        let relay_url: RelayUrl = url::Url::parse("https://localhost")
            .expect("should be valid url")
            .into();
        // Add 10 relay addresses
        for _ in 0..10 {
            let id = SecretKey::generate(&mut rng).public();
            let relay_addr = transports::Addr::Relay(relay_url.clone(), id);
            paths.insert(relay_addr, PathState::default());
        }

        assert_eq!(35, paths.len()); // 25 IP + 10 relay
        prune_ip_paths(&mut paths);

        // Should not prune since IP paths < MAX_IP_PATHS
        assert_eq!(35, paths.len());
    }

    #[test]
    fn test_prune_preserves_never_dialed() {
        let mut paths = FxHashMap::default();

        // Add 20 never-dialed paths (PathStatus::Unknown)
        for i in 0..20 {
            paths.insert(ip_addr(i), PathState::default());
        }

        // Add 15 failed paths to trigger pruning
        for i in 20..35 {
            paths.insert(ip_addr(i), path_state_unusable());
        }

        prune_ip_paths(&mut paths);

        // Never-dialed paths should be preserved
        for i in 0..20 {
            assert!(paths.contains_key(&ip_addr(i)));
        }
    }

    #[test]
    fn test_prune_all_paths_failed() {
        let mut paths = FxHashMap::default();

        // Add 40 failed holepunch paths (all paths have failed)
        for i in 0..40 {
            paths.insert(ip_addr(i), path_state_unusable());
        }

        assert_eq!(40, paths.len());
        prune_ip_paths(&mut paths);

        // Should keep MAX_IP_PATHS instead of pruning everything
        // This prevents catastrophic loss of all path information
        assert_eq!(
            MAX_IP_PATHS,
            paths.len(),
            "should keep MAX_IP_PATHS when all paths failed"
        );
    }

    #[test]
    fn test_insert_open_path() {
        let mut state = RemotePathState::new(Default::default());
        let addr = ip_addr(1000);
        let source = Source::Udp;

        assert!(state.is_empty());

        state.insert_open_path(addr.clone(), source.clone());

        assert!(!state.is_empty());
        assert!(state.paths.contains_key(&addr));
        let path = &state.paths[&addr];
        assert!(matches!(path.status, PathStatus::Open));
        assert_eq!(path.sources.len(), 1);
        assert!(path.sources.contains_key(&source));
    }

    #[test]
    fn test_abandoned_path() {
        let metrics = Arc::new(SocketMetrics::default());
        let mut state = RemotePathState::new(metrics.clone());

        // Test: Open goes to Inactive
        let addr_open = ip_addr(1000);
        state.insert_open_path(addr_open.clone(), Source::Udp);
        assert!(matches!(state.paths[&addr_open].status, PathStatus::Open));
        assert_eq!(metrics.transport_ip_paths_added.get(), 1);

        state.abandoned_path(&addr_open);
        assert!(matches!(
            state.paths[&addr_open].status,
            PathStatus::Inactive(_)
        ));
        assert_eq!(metrics.transport_ip_paths_added.get(), 1);
        assert_eq!(metrics.transport_ip_paths_removed.get(), 1);

        // Test: Inactive stays Inactive
        state.abandoned_path(&addr_open);
        assert!(matches!(
            state.paths[&addr_open].status,
            PathStatus::Inactive(_)
        ));
        assert_eq!(metrics.transport_ip_paths_added.get(), 1);
        assert_eq!(metrics.transport_ip_paths_removed.get(), 1);

        // Test: Unknown goes to Unusable
        let addr_unknown = ip_addr(2000);
        state.insert_multiple([addr_unknown.clone()].into_iter(), Source::Relay);
        assert!(matches!(
            state.paths[&addr_unknown].status,
            PathStatus::Unknown
        ));
        assert_eq!(metrics.transport_ip_paths_added.get(), 1);
        assert_eq!(metrics.transport_ip_paths_removed.get(), 1);

        state.abandoned_path(&addr_unknown);
        assert!(matches!(
            state.paths[&addr_unknown].status,
            PathStatus::Unusable
        ));
        assert_eq!(metrics.transport_ip_paths_added.get(), 1);
        assert_eq!(metrics.transport_ip_paths_removed.get(), 1);

        // Test: Unusable stays Unusable
        state.abandoned_path(&addr_unknown);
        assert!(matches!(
            state.paths[&addr_unknown].status,
            PathStatus::Unusable
        ));
        assert_eq!(metrics.transport_ip_paths_added.get(), 1);
        assert_eq!(metrics.transport_ip_paths_removed.get(), 1);

        // Test: Unusable can go to open
        state.insert_open_path(addr_unknown.clone(), Source::Udp);
        assert!(matches!(
            state.paths[&addr_unknown].status,
            PathStatus::Open
        ));
        assert_eq!(metrics.transport_ip_paths_added.get(), 2);
        assert_eq!(metrics.transport_ip_paths_removed.get(), 1);
    }
}
//! The state kept for each network path to a remote endpoint.
use ;
use e;
use Instant;
use FxHashMap;
use oneshot;
use trace;
use ;
use crate::;
/// Maximum number of IP paths we keep around per endpoint.
pub const MAX_IP_PATHS: usize = 30;
/// Maximum number of inactive IP paths we keep around per endpoint.
///
/// These are paths that had been opened at one point and are now closed.
pub const MAX_INACTIVE_IP_PATHS: usize = 10;
/// Map of all paths that we are aware of for a remote endpoint.
///
/// Also stores a list of resolve requests which are triggered once at least one path is known,
/// or once this struct is notified of a failed Address Lookup run.
pub
/// Describes the usability of this path, i.e. whether it has ever been opened,
/// when it was closed, or if it has never been usable.
pub
/// The state of a single path to the remote endpoint.
///
/// Each path is identified by the destination [`transports::Addr`] and they are stored in
/// the [`RemotePathState`] map at [`RemoteStateActor::paths`].
///
/// [`RemoteStateActor::paths`]: super::RemoteStateActor::paths
pub
/// Prunes the IP paths in the paths HashMap.
///
/// Only prunes if the number of IP paths is above [`MAX_IP_PATHS`].
///
/// Keeps paths that are open or of unknown status.
///
/// Always prunes paths that have unsuccessfully holepunched.
///
/// Keeps [`MAX_INACTIVE_IP_PATHS`] of the most recently closed paths
/// that are not currently being used but have successfully been
/// holepunched previously.
///
/// This all ensures that:
///
/// - We do not have unbounded growth of paths.
/// - If we have many paths for this remote, we prune the paths that cannot hole punch.
/// - We do not prune holepunched paths that are currently not in use too quickly. For example, if a large number of untested paths are added at once, we will not immediately prune all of the unused, but valid, paths at once.