// radio/iroh/src/socket/remote_map/remote_state.rs

use std::{
    collections::{BTreeSet, VecDeque},
    net::SocketAddr,
    pin::Pin,
    sync::Arc,
    task::Poll,
};

use iroh_base::{EndpointId, RelayUrl, TransportAddr};
use n0_error::StackResultExt;
use n0_future::{
    Either, FuturesUnordered, MergeUnbounded, Stream, StreamExt,
    boxed::BoxStream,
    task::JoinSet,
    time::{self, Duration, Instant},
};
use n0_watcher::{Watchable, Watcher};
use quinn::WeakConnectionHandle;
use quinn_proto::{PathError, PathEvent, PathId, PathStatus, iroh_hp};
use rustc_hash::FxHashMap;
use sync_wrapper::SyncStream;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
use tokio_util::sync::CancellationToken;
use tracing::{Instrument, Level, debug, error, event, info_span, instrument, trace, warn};

use self::path_state::RemotePathState;
pub(crate) use self::path_watcher::PathWatchable;
pub use self::{
    path_watcher::{PathInfo, PathInfoList, PathInfoListIter, PathWatcher},
    remote_info::{RemoteInfo, TransportAddrInfo, TransportAddrUsage},
};
use super::Source;
use crate::{
    address_lookup::{
        AddressLookup, ConcurrentAddressLookup, Error as AddressLookupError,
        Item as AddressLookupItem,
    },
    endpoint::DirectAddr,
    socket::{
        Metrics as SocketMetrics,
        mapped_addrs::{AddrMap, MappedAddr, RelayMappedAddr},
        remote_map::Private,
        transports::{self, OwnedTransmit, TransportsSender},
    },
    util::MaybeFuture,
};

mod path_state;
mod path_watcher;
mod remote_info;

/// How often to attempt holepunching.
///
/// If there have been no changes to the NAT address candidates, holepunching will not be
/// attempted more frequently than at this interval.
const HOLEPUNCH_ATTEMPTS_INTERVAL: Duration = Duration::from_secs(5);

/// The latency at or under which we don't try to upgrade to a better path.
///
/// Used by [`RemoteStateActor::check_connections`] to decide whether a connection's best
/// IP path is already good enough.
const GOOD_ENOUGH_LATENCY: Duration = Duration::from_millis(10);

// TODO: use this
// /// How long since the last activity we try to keep an established endpoint peering alive.
// ///
// /// It's also the idle time at which we stop doing QAD queries to keep NAT mappings alive.
// pub(super) const SESSION_ACTIVE_TIMEOUT: Duration = Duration::from_secs(45);

/// How often we try to upgrade to a better path.
///
/// Even if we have some non-relay route that works.  Drives the periodic
/// `check_connections` interval timer in [`RemoteStateActor::run`].
const UPGRADE_INTERVAL: Duration = Duration::from_secs(60);

/// The time after which an idle [`RemoteStateActor`] stops.
///
/// The actor only enters the idle state if no connections are active and no inbox senders exist
/// apart from the one stored in the endpoint map. Stopping and restarting the actor in this state
/// is not an issue; a timeout here serves the purpose of not stopping-and-recreating actors
/// in a high frequency, and to keep data about previous paths around for subsequent connections.
const ACTOR_MAX_IDLE_TIMEOUT: Duration = Duration::from_secs(60);

/// The minimum RTT difference to make it worth switching IP paths
const RTT_SWITCHING_MIN_IP: Duration = Duration::from_millis(5);

/// How much do we prefer IPv6 over IPv4?
const IPV6_RTT_ADVANTAGE: Duration = Duration::from_millis(3);

/// A stream of events from all paths for all connections.
///
/// The connection is identified using [`ConnId`].  The event `Err` variant happens when the
/// actor has lagged processing the events, which is rather critical for us.
///
/// Per-connection streams are pushed in `handle_msg_add_connection`.
type PathEvents = MergeUnbounded<
    Pin<
        Box<dyn Stream<Item = (ConnId, Result<PathEvent, BroadcastStreamRecvError>)> + Send + Sync>,
    >,
>;

/// A stream of events of announced NAT traversal candidate addresses for all connections.
///
/// The connection is identified using [`ConnId`].
///
/// Per-connection streams are pushed in `handle_msg_add_connection`.
type AddrEvents = MergeUnbounded<
    Pin<
        Box<
            dyn Stream<Item = (ConnId, Result<iroh_hp::Event, BroadcastStreamRecvError>)>
                + Send
                + Sync,
        >,
    >,
>;

/// Either a stream of incoming results from [`ConcurrentAddressLookup::resolve`] or infinitely pending.
///
/// Set to [`Either::Left`] with an always-pending stream while address lookup is not running, and to
/// [`Either::Right`] while Address Lookup is running.
///
/// The stream returned from [`ConcurrentAddressLookup::resolve`] is `!Sync`. We use the (safe) [`SyncStream`]
/// wrapper to make it `Sync` so that the [`RemoteStateActor::run`] future stays `Send`.
type AddressLookupStream = Either<
    n0_future::stream::Pending<Result<AddressLookupItem, AddressLookupError>>,
    SyncStream<BoxStream<Result<AddressLookupItem, AddressLookupError>>>,
>;

/// The state we need to know about a single remote endpoint.
///
/// This actor manages all connections to the remote endpoint.  It will trigger holepunching
/// and select the best path etc.
pub(super) struct RemoteStateActor {
    /// The endpoint ID of the remote endpoint.
    endpoint_id: EndpointId,
    /// The endpoint ID of the local endpoint.
    local_endpoint_id: EndpointId,

    // Hooks into the rest of the Socket.
    //
    /// Metrics.
    metrics: Arc<SocketMetrics>,
    /// Our local addresses.
    ///
    /// These are our local addresses and any reflexive transport addresses.
    local_direct_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
    /// The mapping between endpoints via a relay and their [`RelayMappedAddr`]s.
    relay_mapped_addrs: AddrMap<(RelayUrl, EndpointId), RelayMappedAddr>,
    /// Address lookup service, cloned from the socket.
    address_lookup: ConcurrentAddressLookup,

    // Internal state - Quinn Connections we are managing.
    //
    /// All connections we have to this remote endpoint.
    ///
    /// Keyed by [`ConnId`], which wraps the Quinn connection's stable id.
    connections: FxHashMap<ConnId, ConnectionState>,
    /// Notifications when connections are closed.
    connections_close: FuturesUnordered<OnClosed>,
    /// Events emitted by Quinn about path changes, for all paths, all connections.
    path_events: PathEvents,
    /// A stream of events of announced NAT traversal candidate addresses for all connections.
    addr_events: AddrEvents,

    // Internal state - Holepunching and path state.
    //
    /// All possible paths we are aware of.
    ///
    /// These paths might be entirely impossible to use, since they are added by Address Lookup
    /// mechanisms.  They are only potentially usable.
    paths: RemotePathState,
    /// Information about the last holepunching attempt.
    ///
    /// Used to rate-limit holepunching when no new candidate addresses appeared.
    last_holepunch: Option<HolepunchAttempt>,

    /// The path we currently consider the preferred path to the remote endpoint.
    ///
    /// **We expect this path to work.** If we become aware this path is broken then it is
    /// set back to `None`.  Having a selected path does not mean we may not be able to get
    /// a better path: e.g. when the selected path is a relay path we still need to trigger
    /// holepunching regularly.
    ///
    /// We only select a path once the path is functional in Quinn.
    selected_path: Watchable<Option<transports::Addr>>,
    /// Time at which we should schedule the next holepunch attempt.
    scheduled_holepunch: Option<Instant>,
    /// When to next attempt opening paths in [`Self::pending_open_paths`].
    scheduled_open_path: Option<Instant>,
    /// Paths which we still need to open.
    ///
    /// They failed to open because we did not have enough CIDs issued by the remote.
    pending_open_paths: VecDeque<transports::Addr>,

    // Internal state - address lookup
    //
    /// Stream of Address Lookup results, or always pending if Address Lookup is not running.
    address_lookup_stream: AddressLookupStream,
}

impl RemoteStateActor {
    #[allow(clippy::too_many_arguments)]
    pub(super) fn new(
        endpoint_id: EndpointId,
        local_endpoint_id: EndpointId,
        local_direct_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
        relay_mapped_addrs: AddrMap<(RelayUrl, EndpointId), RelayMappedAddr>,
        metrics: Arc<SocketMetrics>,
        address_lookup: ConcurrentAddressLookup,
    ) -> Self {
        Self {
            endpoint_id,
            local_endpoint_id,
            metrics: metrics.clone(),
            local_direct_addrs,
            relay_mapped_addrs,
            address_lookup,
            connections: FxHashMap::default(),
            connections_close: Default::default(),
            path_events: Default::default(),
            addr_events: Default::default(),
            paths: RemotePathState::new(metrics),
            last_holepunch: None,
            selected_path: Default::default(),
            scheduled_holepunch: None,
            scheduled_open_path: None,
            pending_open_paths: VecDeque::new(),
            address_lookup_stream: Either::Left(n0_future::stream::pending()),
        }
    }

    /// Spawns the actor onto `tasks` and returns the sender for its inbox.
    pub(super) fn start(
        self,
        initial_msgs: Vec<RemoteStateMessage>,
        tasks: &mut JoinSet<(EndpointId, Vec<RemoteStateMessage>)>,
        shutdown_token: CancellationToken,
    ) -> mpsc::Sender<RemoteStateMessage> {
        let (inbox_tx, inbox_rx) = mpsc::channel(16);

        // Ideally we'd use the endpoint span as parent.  We'd have to plug that span into
        // here somehow.  Instead we have no parent and explicitly set the me attribute.  If
        // we don't explicitly set a span we get the spans from whatever call happens to
        // first create the actor, which is often very confusing as it then keeps those
        // spans for all logging of the actor.
        let span = info_span!(
            parent: None,
            "RemoteStateActor",
            me = %self.local_endpoint_id.fmt_short(),
            remote = %self.endpoint_id.fmt_short(),
        );
        tasks.spawn(
            self.run(initial_msgs, inbox_rx, shutdown_token)
                .instrument(span),
        );
        inbox_tx
    }

    /// Runs the main loop of the actor.
    ///
    /// Note that the actor uses async handlers for tasks from the main loop.  The actor is
    /// not processing items from the inbox while waiting on any async calls.  So some
    /// discipline is needed to not turn pending for a long time.
    ///
    /// Returns the remote's endpoint ID together with any messages still left in the inbox
    /// at shutdown, so the caller can hand them to a restarted actor.
    async fn run(
        mut self,
        initial_msgs: Vec<RemoteStateMessage>,
        mut inbox: mpsc::Receiver<RemoteStateMessage>,
        shutdown_token: CancellationToken,
    ) -> (EndpointId, Vec<RemoteStateMessage>) {
        trace!("actor started");
        // Drain messages that queued up before this task was spawned.
        for msg in initial_msgs {
            self.handle_message(msg).await;
        }
        let idle_timeout = time::sleep(ACTOR_MAX_IDLE_TIMEOUT);
        n0_future::pin!(idle_timeout);

        let check_connections = time::interval(UPGRADE_INTERVAL);
        n0_future::pin!(check_connections);

        loop {
            // Rebuild the optional one-shot timers each iteration so they always reflect
            // the current `scheduled_open_path` / `scheduled_holepunch` state.
            let scheduled_path_open = match self.scheduled_open_path {
                Some(when) => MaybeFuture::Some(time::sleep_until(when)),
                None => MaybeFuture::None,
            };
            n0_future::pin!(scheduled_path_open);
            let scheduled_hp = match self.scheduled_holepunch {
                Some(when) => MaybeFuture::Some(time::sleep_until(when)),
                None => MaybeFuture::None,
            };
            n0_future::pin!(scheduled_hp);
            // Any pending work or live connection means we are not idle: push the idle
            // deadline out again.
            if !inbox.is_empty() || !self.connections.is_empty() {
                idle_timeout
                    .as_mut()
                    .reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT);
            }

            tokio::select! {
                // `biased` makes shutdown and inbox messages take priority over the
                // event streams below.
                biased;

                _ = shutdown_token.cancelled() => {
                    trace!("actor cancelled");
                    break;
                }
                msg = inbox.recv() => {
                    match msg {
                        Some(msg) => self.handle_message(msg).await,
                        None => break,
                    }
                }
                Some((id, evt)) = self.path_events.next() => {
                    self.handle_path_event(id, evt);
                }
                Some((id, evt)) = self.addr_events.next() => {
                    trace!(?id, ?evt, "remote addrs updated, triggering holepunching");
                    self.trigger_holepunching();
                }
                // Guard with `is_empty`: `FuturesUnordered::next` yields `None` when empty,
                // which would otherwise disable this arm permanently in this poll.
                Some(conn_id) = self.connections_close.next(), if !self.connections_close.is_empty() => {
                    self.handle_connection_close(conn_id);
                }
                res = self.local_direct_addrs.updated() => {
                    if let Err(n0_watcher::Disconnected) = res {
                        trace!("direct address watcher disconnected, shutting down");
                        break;
                    }
                    self.local_addrs_updated();
                    trace!("local addrs updated, triggering holepunching");
                    self.trigger_holepunching();
                }
                _ = &mut scheduled_path_open => {
                    trace!("triggering scheduled path_open");
                    self.scheduled_open_path = None;
                    // Take the queue so `open_path` can re-queue failures for later.
                    let mut addrs = std::mem::take(&mut self.pending_open_paths);
                    while let Some(addr) = addrs.pop_front() {
                        self.open_path(&addr);
                    }
                }
                _ = &mut scheduled_hp => {
                    trace!("triggering scheduled holepunching");
                    self.scheduled_holepunch = None;
                    self.trigger_holepunching();
                }
                item = self.address_lookup_stream.next() => {
                    self.handle_address_lookup_item(item);
                }
                _ = check_connections.tick() => {
                    self.check_connections();
                }
                _ = &mut idle_timeout => {
                    if self.connections.is_empty() && inbox.is_empty() {
                        trace!("idle timeout expired and still idle: terminate actor");
                        break;
                    } else {
                        // Seems like we weren't really idle, so we reset
                        idle_timeout.as_mut().reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT);
                    }
                }
            }
        }

        inbox.close();
        // There might be a race between checking `inbox.is_empty()` and `inbox.close()`,
        // so we pull out all messages that are left over.
        let mut leftover_msgs = Vec::with_capacity(inbox.len());
        inbox.recv_many(&mut leftover_msgs, inbox.len()).await;

        trace!("actor terminating");
        (self.endpoint_id, leftover_msgs)
    }

    /// Handles an actor message by dispatching to the matching `handle_msg_*` method.
    #[instrument(skip(self))]
    async fn handle_message(&mut self, msg: RemoteStateMessage) {
        match msg {
            RemoteStateMessage::SendDatagram(sender, transmit) => {
                self.handle_msg_send_datagram(sender, transmit).await;
            }
            RemoteStateMessage::AddConnection(handle, tx) => {
                self.handle_msg_add_connection(handle, tx);
            }
            RemoteStateMessage::ResolveRemote(addrs, tx) => {
                self.handle_msg_resolve_remote(addrs, tx);
            }
            RemoteStateMessage::RemoteInfo(tx) => {
                // Snapshot the currently known paths; the requester may have gone
                // away, so a failed send is ignored.
                let addrs = self.paths.to_remote_addrs();
                let info = RemoteInfo {
                    endpoint_id: self.endpoint_id,
                    addrs,
                };
                tx.send(info).ok();
            }
            RemoteStateMessage::NetworkChange { is_major } => {
                self.handle_network_change(is_major);
            }
        }
    }

    /// Reacts to a network change notification.
    ///
    /// Pings every open path on every live connection so broken paths are detected
    /// quickly; on a major change additionally re-triggers holepunching.
    fn handle_network_change(&mut self, is_major: bool) {
        for conn_state in self.connections.values() {
            let Some(quinn_conn) = conn_state.handle.upgrade() else {
                continue;
            };
            for (path_id, addr) in &conn_state.open_paths {
                let Some(path) = quinn_conn.path(*path_id) else {
                    continue;
                };
                // Ping the current path
                if let Err(err) = path.ping() {
                    warn!(%err, %path_id, ?addr, "failed to ping path");
                }
            }
        }

        if is_major {
            self.trigger_holepunching();
        }
    }

    /// Handles regularly checking if any paths need hole punching currently
    ///
    /// Currently we need to have 1 IP path, with a good enough latency.
    fn check_connections(&mut self) {
        // A connection counts as good enough when its handle is still alive and its
        // fastest open IP path has an RTT at or below GOOD_ENOUGH_LATENCY.
        let all_goodenough = self.connections.values().all(|conn_state| {
            let Some(conn) = conn_state.handle.upgrade() else {
                return false;
            };
            conn_state
                .open_paths
                .iter()
                .filter_map(|(path_id, addr)| {
                    // Only IP paths matter here; relay paths never satisfy the check.
                    if !addr.is_ip() {
                        return None;
                    }
                    conn.path_stats(*path_id).map(|stats| stats.rtt)
                })
                .min()
                .is_some_and(|rtt| rtt <= GOOD_ENOUGH_LATENCY)
        });

        if !all_goodenough {
            debug!("connections are not good enough, triggering holepunching");
            self.trigger_holepunching();
        }
    }

    /// Handles [`RemoteStateMessage::SendDatagram`].
    async fn handle_msg_send_datagram(
        &mut self,
        mut sender: Box<TransportsSender>,
        transmit: OwnedTransmit,
    ) {
        // Sending datagrams might fail, e.g. because we don't have the right transports set
        // up to handle sending this owned transmit to.
        // After all, we try every single path that we know (relay URL, IP address), even
        // though we might not have a relay transport or ip-capable transport set up.
        // So these errors must not be fatal for this actor (or even this operation).

        // Fast path: a path was already selected, send only there.
        if let Some(addr) = self.selected_path.get() {
            trace!(?addr, "sending datagram to selected path");

            if let Err(err) = send_datagram(&mut sender, addr.clone(), transmit).await {
                debug!(?addr, "failed to send datagram on selected_path: {err:#}");
            }
            return;
        }

        // No selected path yet: spray the datagram over every known candidate path.
        trace!(
            paths = ?self.paths.addrs().collect::<Vec<_>>(),
            "sending datagram to all known paths",
        );
        if self.paths.is_empty() {
            warn!("Cannot send datagrams: No paths to remote endpoint known");
        }

        for addr in self.paths.addrs() {
            // We never want to send to our local addresses.
            // The local address set is updated in the main loop so we can use `peek` here.
            if let transports::Addr::Ip(sockaddr) = addr {
                let is_own_addr = self
                    .local_direct_addrs
                    .peek()
                    .iter()
                    .any(|a| a.addr == *sockaddr);
                if is_own_addr {
                    trace!(%sockaddr, "not sending datagram to our own address");
                    continue;
                }
            }
            if let Err(err) = send_datagram(&mut sender, addr.clone(), transmit.clone()).await {
                debug!(?addr, "failed to send datagram: {err:#}");
            }
        }
        // This message is received *before* a connection is added.  So we do
        // not yet have a connection to holepunch.  Instead we trigger
        // holepunching when AddConnection is received.
    }

    /// Handles [`RemoteStateMessage::AddConnection`].
    ///
    /// Wires a newly established Quinn connection into the actor: subscribes to its
    /// path/NAT event streams, pushes our local candidate addresses, records its
    /// initial path and triggers holepunching.
    fn handle_msg_add_connection(
        &mut self,
        handle: WeakConnectionHandle,
        tx: oneshot::Sender<PathWatchable>,
    ) {
        // The watchable is returned to the caller even if the connection handle can no
        // longer be upgraded below.
        let path_watchable = PathWatchable::new(self.selected_path.clone());
        if let Some(conn) = handle.upgrade() {
            self.metrics.num_conns_opened.inc();
            // Remove any conflicting stable_ids from the local state.
            let conn_id = ConnId(conn.stable_id());
            self.connections.remove(&conn_id);

            // Hook up paths, NAT addresses and connection closed event streams.
            self.path_events.push(Box::pin(
                BroadcastStream::new(conn.path_events()).map(move |evt| (conn_id, evt)),
            ));
            self.addr_events.push(Box::pin(
                BroadcastStream::new(conn.nat_traversal_updates()).map(move |evt| (conn_id, evt)),
            ));
            self.connections_close.push(OnClosed::new(&conn));

            // Add local addrs to the connection
            let local_addrs = self
                .local_direct_addrs
                .get()
                .iter()
                .map(|d| d.addr)
                .collect::<BTreeSet<_>>();
            Self::set_local_addrs(&conn, &local_addrs);

            // Store the connection
            let conn_state = self
                .connections
                .entry(conn_id)
                .insert_entry(ConnectionState {
                    handle: handle.clone(),
                    path_watchable: path_watchable.clone(),
                    paths: Default::default(),
                    open_paths: Default::default(),
                    path_ids: Default::default(),
                    has_been_direct: false,
                })
                .into_mut();

            // Store PathId(0), set path_status and select best path, check if holepunching
            // is needed.
            if let Some(path) = conn.path(PathId::ZERO)
                && let Ok(socketaddr) = path.remote_address()
                && let Some(path_remote) = self.relay_mapped_addrs.to_transport_addr(socketaddr)
            {
                trace!(?path_remote, "added new connection");
                // IP paths are usable directly; relay paths are only kept as backup.
                let path_status = match path_remote {
                    transports::Addr::Ip(_) => PathStatus::Available,
                    transports::Addr::Relay(_, _) => PathStatus::Backup,
                };
                let res = path.set_status(path_status);
                event!(
                    target: "iroh::_events::path::set_status",
                    Level::DEBUG,
                    remote = %self.endpoint_id.fmt_short(),
                    ?path_remote,
                    ?path_status,
                    ?conn_id,
                    path_id = %PathId::ZERO,
                    ?res,
                );
                conn_state.add_open_path(path_remote.clone(), PathId::ZERO, &self.metrics);
                self.paths
                    .insert_open_path(path_remote.clone(), Source::Connection { _0: Private });
                self.select_path();

                if path_remote.is_ip() {
                    // We may have raced this with a relay address.  Try and add any
                    // relay addresses we have back.
                    let relays = self
                        .paths
                        .addrs()
                        .filter(|a| a.is_relay())
                        .cloned()
                        .collect::<Vec<_>>();
                    for remote in relays {
                        self.open_path(&remote);
                    }
                }
            }
            self.trigger_holepunching();
        }

        tx.send(path_watchable).ok();
    }

    /// Handles [`RemoteStateMessage::ResolveRemote`].
    ///
    /// Records the app-provided addresses as candidate paths and kicks off Address
    /// Lookup if no path is selected yet.
    fn handle_msg_resolve_remote(
        &mut self,
        addrs: BTreeSet<TransportAddr>,
        tx: oneshot::Sender<Result<(), AddressLookupError>>,
    ) {
        let candidate_addrs = to_transports_addr(self.endpoint_id, addrs);
        self.paths.insert_multiple(candidate_addrs, Source::App);
        self.paths.resolve_remote(tx);
        // Start Address Lookup if we have no selected path.
        self.trigger_address_lookup();
    }

    /// Drops a closed connection from our state.
    ///
    /// When the last connection is gone the selected path is cleared, since we can no
    /// longer verify it works.
    fn handle_connection_close(&mut self, conn_id: ConnId) {
        let removed = self.connections.remove(&conn_id);
        if removed.is_some() {
            self.metrics.num_conns_closed.inc();
        }
        if self.connections.is_empty() {
            trace!("last connection closed - clearing selected_path");
            self.selected_path.set(None).ok();
        }
    }

    /// Processes one item from the Address Lookup stream.
    ///
    /// `None` means the stream finished; in that case (and on error) the stream is reset
    /// to the always-pending placeholder so lookup counts as "not running" again.
    fn handle_address_lookup_item(
        &mut self,
        item: Option<Result<AddressLookupItem, AddressLookupError>>,
    ) {
        match item {
            None => {
                self.address_lookup_stream = Either::Left(n0_future::stream::pending());
                self.paths.address_lookup_finished(Ok(()));
            }
            Some(Err(err)) => {
                warn!("Address Lookup failed: {err:#}");
                self.address_lookup_stream = Either::Left(n0_future::stream::pending());
                self.paths.address_lookup_finished(Err(err));
            }
            Some(Ok(item)) => {
                // Sanity check: the lookup service should only hand us items for the
                // endpoint we asked about.
                if item.endpoint_id() != self.endpoint_id {
                    warn!(
                        ?item,
                        "Address Lookup emitted item for wrong remote endpoint"
                    );
                    return;
                }
                let source = Source::AddressLookup {
                    name: item.provenance().to_string(),
                };
                let addrs = to_transports_addr(self.endpoint_id, item.into_endpoint_addr().addrs);
                self.paths.insert_multiple(addrs, source);
            }
        }
    }

    /// Triggers Address Lookup for the remote endpoint, if needed.
    ///
    /// Does not start Address Lookup if we have a selected path or if Address Lookup is currently running.
    fn trigger_address_lookup(&mut self) {
        let already_running = matches!(self.address_lookup_stream, Either::Right(_));
        if self.selected_path.get().is_some() || already_running {
            return;
        }
        if let Some(stream) = self.address_lookup.resolve(self.endpoint_id) {
            self.address_lookup_stream = Either::Right(SyncStream::new(stream));
        } else {
            // No lookup service available: report the lookup as finished immediately.
            self.paths.address_lookup_finished(Ok(()));
        }
    }

    /// Sets the current local addresses to QNT's state to all connections
    fn local_addrs_updated(&mut self) {
        let local_addrs = self
            .local_direct_addrs
            .get()
            .iter()
            .map(|d| d.addr)
            .collect::<BTreeSet<_>>();

        for conn in self.connections.values().filter_map(|s| s.handle.upgrade()) {
            Self::set_local_addrs(&conn, &local_addrs);
        }
        // todo: trace
    }

    /// Sets the current local addresses to QNT's state
    ///
    /// Diffs `local_addrs` against what Quinn already knows and applies only the
    /// additions and removals.
    fn set_local_addrs(conn: &quinn::Connection, local_addrs: &BTreeSet<SocketAddr>) {
        let quinn_local_addrs: BTreeSet<SocketAddr> = match conn.get_local_nat_traversal_addresses()
        {
            Ok(addrs) => addrs.into_iter().collect(),
            Err(err) => {
                warn!("failed to get local nat candidates: {err:#}");
                return;
            }
        };
        let missing = local_addrs.difference(&quinn_local_addrs);
        for addr in missing {
            if let Err(err) = conn.add_nat_traversal_address(*addr) {
                warn!("failed adding local addr: {err:#}");
            }
        }
        let stale = quinn_local_addrs.difference(local_addrs);
        for addr in stale {
            if let Err(err) = conn.remove_nat_traversal_address(*addr) {
                warn!("failed removing local addr: {err:#}");
            }
        }
        trace!(?local_addrs, "updated local QNT addresses");
    }

    /// Triggers holepunching to the remote endpoint.
    ///
    /// This will manage the entire process of holepunching with the remote endpoint.
    ///
    /// - Holepunching happens on the Connection with the lowest [`ConnId`] which is a
    ///   client.
    ///   - Both endpoints may initiate holepunching if both have a client connection.
    ///   - Any opened paths are opened on all other connections without holepunching.
    /// - If there are no changes in local or remote candidate addresses since the
    ///   last attempt **and** there was a recent attempt, a trigger_holepunching call
    ///   will be scheduled instead.
    fn trigger_holepunching(&mut self) {
        if self.connections.is_empty() {
            trace!("not holepunching: no connections");
            return;
        }

        // Pick the live client-side connection with the lowest ConnId; only a client
        // may initiate a NAT traversal round.
        let Some(conn) = self
            .connections
            .iter()
            .filter_map(|(id, state)| state.handle.upgrade().map(|conn| (*id, conn)))
            .filter(|(_, conn)| conn.side().is_client())
            .min_by_key(|(id, _)| *id)
            .map(|(_, conn)| conn)
        else {
            trace!("not holepunching: no client connection");
            return;
        };
        let remote_candidates = match conn.get_remote_nat_traversal_addresses() {
            Ok(addrs) => BTreeSet::from_iter(addrs),
            Err(err) => {
                warn!("failed to get nat candidate addresses: {err:#}");
                return;
            }
        };
        let local_candidates: BTreeSet<SocketAddr> = self
            .local_direct_addrs
            .get()
            .iter()
            .map(|daddr| daddr.addr)
            .collect();
        // Only *new* candidate addresses (on either side) justify an immediate re-punch.
        let new_candidates = self
            .last_holepunch
            .as_ref()
            .map(|last_hp| {
                // Addrs are allowed to disappear, but if there are new ones we need to
                // holepunch again.
                trace!(
                    ?last_hp,
                    ?local_candidates,
                    ?remote_candidates,
                    "candidates to holepunch?"
                );
                !remote_candidates.is_subset(&last_hp.remote_candidates)
                    || !local_candidates.is_subset(&last_hp.local_candidates)
            })
            .unwrap_or(true);
        if !new_candidates && let Some(ref last_hp) = self.last_holepunch {
            // Nothing changed: rate-limit to one attempt per HOLEPUNCH_ATTEMPTS_INTERVAL
            // by scheduling a future attempt instead of punching now.
            let next_hp = last_hp.when + HOLEPUNCH_ATTEMPTS_INTERVAL;
            let now = Instant::now();
            if next_hp > now {
                trace!(scheduled_in = ?(next_hp - now), "not holepunching: no new addresses");
                self.scheduled_holepunch = Some(next_hp);
                return;
            }
        }

        self.do_holepunching(conn);
    }

    /// Unconditionally perform holepunching.
    ///
    /// On success records the attempt in `last_holepunch`; on a transient error a retry
    /// is scheduled shortly.
    #[instrument(skip_all)]
    fn do_holepunching(&mut self, conn: quinn::Connection) {
        use quinn_proto::iroh_hp::Error;

        self.metrics.holepunch_attempts.inc();
        let local_candidates: BTreeSet<SocketAddr> = self
            .local_direct_addrs
            .get()
            .iter()
            .map(|daddr| daddr.addr)
            .collect();

        match conn.initiate_nat_traversal_round() {
            Ok(addrs) => {
                // Canonicalize (e.g. unmap IPv4-mapped IPv6) before recording.
                let remote_candidates: BTreeSet<SocketAddr> = addrs
                    .iter()
                    .map(|addr| SocketAddr::new(addr.ip().to_canonical(), addr.port()))
                    .collect();
                event!(
                    target: "iroh::_events::qnt::init",
                    Level::DEBUG,
                    remote = %self.endpoint_id.fmt_short(),
                    ?local_candidates,
                    ?remote_candidates,
                );
                self.last_holepunch = Some(HolepunchAttempt {
                    when: Instant::now(),
                    local_candidates,
                    remote_candidates,
                });
            }
            Err(err) => {
                debug!("failed to initiate NAT traversal: {err:#}");
                match err {
                    // Fatal for this round, no need to retry for now.
                    Error::Closed
                    | Error::TooManyAddresses
                    | Error::WrongConnectionSide
                    | Error::ExtensionNotNegotiated => {}
                    // Transient: retry in a bit.
                    Error::Multipath(_) | Error::NotEnoughAddresses => {
                        let now = Instant::now();
                        let next_hp = now + Duration::from_millis(100);
                        trace!(scheduled_in = ?(next_hp - now), "holepunching retry");
                        self.scheduled_holepunch = Some(next_hp);
                    }
                }
            }
        }
    }

    /// Open the path on all connections.
    ///
    /// This goes through all the connections for which we are the client, and makes sure
    /// the path exists, or opens it.  For paths that already exist the path status is
    /// (re-)applied instead.
    #[instrument(level = "warn", skip(self))]
    fn open_path(&mut self, open_addr: &transports::Addr) {
        // Direct IP paths are marked Available; relay paths are kept as Backup.
        let path_status = match open_addr {
            transports::Addr::Ip(_) => PathStatus::Available,
            transports::Addr::Relay(_, _) => PathStatus::Backup,
        };
        // The address handed to Quinn: IP addresses verbatim, relay addresses via
        // their private mapped socket address.
        let quic_addr = match &open_addr {
            transports::Addr::Ip(socket_addr) => *socket_addr,
            transports::Addr::Relay(relay_url, eid) => self
                .relay_mapped_addrs
                .get(&(relay_url.clone(), *eid))
                .private_socket_addr(),
        };

        for (conn_id, conn_state) in self.connections.iter_mut() {
            let Some(conn) = conn_state.handle.upgrade() else {
                // Connection already dropped; nothing to do for it.
                continue;
            };
            if let Some(&path_id) = conn_state.path_ids.get(open_addr)
                && let Some(path) = conn.path(path_id)
            {
                // We still need to ensure that the path status is set correctly,
                // in case the path was opened by QNT, which opens all IP paths
                // using PATH_STATUS_BACKUP. We need to switch the selected path
                // to use PATH_STATUS_AVAILABLE though!
                let res = path.set_status(path_status);
                event!(
                    target: "iroh::_events::path::set_status",
                    Level::DEBUG,
                    remote = %self.endpoint_id.fmt_short(),
                    ?open_addr,
                    ?path_status,
                    ?conn_id,
                    %path_id,
                    ?res,
                );
                continue;
            }
            // Only the client opens paths (see close_redundant_paths for rationale).
            if conn.side().is_server() {
                continue;
            }
            let fut = conn.open_path_ensure(quic_addr, path_status);
            match fut.path_id() {
                Some(path_id) => {
                    trace!(?conn_id, %path_id, ?path_status, "opening new path");
                    conn_state.add_path(open_addr.clone(), path_id);
                    // Just like in the PATH_STATUS comment above, we need to make sure that the
                    // path status is set correctly, even if the path already existed.
                    if let Some(path) = conn.path(path_id) {
                        let res = path.set_status(path_status);
                        event!(
                            target: "iroh::_events::path::set_status",
                            Level::DEBUG,
                            remote = %self.endpoint_id.fmt_short(),
                            ?open_addr,
                            ?path_status,
                            ?conn_id,
                            %path_id,
                            ?res,
                        );
                        if let Err(e) = res {
                            warn!(?e, ?open_addr, ?path_status, "Setting path status failed");
                        }
                    }
                }
                None => {
                    // No path id was assigned: poll the future once to learn why.
                    let ret = now_or_never(fut);
                    match ret {
                        Some(Err(PathError::RemoteCidsExhausted)) => {
                            // Transient: the remote has no connection IDs left for a
                            // new path.  Queue this address and retry a bit later.
                            self.scheduled_open_path =
                                Some(Instant::now() + Duration::from_millis(333));
                            self.pending_open_paths.push_back(open_addr.clone());
                            trace!(?open_addr, "scheduling open_path");
                        }
                        _ => warn!(?ret, "Opening path failed"),
                    }
                }
            }
        }
    }

    /// Handles a [`PathEvent`] received for the connection identified by `conn_id`.
    ///
    /// Keeps the per-connection path bookkeeping in sync with Quinn and re-runs path
    /// selection when paths are opened or closed.
    #[instrument(skip(self))]
    fn handle_path_event(
        &mut self,
        conn_id: ConnId,
        event: Result<PathEvent, BroadcastStreamRecvError>,
    ) {
        let Ok(event) = event else {
            warn!("missed a PathEvent, RemoteStateActor lagging");
            // TODO: Is it possible to recover using the sync APIs to figure out what the
            //    state of the connection and it's paths are?
            return;
        };
        let Some(conn_state) = self.connections.get_mut(&conn_id) else {
            trace!("event for removed connection");
            return;
        };
        let Some(conn) = conn_state.handle.upgrade() else {
            trace!("event for closed connection");
            return;
        };
        trace!("path event");
        match event {
            PathEvent::Opened { id: path_id } => {
                let Some(path) = conn.path(path_id) else {
                    trace!("path open event for unknown path");
                    return;
                };

                // Map the QUIC-level remote address back to a transport address
                // (relay mapped addresses resolve to their relay transport address).
                if let Ok(socketaddr) = path.remote_address()
                    && let Some(path_remote) = self.relay_mapped_addrs.to_transport_addr(socketaddr)
                {
                    event!(
                        target: "iroh::_events::path::open",
                        Level::DEBUG,
                        remote = %self.endpoint_id.fmt_short(),
                        ?path_remote,
                        ?conn_id,
                        %path_id,
                    );
                    conn_state.add_open_path(path_remote.clone(), path_id, &self.metrics);
                    self.paths
                        .insert_open_path(path_remote.clone(), Source::Connection { _0: Private });
                }

                // A newly opened path may be better than the currently selected one.
                self.select_path();
            }
            PathEvent::Abandoned { id, path_stats } => {
                trace!(?path_stats, "path abandoned");
                // This is the last event for this path.
                if let Some(addr) = conn_state.remove_path(&id) {
                    self.paths.abandoned_path(&addr);
                }
            }
            PathEvent::LocallyClosed { id, .. } => {
                let Some(path_remote) = conn_state.paths.get(&id).cloned() else {
                    debug!("path not in path_id_map");
                    return;
                };
                event!(
                    target: "iroh::_events::path::closed",
                    Level::DEBUG,
                    remote = %self.endpoint_id.fmt_short(),
                    ?path_remote,
                    ?conn_id,
                    path_id = ?id,
                );
                conn_state.remove_open_path(&id);

                // If one connection closes this path, close it on all connections.
                for (conn_id, conn_state) in self.connections.iter_mut() {
                    let Some(path_id) = conn_state.path_ids.get(&path_remote) else {
                        continue;
                    };
                    let Some(conn) = conn_state.handle.upgrade() else {
                        continue;
                    };
                    if let Some(path) = conn.path(*path_id) {
                        trace!(?path_remote, ?conn_id, %path_id, "closing path");
                        if let Err(err) = path.close() {
                            trace!(
                                ?path_remote,
                                ?conn_id,
                                %path_id,
                                "path close failed: {err:#}"
                            );
                        }
                    }
                }

                // If the remote closed our selected path, select a new one.
                self.select_path();
            }
            PathEvent::RemoteStatus { .. } | PathEvent::ObservedAddr { .. } => {
                // Nothing to do for these events.
            }
        }
    }

    /// Builds the [`PathSelectionData`] for a path to `addr` with measured `rtt`.
    ///
    /// Direct IP paths get [`PathStatus::Available`], relay paths [`PathStatus::Backup`].
    /// IPv6 paths get a head start on the biased RTT (`IPV6_RTT_ADVANTAGE`); the bias is
    /// computed in `i128` so the subtraction cannot underflow.
    fn path_selection_data(
        &self,
        addr: transports::Addr,
        rtt: Duration,
    ) -> (transports::Addr, PathSelectionData) {
        let nanos = rtt.as_nanos() as i128;
        let (status, biased_rtt) = match &addr {
            transports::Addr::Ip(SocketAddr::V4(_)) => (PathStatus::Available, nanos),
            transports::Addr::Ip(SocketAddr::V6(_)) => (
                PathStatus::Available,
                nanos - IPV6_RTT_ADVANTAGE.as_nanos() as i128,
            ),
            transports::Addr::Relay(_, _) => (PathStatus::Backup, nanos),
        };
        (
            addr,
            PathSelectionData {
                status,
                rtt,
                biased_rtt,
            },
        )
    }

    /// Selects the path with the lowest RTT, prefers direct paths.
    ///
    /// If there are direct paths, this selects the direct path with the lowest RTT.  If
    /// there are only relay paths, the relay path with the lowest RTT is chosen.
    ///
    /// The selected path is added to any connections which do not yet have it.  Any unused
    /// direct paths are closed for all connections.
    #[instrument(skip_all)]
    fn select_path(&mut self) {
        // Find the lowest RTT across all connections for each open path.  The long way, so
        // we get to log *all* RTTs.
        let mut all_path_rtts: FxHashMap<transports::Addr, Vec<Duration>> = FxHashMap::default();
        for conn_state in self.connections.values() {
            let Some(conn) = conn_state.handle.upgrade() else {
                continue;
            };
            for (path_id, addr) in conn_state.open_paths.iter() {
                if let Some(stats) = conn.path_stats(*path_id) {
                    all_path_rtts
                        .entry(addr.clone())
                        .or_default()
                        .push(stats.rtt);
                }
            }
        }
        trace!(?all_path_rtts, "dumping all path RTTs");
        // Reduce to the minimum RTT per address and attach the selection metadata
        // (path status and biased RTT).
        let path_rtts: FxHashMap<transports::Addr, PathSelectionData> = all_path_rtts
            .into_iter()
            .filter_map(|(addr, rtts)| rtts.into_iter().min().map(|rtt| (addr, rtt)))
            .map(|(addr, rtt)| self.path_selection_data(addr, rtt))
            .collect();

        let current_path = self.selected_path.get();
        let selected_path = select_best_path(path_rtts, &current_path);

        // Apply our new path
        if let Some((addr, rtt)) = selected_path {
            // NOTE(review): assumes `Watchable::set` returns Ok only when the value
            // actually changed, so the event fires once per switch — confirm against
            // n0-watcher's documented semantics.
            let prev = self.selected_path.set(Some(addr.clone()));
            if prev.is_ok() {
                event!(
                    target: "iroh::_events::path::selected",
                    Level::DEBUG,
                    remote = %self.endpoint_id.fmt_short(),
                    path_remote = ?addr,
                    ?rtt,
                    prev_remote = ?prev,
                );
            }
            // Ensure the selected path exists everywhere with the right status, then
            // drop direct paths that are no longer needed.
            self.open_path(&addr);
            self.close_redundant_paths(&addr);
        } else {
            trace!(?current_path, "keeping current path");
        }
    }

    /// Closes any direct paths not selected if we are the client.
    ///
    /// Makes sure not to close the last direct path.  Relay paths are never closed
    /// currently, because we only have one relay path at this time.
    ///
    /// Only the client closes paths, just like only the client opens paths.  This is to
    /// avoid the client and server selecting different paths and accidentally closing all
    /// paths.
    fn close_redundant_paths(&mut self, selected_path: &transports::Addr) {
        debug_assert_eq!(self.selected_path.get().as_ref(), Some(selected_path),);

        for (conn_id, conn_state) in self.connections.iter() {
            // Consider only direct (IP) paths other than the selected one.
            for (path_id, path_remote) in conn_state
                .open_paths
                .iter()
                .filter(|(_, addr)| addr.is_ip())
                .filter(|(_, addr)| *addr != selected_path)
            {
                if conn_state.open_paths.values().filter(|a| a.is_ip()).count() <= 1 {
                    continue; // Do not close the last direct path.
                }
                // NOTE(review): `open_paths` is only updated when the LocallyClosed
                // path event is handled, not synchronously by `path.close()`, so the
                // count above is invariant across this loop.  If `selected_path` is a
                // relay, every direct path passes the filters and all of them may be
                // closed in a single pass — confirm this is intended.
                if let Some(path) = conn_state
                    .handle
                    .upgrade()
                    .filter(|conn| conn.side().is_client())
                    .and_then(|conn| conn.path(*path_id))
                {
                    trace!(?path_remote, ?conn_id, %path_id, "closing direct path");
                    match path.close() {
                        Err(quinn_proto::ClosePathError::MultipathNotNegotiated) => {
                            error!("multipath not negotiated");
                        }
                        Err(quinn_proto::ClosePathError::LastOpenPath) => {
                            error!("could not close last open path");
                        }
                        Err(quinn_proto::ClosePathError::ClosedPath) => {
                            // We already closed this.
                        }
                        Ok(_fut) => {
                            // We will handle the event in Self::handle_path_events.
                        }
                    }
                }
            }
        }
    }
}

/// Data used during path selection.
///
/// Compared via [`Self::sort_key`] in [`select_best_path`].
#[derive(Debug)]
struct PathSelectionData {
    /// Status of the path if it would be selected.
    status: PathStatus,
    /// Measured RTT for path selection.
    rtt: Duration,
    /// Biased RTT for path selection.
    ///
    /// This is an i128 so we can subtract an advantage for e.g. IPv6 without underflowing.
    biased_rtt: i128,
}

impl PathSelectionData {
    /// Key for sorting paths. Lower is better.
    ///
    /// First part is the status, 0 for Available, 1 for Backup.
    /// Second part is the biased RTT.
    // NOTE(review): the `as u8` cast relies on the `PathStatus` discriminant order
    // (Available < Backup) — confirm this holds in quinn_proto.
    fn sort_key(&self) -> (u8, i128) {
        (self.status as u8, self.biased_rtt)
    }
}

/// Returns `Some` if a new path should be selected, `None` if the `current_path`
/// should continue to be used.
fn select_best_path(
    all_paths: FxHashMap<transports::Addr, PathSelectionData>,
    current_path: &Option<transports::Addr>,
) -> Option<(transports::Addr, Duration)> {
    // The candidate: lowest sort_key (status first, then biased RTT).  If there are
    // no paths at all there is nothing to select.
    let (best_addr, best_data) = all_paths.iter().min_by_key(|(_, data)| data.sort_key())?;
    let best = (best_addr.clone(), best_data.rtt);

    // Without a current path, or without data for the current path, always take the
    // best path.
    let current_data = match current_path.as_ref().and_then(|addr| all_paths.get(addr)) {
        None => return Some(best),
        Some(data) => data,
    };

    // Switch when the best path has a different (i.e. better) status, or — within the
    // same status — when its raw RTT beats the current one by at least the switching
    // threshold.
    let better_status = best_data.status != current_data.status;
    let better_rtt = best_data.rtt + RTT_SWITCHING_MIN_IP <= current_data.rtt;
    (better_status || better_rtt).then_some(best)
}

/// Sends a single datagram to `dst` via `sender`.
///
/// Returns a future which completes once the transports sender accepted the datagram.
/// The borrowed [`transports::Transmit`] view is rebuilt from `owned_transmit` on every
/// poll, since it only borrows the owned contents captured by the closure.
fn send_datagram<'a>(
    sender: &'a mut TransportsSender,
    dst: transports::Addr,
    owned_transmit: OwnedTransmit,
) -> impl Future<Output = n0_error::Result<()>> + 'a {
    std::future::poll_fn(move |cx| {
        let transmit = transports::Transmit {
            ecn: owned_transmit.ecn,
            contents: owned_transmit.contents.as_ref(),
            segment_size: owned_transmit.segment_size,
        };

        Pin::new(&mut *sender)
            .poll_send(cx, &dst, None, &transmit)
            .map(|res| res.with_context(|_| format!("failed to send datagram to {dst:?}")))
    })
}

/// Messages to send to the [`RemoteStateActor`].
#[derive(derive_more::Debug)]
pub(crate) enum RemoteStateMessage {
    /// Sends a datagram to all known paths.
    ///
    /// Used to send QUIC Initial packets.  If there is no working direct path this will
    /// trigger holepunching.
    ///
    /// This is not acceptable to use on the normal send path, as it is an async send
    /// operation with a bunch more copying.  So it should only be used for sending QUIC
    /// Initial packets.
    #[debug("SendDatagram(..)")]
    SendDatagram(Box<TransportsSender>, OwnedTransmit),
    /// Adds an active connection to this remote endpoint.
    ///
    /// The connection will now be managed by this actor.  Holepunching will happen when
    /// needed, any new paths discovered via holepunching will be added.  And closed paths
    /// will be removed etc.
    #[debug("AddConnection(..)")]
    AddConnection(WeakConnectionHandle, oneshot::Sender<PathWatchable>),
    /// Asks if there is any possible path that could be used.
    ///
    /// This adds the provided transport addresses to the list of potential paths for this remote
    /// and starts Address Lookup if needed.
    ///
    /// Returns `Ok` immediately if the provided address list is non-empty or there are other
    /// known paths.  Otherwise returns `Ok` once Address Lookup produces a result, or the
    /// Address Lookup error if Address Lookup fails or produces no results.
    #[debug("ResolveRemote(..)")]
    ResolveRemote(
        BTreeSet<TransportAddr>,
        oneshot::Sender<Result<(), AddressLookupError>>,
    ),
    /// Returns information about the remote.
    ///
    /// This currently only includes a list of all known transport addresses for the remote.
    RemoteInfo(oneshot::Sender<RemoteInfo>),
    /// The network status has changed in some way.
    NetworkChange { is_major: bool },
}

/// Information about a holepunch attempt.
///
/// Addresses are always stored in canonical form.
#[derive(Debug)]
struct HolepunchAttempt {
    /// When the attempt was made, used to rate-limit holepunching attempts.
    when: Instant,
    /// The set of local addresses which could take part in holepunching.
    ///
    /// This does not mean every address here participated in the holepunching.  E.g. we
    /// could have tried only a sub-set of the addresses because a previous attempt already
    /// covered part of the range.
    ///
    /// We do not store this as a [`DirectAddr`] because this is checked for equality and we
    /// do not want to compare the sources of these addresses.
    local_candidates: BTreeSet<SocketAddr>,
    /// The set of remote addresses which could take part in holepunching.
    ///
    /// Like [`Self::local_candidates`] we may not have used them.
    remote_candidates: BTreeSet<SocketAddr>,
}

/// Newtype to track Connections.
///
/// The wrapped value is the [`quinn::Connection::stable_id`] value, and is thus only valid
/// for active connections.
///
/// Used as the key for the actor's `connections` map.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
struct ConnId(usize);

/// State about one connection.
#[derive(Debug)]
struct ConnectionState {
    /// Weak handle to the connection.
    handle: WeakConnectionHandle,
    /// The information we publish to users about the paths used in this connection.
    path_watchable: PathWatchable,
    /// The paths that exist on this connection.
    ///
    /// This could be in any state, e.g. while still validating the path or already closed
    /// but not yet fully removed from the connection.  This exists as long as Quinn knows
    /// about the [`PathId`].
    paths: FxHashMap<PathId, transports::Addr>,
    /// The open paths on this connection, a subset of [`Self::paths`].
    open_paths: FxHashMap<PathId, transports::Addr>,
    /// Reverse map of [`Self::paths`].
    path_ids: FxHashMap<transports::Addr, PathId>,
    /// Whether this connection has ever had a direct path.
    ///
    /// Used for recording metrics.
    has_been_direct: bool,
}

impl Drop for ConnectionState {
    fn drop(&mut self) {
        // Close the path watcher when the connection state is dropped.
        self.path_watchable.close();
    }
}

impl ConnectionState {
    /// Tracks a path for the connection, in whatever state it is in.
    fn add_path(&mut self, remote: transports::Addr, path_id: PathId) {
        self.path_ids.insert(remote.clone(), path_id);
        self.paths.insert(path_id, remote);
    }

    /// Tracks an open path for the connection and updates the path metrics.
    fn add_open_path(
        &mut self,
        remote: transports::Addr,
        path_id: PathId,
        metrics: &Arc<SocketMetrics>,
    ) {
        if remote.is_ip() {
            metrics.paths_direct.inc();
            // Count each connection at most once as having been direct.
            if !self.has_been_direct {
                self.has_been_direct = true;
                metrics.num_conns_direct.inc();
            }
        } else {
            metrics.paths_relay.inc();
        }

        self.paths.insert(path_id, remote.clone());
        self.open_paths.insert(path_id, remote.clone());
        self.path_ids.insert(remote.clone(), path_id);

        // Publish the new path to watchers, if the connection is still alive.
        if let Some(conn) = self.handle.upgrade() {
            self.path_watchable.insert(&conn, path_id, remote.into());
        }
    }

    /// Completely removes a path from this connection, returning its address.
    fn remove_path(&mut self, path_id: &PathId) -> Option<transports::Addr> {
        self.open_paths.remove(path_id);
        self.path_watchable.set_abandoned(*path_id);
        let removed = self.paths.remove(path_id);
        if let Some(ref addr) = removed {
            self.path_ids.remove(addr);
        }
        removed
    }

    /// Removes the path from the open paths only.
    fn remove_open_path(&mut self, path_id: &PathId) {
        self.open_paths.remove(path_id);
        self.path_watchable.set_abandoned(*path_id);
    }
}

/// Poll a future once, like n0_future::future::poll_once but sync.
///
/// Returns `Some(output)` if `fut` completes on the first poll (driven with a
/// no-op waker), `None` otherwise.
fn now_or_never<T, F: Future<Output = T>>(fut: F) -> Option<T> {
    let mut cx = std::task::Context::from_waker(std::task::Waker::noop());
    match std::pin::pin!(fut).poll(&mut cx) {
        Poll::Ready(output) => Some(output),
        Poll::Pending => None,
    }
}

/// Future that resolves to the `conn_id` once a connection is closed.
///
/// This uses [`quinn::Connection::on_closed`], which does not keep the connection alive
/// while awaiting the future.
struct OnClosed {
    /// Identifier of the connection this future watches.
    conn_id: ConnId,
    /// The underlying close notification from Quinn.
    inner: quinn::OnClosed,
}

impl OnClosed {
    /// Creates an [`OnClosed`] future for `conn`.
    fn new(conn: &quinn::Connection) -> Self {
        Self {
            conn_id: ConnId(conn.stable_id()),
            inner: conn.on_closed(),
        }
    }
}

impl Future for OnClosed {
    type Output = ConnId;

    /// Resolves with the [`ConnId`] once the underlying close notification fires.
    fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        match Pin::new(&mut self.inner).poll(cx) {
            Poll::Pending => Poll::Pending,
            // The close reason and stats are not needed here, only *which*
            // connection closed.
            Poll::Ready((_close_reason, _stats)) => Poll::Ready(self.conn_id),
        }
    }
}

/// Converts an iterator of [`TransportAddr`] into an iterator of [`transports::Addr`].
///
/// Relay addresses are paired with `endpoint_id`; unsupported transport address kinds
/// are logged and skipped.
fn to_transports_addr(
    endpoint_id: EndpointId,
    addrs: impl IntoIterator<Item = TransportAddr>,
) -> impl Iterator<Item = transports::Addr> {
    addrs.into_iter().filter_map(move |addr| match addr {
        TransportAddr::Relay(relay_url) => Some(transports::Addr::from((relay_url, endpoint_id))),
        TransportAddr::Ip(sockaddr) => Some(transports::Addr::from(sockaddr)),
        _ => {
            warn!(?addr, "Unsupported TransportAddr");
            None
        }
    })
}

#[cfg(test)]
mod tests {
    use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};

    use super::*;

    /// Localhost IPv4 direct address on `port`.
    fn v4(port: u16) -> transports::Addr {
        transports::Addr::Ip(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, port)))
    }

    /// Localhost IPv6 direct address on `port`.
    fn v6(port: u16) -> transports::Addr {
        transports::Addr::Ip(SocketAddr::V6(SocketAddrV6::new(
            Ipv6Addr::LOCALHOST,
            port,
            0,
            0,
        )))
    }

    /// Relay address; `port` is only used to derive a unique relay URL.
    fn relay(port: u16) -> transports::Addr {
        let url = format!("https://relay{port}.iroh.computer")
            .parse::<RelayUrl>()
            .unwrap();
        transports::Addr::Relay(url, EndpointId::from_bytes(&[0u8; 32]).unwrap())
    }

    /// Selection data with an unbiased RTT of `rtt_ms` milliseconds.
    fn psd(status: PathStatus, rtt_ms: u64) -> PathSelectionData {
        let rtt = Duration::from_millis(rtt_ms);
        let biased_rtt = rtt.as_nanos() as i128;
        PathSelectionData {
            status,
            rtt,
            biased_rtt,
        }
    }

    /// Selection data with the IPv6 bias applied, RTT of `rtt_ms` milliseconds.
    fn psd_v6(status: PathStatus, rtt_ms: u64) -> PathSelectionData {
        let rtt = Duration::from_millis(rtt_ms);
        // IPv6 gets a bias advantage
        let biased_rtt = rtt.as_nanos() as i128 - IPV6_RTT_ADVANTAGE.as_nanos() as i128;
        PathSelectionData {
            status,
            rtt,
            biased_rtt,
        }
    }

    #[test]
    fn test_ipv6_wins_over_ipv4_within_bias() {
        // IPv6 should win over IPv4 when RTTs are the same
        let mut paths = FxHashMap::default();
        paths.insert(v4(1), psd(PathStatus::Available, 10));
        paths.insert(v6(1), psd_v6(PathStatus::Available, 10));

        let result = select_best_path(paths, &None);
        assert!(result.is_some());
        let (addr, _) = result.unwrap();
        assert!(matches!(addr, transports::Addr::Ip(SocketAddr::V6(_))));

        // IPv6 should still win when it's slightly slower (within bias range)
        let mut paths = FxHashMap::default();
        paths.insert(v4(1), psd(PathStatus::Available, 10));
        paths.insert(v6(1), psd_v6(PathStatus::Available, 12)); // 2ms slower, but 3ms bias

        let result = select_best_path(paths, &None);
        assert!(result.is_some());
        let (addr, _) = result.unwrap();
        assert!(matches!(addr, transports::Addr::Ip(SocketAddr::V6(_))));

        // IPv4 should win when IPv6 is significantly slower
        let mut paths = FxHashMap::default();
        paths.insert(v4(1), psd(PathStatus::Available, 10));
        paths.insert(v6(1), psd_v6(PathStatus::Available, 20)); // 10ms slower, exceeds 3ms bias

        let result = select_best_path(paths, &None);
        assert!(result.is_some());
        let (addr, _) = result.unwrap();
        assert!(matches!(addr, transports::Addr::Ip(SocketAddr::V4(_))));
    }

    #[test]
    fn test_available_wins_over_backup_regardless_of_rtt() {
        // Available path should win even with much higher RTT
        let mut paths = FxHashMap::default();
        paths.insert(v4(1), psd(PathStatus::Available, 100)); // High RTT but Available
        paths.insert(relay(1), psd(PathStatus::Backup, 10)); // Low RTT but Backup

        let result = select_best_path(paths, &None);
        assert!(result.is_some());
        let (addr, _) = result.unwrap();
        assert!(addr.is_ip());

        // Even more extreme: 1000ms Available vs 1ms Backup
        let mut paths = FxHashMap::default();
        paths.insert(v4(1), psd(PathStatus::Available, 1000));
        paths.insert(relay(1), psd(PathStatus::Backup, 1));

        let result = select_best_path(paths, &None);
        assert!(result.is_some());
        let (addr, _) = result.unwrap();
        assert!(addr.is_ip());
    }

    #[test]
    fn test_same_category_only_switches_with_significant_rtt_diff() {
        let current = v4(1);

        // Should NOT switch: new path is only slightly better (2ms < 5ms threshold)
        let mut paths = FxHashMap::default();
        paths.insert(v4(1), psd(PathStatus::Available, 20));
        paths.insert(v4(2), psd(PathStatus::Available, 18));

        let result = select_best_path(paths, &Some(current.clone()));
        assert!(result.is_none()); // Should keep current

        // Should NOT switch: new path is just under threshold (4ms < 5ms)
        let mut paths = FxHashMap::default();
        paths.insert(v4(1), psd(PathStatus::Available, 20));
        paths.insert(v4(2), psd(PathStatus::Available, 16));

        let result = select_best_path(paths, &Some(current.clone()));
        assert!(result.is_none()); // Should keep current

        // SHOULD switch: new path is exactly at threshold (5ms, condition is <=)
        let mut paths = FxHashMap::default();
        paths.insert(v4(1), psd(PathStatus::Available, 20));
        paths.insert(v4(2), psd(PathStatus::Available, 15));

        let result = select_best_path(paths, &Some(current.clone()));
        assert!(result.is_some());
        let (addr, _) = result.unwrap();
        assert_eq!(addr, v4(2));

        // SHOULD switch: new path is significantly better (6ms > 5ms threshold)
        let mut paths = FxHashMap::default();
        paths.insert(v4(1), psd(PathStatus::Available, 20));
        paths.insert(v4(2), psd(PathStatus::Available, 14));

        let result = select_best_path(paths, &Some(current.clone()));
        assert!(result.is_some());
        let (addr, _) = result.unwrap();
        assert_eq!(addr, v4(2));
    }

    #[test]
    fn test_no_current_path_selects_best() {
        let mut paths = FxHashMap::default();
        paths.insert(v4(1), psd(PathStatus::Available, 20));
        paths.insert(v4(2), psd(PathStatus::Available, 10));

        let result = select_best_path(paths, &None);
        assert!(result.is_some());
        let (addr, _) = result.unwrap();
        assert_eq!(addr, v4(2)); // Lower RTT wins
    }

    #[test]
    fn test_empty_paths_returns_none() {
        let paths: FxHashMap<transports::Addr, PathSelectionData> = FxHashMap::default();
        let result = select_best_path(paths, &None);
        assert!(result.is_none());

        let paths: FxHashMap<transports::Addr, PathSelectionData> = FxHashMap::default();
        let result = select_best_path(paths, &Some(v4(1)));
        assert!(result.is_none());
    }
}

Neighbours