radio/iroh/src/endpoint.rs

//! The [`Endpoint`] allows establishing connections to other iroh endpoints.
//!
//! The [`Endpoint`] is the main API interface to manage a local iroh endpoint.  It allows
//! connecting to and accepting connections from other endpoints.  See the [module docs] for
//! more details on how iroh connections work.
//!
//! The main items in this module are:
//!
//! - [`Endpoint`] to establish iroh connections with other endpoints.
//! - [`Builder`] to create an [`Endpoint`].
//!
//! [module docs]: crate

#[cfg(not(wasm_browser))]
use std::net::SocketAddr;
use std::{pin::Pin, sync::Arc};

use iroh_base::{EndpointAddr, EndpointId, RelayUrl, SecretKey, TransportAddr};
use iroh_relay::{RelayConfig, RelayMap};
#[cfg(not(wasm_browser))]
use n0_error::bail;
use n0_error::{e, ensure, stack_error};
use n0_watcher::Watcher;
#[cfg(not(wasm_browser))]
use netdev::ipnet::{Ipv4Net, Ipv6Net};
use pin_project::pin_project;
use tokio_util::sync::WaitForCancellationFutureOwned;
use tracing::{debug, instrument, trace, warn};
use url::Url;

use self::hooks::EndpointHooksList;
pub use super::socket::{
    BindError, DirectAddr, DirectAddrType,
    remote_map::{
        PathInfo, PathInfoList, PathInfoListIter, PathWatcher, RemoteInfo, Source,
        TransportAddrInfo, TransportAddrUsage,
    },
};
#[cfg(wasm_browser)]
use crate::address_lookup::PkarrResolver;
#[cfg(not(wasm_browser))]
use crate::dns::DnsResolver;
use crate::{
    NetReport,
    address_lookup::{
        ConcurrentAddressLookup, DynIntoAddressLookup, Error as AddressLookupError,
        IntoAddressLookup, UserData,
    },
    endpoint::presets::Preset,
    metrics::EndpointMetrics,
    socket::{
        self, EndpointInner, RemoteStateActorStoppedError, StaticConfig, mapped_addrs::MappedAddr,
    },
    tls::{self, DEFAULT_MAX_TLS_TICKETS},
};

#[cfg(not(wasm_browser))]
mod bind;
mod connection;
pub(crate) mod hooks;
pub mod presets;
pub(crate) mod quic;

#[cfg(not(wasm_browser))]
pub use bind::{BindOpts, InvalidSocketAddr, ToSocketAddr};
pub use hooks::{AfterHandshakeOutcome, BeforeConnectOutcome, EndpointHooks};

#[cfg(feature = "qlog")]
pub use self::quic::{QlogConfig, QlogFactory, QlogFileFactory};
pub use self::{
    connection::{
        Accept, Accepting, AlpnError, AuthenticationError, Connecting, ConnectingError, Connection,
        ConnectionInfo, ConnectionState, HandshakeCompleted, Incoming, IncomingZeroRtt,
        IncomingZeroRttConnection, OutgoingZeroRtt, OutgoingZeroRttConnection,
        RemoteEndpointIdError, RetryError, ZeroRttStatus,
    },
    quic::{
        AcceptBi, AcceptUni, AckFrequencyConfig, AeadKey, ApplicationClose, Chunk, ClosedStream,
        ConnectionClose, ConnectionError, ConnectionStats, Controller, ControllerFactory,
        ControllerMetrics, CryptoError, Dir, ExportKeyingMaterialError, FrameStats, FrameType,
        HandshakeTokenKey, HeaderKey, IdleTimeout, Keys, MtuDiscoveryConfig, OpenBi, OpenUni,
        PacketKey, PathId, PathStats, QuicConnectError, QuicTransportConfig,
        QuicTransportConfigBuilder, ReadDatagram, ReadError, ReadExactError, ReadToEndError,
        RecvStream, ResetError, RttEstimator, SendDatagram, SendDatagramError, SendStream,
        ServerConfig, ServerConfigBuilder, Side, StoppedError, StreamId, TimeSource, TokenLog,
        TokenReuseError, TransportError, TransportErrorCode, TransportParameters, UdpStats,
        UnorderedRecvStream, UnsupportedVersion, ValidationTokenConfig, VarInt,
        VarIntBoundsExceeded, WriteError, Written,
    },
};
#[cfg(not(wasm_browser))]
use crate::socket::transports::IpConfig;
use crate::socket::transports::TransportConfig;

/// Builder for [`Endpoint`].
///
/// By default the endpoint will generate a new random [`SecretKey`], which will result in a
/// new [`EndpointId`].
///
/// To create the [`Endpoint`] call [`Builder::bind`].
#[derive(Debug)]
pub struct Builder {
    // Identity of the endpoint; when `None` a fresh key is generated in `bind`.
    secret_key: Option<SecretKey>,
    // ALPNs used to build the initial server config in `bind`.
    alpn_protocols: Vec<Vec<u8>>,
    // QUIC transport parameters handed to the `StaticConfig` in `bind`.
    transport_config: QuicTransportConfig,
    // Forwarded to the `StaticConfig`; see `Builder::keylog`.
    keylog: bool,
    // Address Lookup factories; each one is turned into a service in `bind`, after the
    // endpoint itself exists.
    address_lookup: Vec<Box<dyn DynIntoAddressLookup>>,
    // Optional user data forwarded to the socket options in `bind`.
    address_lookup_user_data: Option<UserData>,
    // Optional proxy forwarded to the socket options in `bind`.
    proxy_url: Option<Url>,
    // Custom DNS resolver; `bind` falls back to the resolver's `Default` when `None`.
    #[cfg(not(wasm_browser))]
    dns_resolver: Option<DnsResolver>,
    // Test-only switch, forwarded to the socket options in `bind`.
    #[cfg(any(test, feature = "test-utils"))]
    insecure_skip_relay_cert_verify: bool,
    // All configured transports (IP sockets and relay), in insertion order.
    transports: Vec<TransportConfig>,
    // Passed to `tls::TlsConfig::new` in `bind`.
    max_tls_tickets: usize,
    // Hooks installed via `Builder::hooks`, in installation order.
    hooks: EndpointHooksList,
}

impl From<RelayMode> for Option<TransportConfig> {
    /// Converts a [`RelayMode`] into the relay transport it describes, if any.
    ///
    /// [`RelayMode::Disabled`] yields `None`; every other mode yields a
    /// [`TransportConfig::Relay`] that is flagged as user defined.
    fn from(mode: RelayMode) -> Self {
        let relay_map = match mode {
            RelayMode::Disabled => return None,
            // The built-in modes obtain their relay map from the mode itself.
            RelayMode::Default | RelayMode::Staging => mode.relay_map(),
            RelayMode::Custom(custom_map) => custom_map,
        };
        Some(TransportConfig::Relay {
            relay_map,
            is_user_defined: true,
        })
    }
}

impl Builder {
    // The ordering of public methods is reflected directly in the documentation.  This is
    // roughly ordered by what is most commonly needed by users.

    /// Creates a new [`Builder`] using the given [`Preset`].
    ///
    /// See [`presets`] for more.
    pub fn new<P: Preset>(preset: P) -> Self {
        // Start from an empty builder without a relay transport; everything else is
        // left to whatever the preset configures when it is applied.
        Self::empty(RelayMode::Disabled).preset(preset)
    }

    /// Applies the given [`Preset`].
    pub fn preset<P: Preset>(self, preset: P) -> Self {
        // The preset consumes the builder and hands back the configured one.
        preset.apply(self)
    }

    /// Creates an empty builder with no address lookup services.
    pub fn empty(relay_mode: RelayMode) -> Self {
        let mut transports = vec![
            #[cfg(not(wasm_browser))]
            TransportConfig::default_ipv4(),
            #[cfg(not(wasm_browser))]
            TransportConfig::default_ipv6(),
        ];
        if let Some(relay) = relay_mode.into() {
            transports.push(relay);
        }
        Self {
            secret_key: Default::default(),
            alpn_protocols: Default::default(),
            transport_config: QuicTransportConfig::default(),
            keylog: Default::default(),
            address_lookup: Default::default(),
            address_lookup_user_data: Default::default(),
            proxy_url: None,
            #[cfg(not(wasm_browser))]
            dns_resolver: None,
            #[cfg(any(test, feature = "test-utils"))]
            insecure_skip_relay_cert_verify: false,
            max_tls_tickets: DEFAULT_MAX_TLS_TICKETS,
            transports,
            hooks: Default::default(),
        }
    }

    // # The final constructor that everyone needs.

    /// Binds the endpoint.
    ///
    /// Consumes the builder, spawns the underlying socket and then creates every
    /// configured Address Lookup service against the newly created [`Endpoint`].
    ///
    /// If no secret key was configured a new one is generated. On non-browser targets
    /// the default DNS resolver is used when none was configured.
    ///
    /// # Errors
    ///
    /// Returns a [`BindError`] if spawning the socket fails or if any of the configured
    /// Address Lookup services cannot be created.
    pub async fn bind(self) -> Result<Endpoint, BindError> {
        // Only reach for the RNG when the caller did not supply a key.
        let secret_key = self
            .secret_key
            .unwrap_or_else(|| SecretKey::generate(&mut rand::rng()));

        let static_config = StaticConfig {
            // The builder is consumed by this method and the transport config is not
            // read again, so it is moved instead of cloned.
            transport_config: self.transport_config,
            tls_config: tls::TlsConfig::new(secret_key.clone(), self.max_tls_tickets),
            keylog: self.keylog,
        };
        let server_config = static_config.create_server_config(self.alpn_protocols);

        #[cfg(not(wasm_browser))]
        let dns_resolver = self.dns_resolver.unwrap_or_default();

        let metrics = EndpointMetrics::default();

        let sock_opts = socket::Options {
            transports: self.transports,
            secret_key,
            address_lookup_user_data: self.address_lookup_user_data,
            proxy_url: self.proxy_url,
            #[cfg(not(wasm_browser))]
            dns_resolver,
            server_config,
            #[cfg(any(test, feature = "test-utils"))]
            insecure_skip_relay_cert_verify: self.insecure_skip_relay_cert_verify,
            metrics,
            hooks: self.hooks,
            static_config,
        };

        let inner = socket::Socket::spawn(sock_opts).await?;
        trace!("created socket");
        debug!(version = env!("CARGO_PKG_VERSION"), "iroh Endpoint created");

        let ep = Endpoint {
            inner: Arc::new(inner),
        };

        // Address Lookup services are created last because their factories receive a
        // reference to the finished endpoint.
        for create_service in self.address_lookup {
            let service = create_service.into_address_lookup(&ep)?;
            ep.address_lookup()
                .expect("just created the endpoint")
                .add_boxed(service);
        }

        Ok(ep)
    }

    // # The very common methods everyone basically needs.

    /// Binds an IP socket at the provided socket address.
    ///
    /// This is an advanced API to tightly control the sockets used by the endpoint. Most
    /// uses do not need to explicitly bind sockets.
    ///
    /// # Warning
    ///
    /// - The builder always comes pre-configured with an IPv4 socket to be bound on the
    ///   *unspecified* address: `0.0.0.0`. This is the equivalent of using `INADDR_ANY`
    ///   special bind address and results in a socket listening on *all* interfaces
    ///   available.
    ///
    /// - Likewise the builder always comes pre-configured with an IPv6 socket to be bound
    ///   on the *unspecified* address: `[::]`. This bind is allowed to fail however.
    ///
    /// - Adding a bind address removes the pre-configured unspecified bind address for this
    ///   address family. Use [`Self::bind_addr_with_opts`] to bind additional addresses without
    ///   replacing the default bind address.
    ///
    /// - This should be called at most once for each address family: once for IPv4 and/or
    ///   once for IPv6. Calling it multiple times for the same address family will result
    ///   in undefined routing behaviour. To bind multiple sockets of the same address
    ///   family, use [`Self::bind_addr_with_opts`].
    ///
    /// # Description
    ///
    /// Requests a socket to be bound on a specific address, with an implied netmask of
    /// `/0`. This allows restricting binding to only one network interface for a given
    /// address family.
    ///
    /// If the port specified is already in use, binding the endpoint will fail. Using
    /// port `0` in the socket address assigns a random free port.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> n0_error::Result<()> {
    /// # use iroh::Endpoint;
    /// let endpoint = Endpoint::builder()
    ///     .clear_ip_transports()
    ///     .bind_addr("127.0.0.1:0")?
    ///     .bind_addr("[::1]:0")?
    ///     .bind()
    ///     .await?;
    /// # Ok(()) }
    /// ```
    #[cfg(not(wasm_browser))]
    pub fn bind_addr<A>(self, addr: A) -> Result<Self, InvalidSocketAddr>
    where
        A: ToSocketAddr,
        <A as ToSocketAddr>::Err: Into<InvalidSocketAddr>,
    {
        // Convenience form of `bind_addr_with_opts` using the default `BindOpts`; all
        // validation and error reporting happens there.
        self.bind_addr_with_opts(addr, BindOpts::default())
    }

    /// Binds an IP socket at the provided socket address.
    ///
    /// This is an advanced API to tightly control the sockets used by the endpoint. Most
    /// uses do not need to explicitly bind sockets.
    ///
    /// # Warning
    ///
    /// - The builder always comes pre-configured with an IPv4 socket to be bound on the
    ///   *unspecified* address: `0.0.0.0`. This is the equivalent of using `INADDR_ANY`
    ///   special bind address and results in a socket listening on *all* interfaces
    ///   available.
    ///
    /// - Likewise the builder always comes pre-configured with an IPv6 socket to be bound
    ///   on the *unspecified* address: `[::]`. This bind is allowed to fail however.
    ///
    /// # Description
    ///
    /// Requests a socket to be bound on a specific address. This allows restricting binding
    /// to only one network interface for a given address family.
    ///
    /// [`BindOpts::set_prefix_len`] **should** be used to configure the netmask of the
    /// network interface. This allows outgoing datagrams that start a new network flow to
    /// be sent over the socket which is attached to the subnet of the destination
    /// address. If multiple sockets are bound the standard routing-table semantics are
    /// used: the socket attached to the subnet with the longest prefix matching the
    /// destination is used. Practically this means the smallest subnets are at the top of
    /// the routing table, and the first subnet containing the destination address is
    /// chosen.
    ///
    /// If no socket is bound to a subnet that contains the destination address, the notion
    /// of "default route" is used. At most one socket per address family may be marked as
    /// the default route using [`BindOpts::set_is_default_route`], and this will be used
    /// for destinations not contained by the subnets of the bound sockets. This network is
    /// expected to have a default gateway configured. A socket with a prefix length of `/0`
    /// will be set as a "default route" implicitly, unless [`BindOpts::set_is_default_route`]
    /// is set to `false` explicitly.
    ///
    /// Be aware that using a subnet with a prefix length of `/0` will always contain all
    /// destination addresses. It is valid to configure this, but no more than one such
    /// socket should be bound or the routing will be non-deterministic.
    ///
    /// To use a subnet with a non-zero prefix length as the default route in addition to
    /// being routed when its prefix matches, use [`BindOpts::set_is_default_route`].
    /// Subnets with a prefix length of zero are always marked as default routes.
    ///
    /// Finally note that most outgoing datagrams are part of an existing network flow. That
    /// is, they are in response to an incoming datagram. In this case the outgoing datagram
    /// will be sent over the same socket as the incoming datagram was received on, and the
    /// routing with the prefix length and default route as described above does not apply.
    ///
    /// Using port `0` in the socket address assigns a random free port.
    ///
    /// If the port specified is already in use, binding the endpoint will fail, unless
    /// [`BindOpts::set_is_required`] is set to `false`.
    ///
    /// # Example
    /// ```no_run
    /// # #[tokio::main]
    /// # async fn main() -> n0_error::Result<()> {
    /// # use iroh::{Endpoint, endpoint::BindOpts};
    /// let endpoint = Endpoint::builder()
    ///     .clear_ip_transports()
    ///     .bind_addr_with_opts("127.0.0.1:0", BindOpts::default().set_prefix_len(24))?
    ///     .bind_addr_with_opts("[::1]:0", BindOpts::default().set_prefix_len(48))?
    ///     .bind()
    ///     .await?;
    /// # Ok(()) }
    /// ```
    #[cfg(not(wasm_browser))]
    pub fn bind_addr_with_opts<A>(
        mut self,
        addr: A,
        opts: BindOpts,
    ) -> Result<Self, InvalidSocketAddr>
    where
        A: ToSocketAddr,
        <A as ToSocketAddr>::Err: Into<InvalidSocketAddr>,
    {
        let socket_addr = addr.to_socket_addr().map_err(Into::into)?;

        // NOTE(review): the duplicate check below does not consult
        // `opts.is_default_route()`, so presumably a non-default bind is rejected as
        // well once a user-defined default exists for the address family. Nothing here
        // removes pre-configured transports either. Confirm both are intended.
        let config = match socket_addr {
            SocketAddr::V4(v4) => {
                let has_user_default = self
                    .transports
                    .iter()
                    .any(|t| t.is_ipv4_default() && t.is_user_defined());
                if has_user_default {
                    bail!(InvalidSocketAddr::DuplicateDefaultAddr);
                }

                IpConfig::V4 {
                    ip_net: Ipv4Net::new(*v4.ip(), opts.prefix_len())?,
                    port: v4.port(),
                    is_required: opts.is_required(),
                    is_default: opts.is_default_route(),
                }
            }
            SocketAddr::V6(v6) => {
                let has_user_default = self
                    .transports
                    .iter()
                    .any(|t| t.is_ipv6_default() && t.is_user_defined());
                if has_user_default {
                    bail!(InvalidSocketAddr::DuplicateDefaultAddr);
                }

                IpConfig::V6 {
                    ip_net: Ipv6Net::new(*v6.ip(), opts.prefix_len())?,
                    scope_id: v6.scope_id(),
                    port: v6.port(),
                    is_required: opts.is_required(),
                    is_default: opts.is_default_route(),
                }
            }
        };

        // Everything added through this method is recorded as user defined.
        self.transports.push(TransportConfig::Ip {
            config,
            is_user_defined: true,
        });
        Ok(self)
    }

    /// Removes all IP based transports.
    #[cfg(not(wasm_browser))]
    pub fn clear_ip_transports(mut self) -> Self {
        self.transports
            .retain(|t| !matches!(t, TransportConfig::Ip { .. }));
        self
    }

    /// Removes all relay based transports.
    pub fn clear_relay_transports(mut self) -> Self {
        self.transports
            .retain(|t| !matches!(t, TransportConfig::Relay { .. }));
        self
    }

    /// Sets a secret key to authenticate with other peers.
    ///
    /// This secret key's public key will be the [`PublicKey`] of this endpoint and thus
    /// also its [`EndpointId`].
    ///
    /// If not set, a new secret key will be generated.
    ///
    /// [`PublicKey`]: iroh_base::PublicKey
    pub fn secret_key(mut self, secret_key: SecretKey) -> Self {
        self.secret_key = Some(secret_key);
        self
    }

    /// Sets the [ALPN] protocols that this endpoint will accept on incoming connections.
    ///
    /// Not setting this will still allow creating connections, but to accept incoming
    /// connections at least one [ALPN] must be set.
    ///
    /// [ALPN]: https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation
    pub fn alpns(mut self, alpn_protocols: Vec<Vec<u8>>) -> Self {
        self.alpn_protocols = alpn_protocols;
        self
    }

    // # Methods for common customisation items.

    /// Sets the relay servers to assist in establishing connectivity.
    ///
    /// Relay servers are used to establish initial connection with another iroh endpoint.
    /// They also perform various functions related to hole punching, see the [crate docs]
    /// for more details.
    ///
    /// By default the [number 0] relay servers are used, see [`RelayMode::Default`].
    ///
    /// When using [RelayMode::Custom], the provided `relay_map` must contain at least one
    /// configured relay endpoint.  If an invalid RelayMap is provided [`bind`]
    /// will result in an error.
    ///
    /// [`bind`]: Builder::bind
    /// [crate docs]: crate
    /// [number 0]: https://n0.computer
    pub fn relay_mode(mut self, relay_mode: RelayMode) -> Self {
        let transport: Option<_> = relay_mode.into();
        match transport {
            Some(transport) => {
                if let Some(og) = self
                    .transports
                    .iter_mut()
                    .find(|t| matches!(t, TransportConfig::Relay { .. }))
                {
                    *og = transport;
                } else {
                    self.transports.push(transport);
                }
            }
            None => {
                self.transports
                    .retain(|t| !matches!(t, TransportConfig::Relay { .. }));
            }
        }
        self
    }

    /// Removes all Address Lookup services from the builder.
    ///
    /// If no Address Lookup is set, connecting to an endpoint without providing its
    /// direct addresses or relay URLs will fail.
    ///
    /// See the documentation of the [`crate::address_lookup::AddressLookup`] trait for details.
    pub fn clear_address_lookup(mut self) -> Self {
        self.address_lookup.clear();
        self
    }

    /// Adds an additional Address Lookup for this endpoint.
    ///
    /// Once the endpoint is created the provided [`IntoAddressLookup::into_address_lookup`] will be
    /// called. This allows Address Lookup's to finalize their configuration by e.g. using
    /// the secret key from the endpoint which can be needed to sign published information.
    ///
    /// This method can be called multiple times and all the Address Lookup's passed in
    /// will be combined using an internal instance of the
    /// [`crate::address_lookup::ConcurrentAddressLookup`]. To clear all Address Lookup's, use
    /// [`Self::clear_address_lookup`].
    ///
    /// If no Address Lookup is set, connecting to an endpoint without providing its
    /// direct addresses or relay URLs will fail.
    ///
    /// See the documentation of the [`crate::address_lookup::AddressLookup`] trait for details.
    pub fn address_lookup(mut self, address_lookup: impl IntoAddressLookup) -> Self {
        self.address_lookup.push(Box::new(address_lookup));
        self
    }

    /// Sets the initial user-defined data to be published in Address Lookup's for this node.
    ///
    /// When using Address Lookup's, this string of [`UserData`] will be published together
    /// with the endpoint's addresses and relay URL. When other endpoints discover this endpoint,
    /// they retrieve the [`UserData`] in addition to the addressing info.
    ///
    /// Iroh itself does not interpret the user-defined data in any way, it is purely left
    /// for applications to parse and use.
    pub fn user_data_for_address_lookup(mut self, user_data: UserData) -> Self {
        self.address_lookup_user_data = Some(user_data);
        self
    }

    // # Methods for more specialist customisation.

    /// Sets a custom [`QuicTransportConfig`] for this endpoint.
    ///
    /// The transport config contains parameters governing the QUIC state machine.
    ///
    /// If unset, the default config is used. Default values should be suitable for most
    /// internet applications. Applications protocols which forbid remotely-initiated
    /// streams should set `max_concurrent_bidi_streams` and `max_concurrent_uni_streams` to
    /// zero.
    ///
    /// Please be aware that changing some settings may have adverse effects on establishing
    /// and maintaining direct connections.
    pub fn transport_config(mut self, transport_config: QuicTransportConfig) -> Self {
        self.transport_config = transport_config;
        self
    }

    /// Optionally sets a custom DNS resolver to use for this endpoint.
    ///
    /// The DNS resolver is used to resolve relay hostnames, and endpoint addresses if
    /// [`crate::address_lookup::DnsAddressLookup`] is configured.
    ///
    /// By default, a new DNS resolver is created which is configured to use the
    /// host system's DNS configuration. You can pass a custom instance of [`DnsResolver`]
    /// here to use a differently configured DNS resolver for this endpoint, or to share
    /// a [`DnsResolver`] between multiple endpoints.
    #[cfg(not(wasm_browser))]
    pub fn dns_resolver(mut self, dns_resolver: DnsResolver) -> Self {
        self.dns_resolver = Some(dns_resolver);
        self
    }

    /// Sets an explicit proxy url to proxy all HTTP(S) traffic through.
    pub fn proxy_url(mut self, url: Url) -> Self {
        self.proxy_url.replace(url);
        self
    }

    /// Sets the proxy url from the environment, in this order:
    ///
    /// - `HTTP_PROXY`
    /// - `http_proxy`
    /// - `HTTPS_PROXY`
    /// - `https_proxy`
    pub fn proxy_from_env(mut self) -> Self {
        self.proxy_url = proxy_url_from_env();
        self
    }

    /// Enables saving the TLS pre-master key for connections.
    ///
    /// This key should normally remain secret but can be useful to debug networking issues
    /// by decrypting captured traffic.
    ///
    /// If *keylog* is `true` then setting the `SSLKEYLOGFILE` environment variable to a
    /// filename will result in this file being used to log the TLS pre-master keys.
    pub fn keylog(mut self, keylog: bool) -> Self {
        self.keylog = keylog;
        self
    }

    /// Skip verification of SSL certificates from relay servers.
    ///
    /// May only be used in tests.
    #[cfg(any(test, feature = "test-utils"))]
    pub fn insecure_skip_relay_cert_verify(self, skip_verify: bool) -> Self {
        Self {
            insecure_skip_relay_cert_verify: skip_verify,
            ..self
        }
    }

    /// Set the maximum number of TLS tickets to cache.
    ///
    /// Set this to a larger value if you want to do 0rtt connections to a large
    /// number of clients.
    ///
    /// The default is 256, taking about 150 KiB in memory.
    pub fn max_tls_tickets(mut self, n: usize) -> Self {
        self.max_tls_tickets = n;
        self
    }

    /// Install hooks onto the endpoint.
    ///
    /// Endpoint hooks intercept the connection establishment process of an [`Endpoint`].
    ///
    /// You can install multiple [`EndpointHooks`] by calling this function multiple times.
    /// Order matters: hooks are invoked in the order they were installed onto the endpoint
    /// builder. Once a hook returns reject, further processing
    /// is aborted and other hooks won't be invoked.
    ///
    /// See [`EndpointHooks`] for details on the possible interception points in the connection lifecycle.
    pub fn hooks(self, hooks: impl EndpointHooks + 'static) -> Self {
        // Appending preserves installation order.
        let mut builder = self;
        builder.hooks.push(hooks);
        builder
    }
}

/// Error returned by [`Endpoint`] operations that cannot be performed.
#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
#[non_exhaustive]
pub enum EndpointError {
    /// The endpoint has been closed.
    #[error("Endpoint is closed")]
    Closed,
}

/// Controls an iroh endpoint, establishing connections with other endpoints.
///
/// This is the main API interface to create connections to, and accept connections from
/// other iroh endpoints.  The connections are peer-to-peer and encrypted, a Relay server is
/// used to make the connections reliable.  See the [crate docs] for a more detailed
/// overview of iroh.
///
/// It is recommended to only create a single instance per application.  This ensures all
/// the connections made share the same peer-to-peer connections to other iroh endpoints,
/// while still remaining independent connections.  This will result in more optimal network
/// behaviour.
///
/// The endpoint is created using the [`Builder`], which can be created using
/// [`Endpoint::builder`].
///
/// Once an endpoint exists, new connections are typically created using the
/// [`Endpoint::connect`] and [`Endpoint::accept`] methods.  Once established, the
/// [`Connection`] gives access to most [QUIC] features.  Individual streams to send data to
/// the peer are created using the [`Connection::open_bi`], [`Connection::accept_bi`],
/// [`Connection::open_uni`] and [`Connection::accept_uni`] functions.
///
/// Note that due to the light-weight properties of streams a stream will only be accepted
/// once the initiating peer has sent some data on it.
///
/// [QUIC]: https://quicwg.org
#[derive(Clone, Debug)]
pub struct Endpoint {
    // Reference-counted handle to the endpoint internals; cloning an `Endpoint` only
    // clones this `Arc`, so all clones refer to the same `EndpointInner`.
    inner: Arc<EndpointInner>,
}

/// Errors that can occur while initiating an outgoing connection.
// NOTE(review): the variant descriptions below are derived from the error messages
// only; the code paths producing them are outside this view.
#[allow(missing_docs)]
#[stack_error(derive, add_meta, from_sources)]
#[non_exhaustive]
#[allow(private_interfaces)]
pub enum ConnectWithOptsError {
    /// The remote endpoint is this endpoint itself.
    #[error("Connecting to ourself is not supported")]
    SelfConnect,
    /// No addressing information for the remote was available; wraps the Address
    /// Lookup error.
    #[error("No addressing information available")]
    NoAddress { source: AddressLookupError },
    /// The QUIC layer failed to start the connection attempt.
    #[error("Unable to connect to remote")]
    Quinn {
        #[error(std_err)]
        source: QuicConnectError,
    },
    /// An internal actor was no longer running.
    #[error("Internal consistency error")]
    InternalConsistencyError {
        /// Private source type, cannot be created publicly.
        source: RemoteStateActorStoppedError,
    },
    /// The connection attempt was rejected on the local side.
    #[error("Connection was rejected locally")]
    LocallyRejected,
    /// The endpoint has been closed.
    #[error("Endpoint is closed")]
    EndpointClosed,
}

/// Umbrella error combining the failure sources of a connection attempt.
///
/// Every variant is transparent: it displays as its wrapped source error.
#[allow(missing_docs)]
#[stack_error(derive, add_meta, from_sources)]
#[non_exhaustive]
pub enum ConnectError {
    /// Wraps a [`ConnectWithOptsError`].
    #[error(transparent)]
    Connect { source: ConnectWithOptsError },
    /// Wraps a [`ConnectingError`].
    #[error(transparent)]
    Connecting { source: ConnectingError },
    /// Wraps a [`ConnectionError`].
    #[error(transparent)]
    Connection {
        #[error(std_err)]
        source: ConnectionError,
    },
}

impl Endpoint {
    // The ordering of public methods is reflected directly in the documentation.  This is
    // roughly ordered by what is most commonly needed by users, but grouped in similar
    // items.

    // # Methods relating to construction.

    /// Creates a [`Builder`] for an [`Endpoint`] that starts out with a production
    /// configuration.
    ///
    /// The configuration is taken from the [`presets::N0`] preset.
    pub fn builder() -> Builder {
        let preset = presets::N0;
        Builder::new(preset)
    }

    /// Returns the builder for an [`Endpoint`], with an empty configuration.
    ///
    /// Only the `relay_mode` is set; it is passed on unchanged to [`Builder::empty`].
    ///
    /// See [`Builder::empty`] for details.
    pub fn empty_builder(relay_mode: RelayMode) -> Builder {
        Builder::empty(relay_mode)
    }

    /// Creates an [`Endpoint`] with the default configuration and binds it right away.
    ///
    /// This is a shorthand for calling [`Endpoint::builder`] followed by
    /// [`Builder::bind`], so the [`presets::N0`] preset is used as configuration.
    ///
    /// # Errors
    ///
    /// Returns a [`BindError`] if binding the endpoint fails.
    pub async fn bind() -> Result<Self, BindError> {
        let builder = Self::builder();
        builder.bind().await
    }

    /// Replaces the list of ALPN protocols this endpoint accepts.
    ///
    /// The new list *overrides* the previously configured ALPNs and only applies to
    /// incoming connections that arrive after this call.
    ///
    /// Calling this on a closed endpoint has no effect other than logging a warning.
    pub fn set_alpns(&self, alpns: Vec<Vec<u8>>) {
        if !self.is_closed() {
            // Build a fresh server config for the new ALPNs and swap it into the
            // QUIC endpoint.
            let server_config = self.inner.static_config.create_server_config(alpns);
            let quinn_endpoint = self.inner.quinn_endpoint();
            quinn_endpoint.set_server_config(Some(server_config));
        } else {
            warn!("Attempting to set ALPNs for a closed endpoint. Ignoring.");
        }
    }

    /// Inserts a relay configuration into the [`RelayMap`].
    ///
    /// If a configuration was already registered for the given [`RelayUrl`], it is
    /// replaced and the previous configuration is returned.
    ///
    /// Returns `None` if there was no previous configuration, or if the endpoint is
    /// closed, in which case nothing is inserted.
    pub async fn insert_relay(
        &self,
        relay: RelayUrl,
        config: Arc<RelayConfig>,
    ) -> Option<Arc<RelayConfig>> {
        if self.is_closed() {
            None
        } else {
            self.inner.insert_relay(relay, config).await
        }
    }

    /// Removes the configuration registered for `relay` from the [`RelayMap`].
    ///
    /// Returns the removed configuration, if there was one.
    ///
    /// Returns `None` if no configuration existed for the [`RelayUrl`], or if the
    /// endpoint is closed, in which case nothing is removed.
    pub async fn remove_relay(&self, relay: &RelayUrl) -> Option<Arc<RelayConfig>> {
        if self.is_closed() {
            None
        } else {
            self.inner.remove_relay(relay).await
        }
    }

    // # Methods for establishing connectivity.

    /// Connects to a remote [`Endpoint`].
    ///
    /// A value that can be converted into an [`EndpointAddr`] is required. This can be either an
    /// [`EndpointAddr`] or an [`EndpointId`].
    ///
    /// The [`EndpointAddr`] must contain the [`EndpointId`] to dial and may also contain a [`RelayUrl`]
    /// and direct addresses. If direct addresses are provided, they will be used to try and
    /// establish a direct connection without involving a relay server.
    ///
    /// If neither a [`RelayUrl`] or direct addresses are configured in the [`EndpointAddr`] it
    /// may still be possible a connection can be established.  This depends on which, if any,
    /// [`crate::address_lookup::AddressLookup`]s were configured using [`Builder::address_lookup`].  The Address Lookup
    /// service will also be used if the remote endpoint is not reachable on the provided direct
    /// addresses and there is no [`RelayUrl`].
    ///
    /// If addresses or relay servers are neither provided nor can be discovered, the
    /// connection attempt will fail with an error.
    ///
    /// The `alpn`, or application-level protocol identifier, is also required. The remote
    /// endpoint must support this `alpn`, otherwise the connection attempt will fail with
    /// an error.
    ///
    /// [`RelayUrl`]: crate::RelayUrl
    pub async fn connect(
        &self,
        endpoint_addr: impl Into<EndpointAddr>,
        alpn: &[u8],
    ) -> Result<Connection, ConnectError> {
        let endpoint_addr = endpoint_addr.into();
        let remote = endpoint_addr.id;
        let connecting = self
            .connect_with_opts(endpoint_addr, alpn, Default::default())
            .await?;
        let conn = connecting.await?;

        debug!(
            me = %self.id().fmt_short(),
            remote = %remote.fmt_short(),
            alpn = %String::from_utf8_lossy(alpn),
            "Connection established."
        );
        Ok(conn)
    }

    /// Starts a connection attempt with a remote [`Endpoint`].
    ///
    /// Like [`Endpoint::connect`] (see also its docs for general details), but allows for a more
    /// advanced connection setup with more customization in two aspects:
    /// 1. The returned future resolves to a [`Connecting`], which can be further processed into
    ///    a [`Connection`] by awaiting, or alternatively allows connecting with 0-RTT via
    ///    [`Connecting::into_0rtt`].
    ///    **Note:** Please read the documentation for `into_0rtt` carefully to assess
    ///    security concerns.
    /// 2. The [`QuicTransportConfig`] for the connection can be modified via the provided
    ///    [`ConnectOptions`].
    ///    **Note:** Please be aware that changing transport config settings may have adverse effects on
    ///    establishing and maintaining direct connections.  Carefully test settings you use and
    ///    consider this currently as still rather experimental.
    ///
    /// # Errors
    ///
    /// Returns a [`ConnectWithOptsError`] if the endpoint is closed, if a `before_connect`
    /// hook rejects the attempt, if the remote is the local endpoint itself, if the remote
    /// cannot be resolved, or if the QUIC endpoint fails to start the connection.
    #[instrument(name = "connect", skip_all, fields(
        me = %self.id().fmt_short(),
        remote = tracing::field::Empty,
        alpn = String::from_utf8_lossy(alpn).to_string(),
    ))]
    pub async fn connect_with_opts(
        &self,
        endpoint_addr: impl Into<EndpointAddr>,
        alpn: &[u8],
        options: ConnectOptions,
    ) -> Result<Connecting, ConnectWithOptsError> {
        if self.is_closed() {
            return Err(e!(ConnectWithOptsError::EndpointClosed));
        }
        let endpoint_addr: EndpointAddr = endpoint_addr.into();
        // Give the registered hooks a chance to veto the attempt before any work is done.
        if let BeforeConnectOutcome::Reject =
            self.inner.hooks.before_connect(&endpoint_addr, alpn).await
        {
            return Err(e!(ConnectWithOptsError::LocallyRejected));
        }
        let endpoint_id = endpoint_addr.id;

        // The `remote` span field is declared empty above, fill it in now that it is known.
        tracing::Span::current().record("remote", tracing::field::display(endpoint_id.fmt_short()));

        // Connecting to ourselves is not supported.
        ensure!(endpoint_id != self.id(), ConnectWithOptsError::SelfConnect);

        trace!(
            dst_endpoint_id = %endpoint_id.fmt_short(),
            relay_url = ?endpoint_addr.relay_urls().next().cloned(),
            ip_addresses = ?endpoint_addr.ip_addrs().cloned().collect::<Vec<_>>(),
            "connecting",
        );

        let mapped_addr = self.inner.resolve_remote(endpoint_addr).await??;

        // Use the per-connection transport config if one was supplied, otherwise fall back
        // to the endpoint-wide one.  The fallback is only built when it is actually needed.
        let transport_config = options
            .transport_config
            .map(|cfg| cfg.to_inner_arc())
            .unwrap_or_else(|| self.inner.static_config.transport_config.to_inner_arc());

        // Start connecting via quinn. This will time out after 10 seconds if no reachable
        // address is available.

        let client_config = {
            // The main ALPN comes first, followed by any additional ones from the options.
            let mut alpn_protocols = vec![alpn.to_vec()];
            alpn_protocols.extend(options.additional_alpns);
            let quic_client_config = self
                .inner
                .static_config
                .tls_config
                .make_client_config(alpn_protocols, self.inner.static_config.keylog);
            let mut client_config = quinn::ClientConfig::new(Arc::new(quic_client_config));
            client_config.transport_config(transport_config);
            client_config
        };

        let dest_addr = mapped_addr.private_socket_addr();
        let server_name = &tls::name::encode(endpoint_id);
        let connect =
            self.inner
                .quinn_endpoint()
                .connect_with(client_config, dest_addr, server_name)?;

        Ok(Connecting::new(connect, self.clone(), endpoint_id))
    }

    /// Waits for the next incoming connection on this endpoint.
    ///
    /// Only connections that use one of the ALPNs configured in [`Builder::alpns`] are
    /// accepted.  When several ALPNs are configured, [`Connecting::alpn`] can be used to
    /// inspect the ALPN before the connection is accepted.
    ///
    /// Once the endpoint has been closed by calling [`Endpoint::close`], the returned
    /// future yields `None`.
    pub fn accept(&self) -> Accept<'_> {
        let inner = self.inner.quinn_endpoint().accept();
        let ep = self.clone();
        Accept { inner, ep }
    }

    // # Getter methods for properties of this Endpoint itself.

    /// Returns a reference to the [`SecretKey`] of this endpoint.
    pub fn secret_key(&self) -> &SecretKey {
        let tls_config = &self.inner.static_config.tls_config;
        &tls_config.secret_key
    }

    /// Returns the [`EndpointId`] of this endpoint.
    ///
    /// The id is the public part of the endpoint's secret key.  It is the unique
    /// addressing information of this endpoint: other peers need to know it in order to
    /// connect to this endpoint.
    pub fn id(&self) -> EndpointId {
        self.secret_key().public()
    }

    /// Returns the current [`EndpointAddr`].
    ///
    /// This is a snapshot: it reads the current value of the watcher returned by
    /// [`Endpoint::watch_addr`].
    ///
    /// As long as the endpoint was able to bind to a network interface, some
    /// local addresses will be available.
    ///
    /// The state of other fields depends on the state of networking and connectivity.
    /// Use the [`Endpoint::online`] method to ensure that the endpoint is considered
    /// "online" (has contacted a relay server) before calling this method, if you want
    /// to ensure that the `EndpointAddr` will contain enough information to allow this endpoint
    /// to be dialable by a remote endpoint over the internet.
    ///
    /// You can use the [`Endpoint::watch_addr`] method to get updates when the `EndpointAddr`
    /// changes.
    pub fn addr(&self) -> EndpointAddr {
        self.watch_addr().get()
    }

    /// Returns a [`Watcher`] that tracks the current [`EndpointAddr`] of this endpoint.
    ///
    /// The observed [`EndpointAddr`] is assembled from the current [`RelayUrl`] and the
    /// current direct addresses.
    ///
    /// ```no_run
    /// # async fn wrapper() -> n0_error::Result<()> {
    /// use iroh::{Endpoint, Watcher};
    ///
    /// let endpoint = Endpoint::builder()
    ///     .alpns(vec![b"my-alpn".to_vec()])
    ///     .bind()
    ///     .await?;
    /// let endpoint_addr = endpoint.watch_addr().get();
    /// # let _ = endpoint_addr;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// [`Endpoint::online`] is a convenience method to find out whether the endpoint has
    /// ever been considered "online". After that initial call to [`Endpoint::online`], the
    /// way to find out whether your endpoint can no longer be reached by endpoints outside
    /// of the private or local network is to watch for changes in its [`EndpointAddr`].
    /// If there are no `addrs` in the [`EndpointAddr`], you may not be dialable by other
    /// endpoints on the internet.
    ///
    /// The `EndpointAddr` will change as:
    /// - network conditions change
    /// - the endpoint connects to a relay server
    /// - the endpoint changes its preferred relay server
    /// - more addresses are discovered for this endpoint
    ///
    /// ## Closing behavior
    ///
    /// The returned watcher only becomes disconnected once the last clone of the [`Endpoint`]
    /// is dropped. Closing the endpoint does not disconnect the watcher. Thus, a stream created
    /// via [`Watcher::stream`] only terminates once the endpoint is fully dropped. To stop a task
    /// that loops over a watcher stream once the endpoint stops, combine with [`Self::closed`]:
    ///
    /// ```
    /// # use iroh::{Watcher, Endpoint};
    /// # use n0_future::StreamExt;
    /// # use tracing::info;
    /// # async fn wrapper() -> n0_error::Result<()> {
    /// let endpoint = Endpoint::bind().await?;
    /// // We want to watch address changes in a different task, and stop our task
    /// // once the endpoint stops.
    /// let mut addr_stream = endpoint.watch_addr().stream();
    /// let endpoint_closed = endpoint.closed();
    /// tokio::spawn(endpoint_closed.run_until(async move {
    ///     while let Some(addr) = addr_stream.next().await {
    ///         info!("our address changed: {addr:?}");
    ///     }
    ///     info!("endpoint closed");
    /// }));
    /// // Do fancy things, then close the endpoint.
    /// // Our task above will stop even if there are still clones of `Endpoint` alive somewhere.
    /// endpoint.close().await;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// [`RelayUrl`]: crate::RelayUrl
    #[cfg(not(wasm_browser))]
    pub fn watch_addr(&self) -> impl n0_watcher::Watcher<Value = EndpointAddr> + use<> {
        let endpoint_id = self.id();
        let ip_watcher = self.inner.ip_addrs();
        let relay_watcher = self.inner.home_relay();

        // Combine both watchers; relay addresses are listed before the IP addresses.
        ip_watcher
            .or(relay_watcher)
            .map(move |(addrs, relays)| {
                let relay_addrs = relays.into_iter().map(TransportAddr::Relay);
                let ip_addrs = addrs
                    .into_iter()
                    .map(|direct_addr| TransportAddr::Ip(direct_addr.addr));
                EndpointAddr::from_parts(endpoint_id, relay_addrs.chain(ip_addrs))
            })
    }

    /// Returns a [`Watcher`] for the current [`EndpointAddr`] for this endpoint.
    ///
    /// When compiled to Wasm, this function returns a watcher that initializes
    /// with an [`EndpointAddr`] that only contains a relay URL, but no direct addresses,
    /// as there are no APIs for directly using sockets in browsers.
    ///
    /// The returned watcher only becomes disconnected once the last clone of the [`Endpoint`]
    /// is dropped. Closing the endpoint does not disconnect the watcher. Thus, a stream created
    /// via [`Watcher::stream`] only terminates once the endpoint is fully dropped. If you want
    /// to stop a task once the endpoint closes, combine with [`Self::closed`].
    #[cfg(wasm_browser)]
    pub fn watch_addr(&self) -> impl n0_watcher::Watcher<Value = EndpointAddr> + use<> {
        // In browsers, there will never be any direct addresses, so we wait
        // for the home relay instead. This makes the `EndpointAddr` have *some* way
        // of connecting to us.
        let watch_relay = self.inner.home_relay();
        let endpoint_id = self.id();
        // `relays` is consumed by `into_iter`, so the binding does not need to be mutable.
        watch_relay.map(move |relays| {
            EndpointAddr::from_parts(endpoint_id, relays.into_iter().map(TransportAddr::Relay))
        })
    }

    /// A convenience method that waits for the endpoint to be considered "online".
    ///
    /// This currently means at least one relay server was connected,
    /// and at least one local IP address is available.
    /// Even if no relays are configured, this will still wait for a relay connection.
    ///
    /// Once this has been resolved the first time, this will always immediately resolve.
    ///
    /// This has no timeout, so if that is needed, you need to wrap it in a
    /// timeout. We recommend using a timeout close to
    /// [`crate::NET_REPORT_TIMEOUT`]s, so you can be sure that at least one
    /// [`crate::NetReport`] has been attempted.
    ///
    /// To understand if the endpoint has gone back "offline",
    /// you must use the [`Endpoint::watch_addr`] method, to
    /// get information on the current relay and direct address information.
    ///
    /// In the common case where the endpoint's configured relay servers are
    /// only accessible via a wide area network (WAN) connection, this method
    /// will await indefinitely when the endpoint has no WAN connection. If you're
    /// writing an app that's designed to work without a WAN connection, defer
    /// any calls to `online` as long as possible, or avoid calling `online`
    /// entirely.
    ///
    /// The online method does not interact with [`crate::address_lookup::AddressLookup`]
    /// services, which means that any Address Lookup that relies on a WAN
    /// connection is independent of the endpoint's online status.
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use iroh::Endpoint;
    ///
    /// #[tokio::main]
    /// async fn main() {
    /// // After this await returns, the endpoint is bound to a local socket.
    /// // It can be dialed, but almost certainly hasn't finished picking a
    /// // relay.
    /// let endpoint = Endpoint::bind().await.unwrap();
    ///
    /// // After this await returns we have a connection to at least one relay
    /// // and holepunching should work as expected.
    /// endpoint.online().await;
    /// }
    /// ```
    pub async fn online(&self) {
        // Resolves once the home relay watcher holds an initialized value.
        self.inner.home_relay().initialized().await;
    }

    /// Returns a [`Watcher`] for any net-reports run from this [`Endpoint`].
    ///
    /// A `net-report` checks the network conditions of the [`Endpoint`], such as
    /// whether it is connected to the internet via Ipv4 and/or Ipv6, its NAT
    /// status, its latency to the relay servers, and its public addresses.
    ///
    /// The [`Endpoint`] continuously runs `net-reports` to monitor if network
    /// conditions have changed. This [`Watcher`] will return the latest result
    /// of the `net-report`.
    ///
    /// When issuing the first call to this method the first report might
    /// still be underway, in this case the [`Watcher`] might not be initialized
    /// with [`Some`] value yet.  Once the net-report has been successfully
    /// run, the [`Watcher`] will always return [`Some`] report immediately, which
    /// is the most recently run `net-report`.
    ///
    /// The returned watcher only becomes disconnected once the last clone of the [`Endpoint`]
    /// is dropped. Closing the endpoint does not disconnect the watcher. Thus, a stream created
    /// via [`Watcher::stream`] only terminates once the endpoint is fully dropped. If you want
    /// to stop a task once the endpoint closes, combine with [`Self::closed`].
    ///
    /// # Examples
    ///
    /// To get the first report use [`Watcher::initialized`]:
    /// ```no_run
    /// use iroh::{Endpoint, Watcher as _};
    ///
    /// # let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
    /// # rt.block_on(async move {
    /// let ep = Endpoint::bind().await.unwrap();
    /// let _report = ep.net_report().initialized().await;
    /// # });
    /// ```
    #[doc(hidden)]
    pub fn net_report(&self) -> impl Watcher<Value = Option<NetReport>> + use<> {
        self.inner.net_report()
    }

    /// Returns the most recent [`NetReport`] produced by this endpoint.
    ///
    /// This reads the current value of the watcher returned by [`Endpoint::net_report`].
    /// The result is `None` if no net report has been generated so far.
    ///
    /// This method is hidden in the docs because it is not part of the public API.
    #[doc(hidden)]
    pub fn last_net_report(&self) -> Option<NetReport> {
        self.net_report().get()
    }

    /// Returns the local socket addresses the underlying sockets are bound to.
    ///
    /// The [`Endpoint`] always binds on an IPv4 address and also tries to bind on an IPv6
    /// address if available.
    ///
    /// Local addresses that cannot be expressed as a [`SocketAddr`] are left out.
    #[cfg(not(wasm_browser))]
    pub fn bound_sockets(&self) -> Vec<SocketAddr> {
        let mut sockets = Vec::new();
        for local_addr in self.inner.local_addr() {
            if let Some(socket_addr) = local_addr.into_socket_addr() {
                sockets.push(socket_addr);
            }
        }
        sockets
    }

    // # Methods for less common getters.
    //
    // Partially they return things passed into the builder.

    /// Returns a reference to the DNS resolver this [`Endpoint`] uses.
    ///
    /// See [`Builder::dns_resolver`].
    ///
    /// # Errors
    ///
    /// Returns an `EndpointError::Closed` error if the endpoint is closed.
    #[cfg(not(wasm_browser))]
    pub fn dns_resolver(&self) -> Result<&DnsResolver, EndpointError> {
        if self.is_closed() {
            Err(e!(EndpointError::Closed))
        } else {
            Ok(self.inner.dns_resolver())
        }
    }

    /// Returns a reference to the Address Lookup service of this [`Endpoint`].
    ///
    /// See [`Builder::address_lookup`].
    ///
    /// # Errors
    ///
    /// Returns an `EndpointError::Closed` error if the endpoint is closed.
    pub fn address_lookup(&self) -> Result<&ConcurrentAddressLookup, EndpointError> {
        if self.is_closed() {
            Err(e!(EndpointError::Closed))
        } else {
            Ok(self.inner.address_lookup())
        }
    }

    /// Returns metrics collected for this endpoint.
    ///
    /// The endpoint internally collects various metrics about its operation.
    /// The returned [`EndpointMetrics`] struct contains all of these metrics.
    ///
    /// You can access individual metrics directly by using the public fields:
    /// ```rust
    /// # use std::collections::BTreeMap;
    /// # use iroh::endpoint::Endpoint;
    /// # async fn wrapper() -> n0_error::Result<()> {
    /// let endpoint = Endpoint::bind().await?;
    /// assert_eq!(endpoint.metrics().socket.recv_datagrams.get(), 0);
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// [`EndpointMetrics`] implements [`MetricsGroupSet`], and each field
    /// implements [`MetricsGroup`]. These traits provide methods to iterate over
    /// the groups in the set, and over the individual metrics in each group, without having
    /// to access each field manually. With these methods, it is straightforward to collect
    /// all metrics into a map or push their values to a metrics collector.
    ///
    /// For example, the following snippet collects all metrics into a map:
    /// ```rust
    /// # use std::collections::BTreeMap;
    /// # use iroh_metrics::{Metric, MetricsGroup, MetricValue, MetricsGroupSet};
    /// # use iroh::endpoint::Endpoint;
    /// # async fn wrapper() -> n0_error::Result<()> {
    /// let endpoint = Endpoint::bind().await?;
    /// let metrics: BTreeMap<String, MetricValue> = endpoint
    ///     .metrics()
    ///     .iter()
    ///     .map(|(group, metric)| {
    ///         let name = [group, metric.name()].join(":");
    ///         (name, metric.value())
    ///     })
    ///     .collect();
    ///
    /// assert_eq!(metrics["socket:recv_datagrams"], MetricValue::Counter(0));
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// The metrics can also be encoded into the OpenMetrics text format, as used by Prometheus.
    /// To do so, use the [`iroh_metrics::Registry`], add the endpoint metrics to the
    /// registry with [`Registry::register_all`], and encode the metrics to a string with
    /// [`encode_openmetrics_to_string`]:
    /// ```rust
    /// # use iroh_metrics::{Registry, MetricsSource};
    /// # use iroh::endpoint::Endpoint;
    /// # async fn wrapper() -> n0_error::Result<()> {
    /// let endpoint = Endpoint::bind().await?;
    /// let mut registry = Registry::default();
    /// registry.register_all(endpoint.metrics());
    /// let s = registry.encode_openmetrics_to_string()?;
    /// assert!(s.contains(r#"TYPE socket_recv_datagrams counter"#));
    /// assert!(s.contains(r#"socket_recv_datagrams_total 0"#));
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// Through a registry, you can also add labels or prefixes to metrics with
    /// [`Registry::sub_registry_with_label`] or [`Registry::sub_registry_with_prefix`].
    /// Furthermore, [`iroh_metrics::service`] provides functions to easily start services
    /// to serve the metrics with an HTTP server, dump them to a file, or push them
    /// to a Prometheus gateway.
    ///
    /// For example, the following snippet launches an HTTP server that serves the metrics in the
    /// OpenMetrics text format:
    /// ```no_run
    /// # use std::{sync::{Arc, RwLock}, time::Duration};
    /// # use iroh_metrics::{Registry, MetricsSource};
    /// # use iroh::endpoint::Endpoint;
    /// # use n0_error::{StackResultExt, StdResultExt};
    /// # async fn wrapper() -> n0_error::Result<()> {
    /// // Create a registry, wrapped in a read-write lock so that we can register and serve
    /// // the metrics independently.
    /// let registry = Arc::new(RwLock::new(Registry::default()));
    /// // Spawn a task to serve the metrics on an OpenMetrics HTTP endpoint.
    /// let metrics_task = tokio::task::spawn({
    ///     let registry = registry.clone();
    ///     async move {
    ///         let addr = "0.0.0.0:9100".parse().unwrap();
    ///         iroh_metrics::service::start_metrics_server(addr, registry).await
    ///     }
    /// });
    ///
    /// // Spawn an endpoint and add the metrics to the registry.
    /// let endpoint = Endpoint::bind().await?;
    /// registry.write().unwrap().register_all(endpoint.metrics());
    ///
    /// // Wait for the metrics server to bind, then fetch the metrics via HTTP.
    /// tokio::time::sleep(Duration::from_millis(500)).await;
    /// let res = reqwest::get("http://localhost:9100/metrics")
    ///     .await
    ///     .std_context("get")?
    ///     .text()
    ///     .await
    ///     .std_context("text")?;
    ///
    /// assert!(res.contains(r#"TYPE socket_recv_datagrams counter"#));
    /// assert!(res.contains(r#"socket_recv_datagrams_total 0"#));
    /// # metrics_task.abort();
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// [`Registry`]: iroh_metrics::Registry
    /// [`Registry::register_all`]: iroh_metrics::Registry::register_all
    /// [`Registry::sub_registry_with_label`]: iroh_metrics::Registry::sub_registry_with_label
    /// [`Registry::sub_registry_with_prefix`]: iroh_metrics::Registry::sub_registry_with_prefix
    /// [`encode_openmetrics_to_string`]: iroh_metrics::MetricsSource::encode_openmetrics_to_string
    /// [`MetricsGroup`]: iroh_metrics::MetricsGroup
    /// [`MetricsGroupSet`]: iroh_metrics::MetricsGroupSet
    #[cfg(feature = "metrics")]
    pub fn metrics(&self) -> &EndpointMetrics {
        &self.inner.metrics
    }

    /// Looks up the addressing information known for a recently used remote endpoint.
    ///
    /// The returned [`RemoteInfo`] lists all transport addresses we know about for the
    /// remote.  It is a snapshot taken at the time of the call, not a watcher.
    ///
    /// Returns `None` if the endpoint has no information about the remote, or if the
    /// endpoint is closed.  Information about remote endpoints that are no longer used is
    /// kept around for a little while and then dropped; after that this returns `None`.
    pub async fn remote_info(&self, endpoint_id: EndpointId) -> Option<RemoteInfo> {
        if self.is_closed() {
            None
        } else {
            self.inner.remote_info(endpoint_id).await
        }
    }

    // # Methods for less common state updates.

    /// Notifies the system of potential network changes.
    ///
    /// On many systems iroh is able to detect network changes by itself, however
    /// some systems like android do not expose this functionality to native code.
    /// Android does however provide this functionality to Java code.  This
    /// function allows for notifying iroh of any potential network changes like
    /// this.
    ///
    /// Even when the network did not change, or iroh was already able to detect
    /// the network change itself, there is no harm in calling this function.
    ///
    /// If the endpoint is closed, this method will log a warning and ignore the request.
    pub async fn network_change(&self) {
        if self.is_closed() {
            debug!("Attempting to notify a closed endpoint about a network change. Ignoring.");
            return;
        }
        self.inner.network_change().await;
    }

    // # Methods to update internal state.

    /// Updates the user-defined data that is published in Address Lookups for this
    /// endpoint.
    ///
    /// When the passed user-defined data differs from the previous one, the endpoint
    /// republishes its endpoint info to the configured Address Lookups.
    ///
    /// To set an initial value while building the endpoint, see
    /// [`Builder::user_data_for_address_lookup`].
    ///
    /// Calling this on a closed endpoint has no effect other than logging a warning.
    pub fn set_user_data_for_address_lookup(&self, user_data: Option<UserData>) {
        if !self.is_closed() {
            self.inner.set_user_data_for_address_lookup(user_data);
            return;
        }
        warn!("Attempting to set user data for a closed endpoint. Ignoring.");
    }

    // # Methods for terminating the endpoint.

    /// Closes the QUIC endpoint and the socket.
    ///
    /// This will close any remaining open [`Connection`]s with an error code
    /// of `0` and an empty reason.  Though it is best practice to close those
    /// explicitly before with a custom error code and reason.
    ///
    /// It will then make a best effort to wait for all close notifications to be
    /// acknowledged by the peers, re-transmitting them if needed. This ensures the
    /// peers are aware of the closed connections instead of having to wait for a timeout
    /// on the connection. Once all connections are closed or timed out, the future
    /// finishes.
    ///
    /// The maximum time-out that this future will wait for depends on QUIC transport
    /// configurations of non-drained connections at the time of calling, and their current
    /// estimates of round trip time. With default parameters and a conservative estimate
    /// of round trip time, this call's future should take 3 seconds to resolve in cases of
    /// bad connectivity or failed connections. In the usual case, this call's future should
    /// return much more quickly.
    ///
    /// It is highly recommended you *do* wait for this close call to finish, if possible.
    /// Not doing so will make connections that were still open while closing the endpoint
    /// time out on the remote end. Thus remote ends will assume connections to have failed
    /// even if all application data was transmitted successfully.
    ///
    /// Note: Someone used to closing TCP sockets might wonder why it is necessary to wait
    /// for timeouts when closing QUIC endpoints, while they don't have to do this for TCP
    /// sockets. This is due to QUIC and its acknowledgments being implemented in user-land,
    /// while TCP sockets usually get closed and drained by the operating system in the
    /// kernel during the "Time-Wait" period of the TCP socket.
    ///
    /// Be aware however that the underlying UDP sockets are only closed once all clones of
    /// the respective [`Endpoint`] are dropped.
    pub async fn close(&self) {
        self.inner.close().await;
    }

    /// Checks whether this endpoint is still alive, or already closed.
    ///
    /// Returns `true` if the endpoint is closed.
    pub fn is_closed(&self) -> bool {
        self.inner.is_closed()
    }

    /// Returns a future that completes as soon as the endpoint closes.
    ///
    /// The returned [`EndpointClosed`] holds neither a clone of nor a reference to the
    /// [`Endpoint`], so keeping it alive does not prevent the endpoint from being dropped.
    ///
    /// Use [`EndpointClosed::run_until`] to run a task that is stopped once the endpoint
    /// closes:
    /// ```
    /// # use iroh::endpoint::Endpoint;
    /// # async fn wrapper() -> n0_error::Result<()> {
    /// let endpoint = Endpoint::bind().await?;
    /// tokio::spawn(endpoint.closed().run_until(async move {
    ///     // the future will be aborted once the endpoint closes.
    /// }));
    /// # Ok(())
    /// # }
    /// ```
    pub fn closed(&self) -> EndpointClosed {
        let inner = self.inner.closed();
        EndpointClosed { inner }
    }

    /// Creates a [`ServerConfigBuilder`] for this endpoint that is set up with the given
    /// `alpns`.
    ///
    /// The [`ServerConfigBuilder`] allows customizing the [`ServerConfig`] connection
    /// configuration for a connection that is accepted using the
    /// [`Incoming::accept_with`] method.
    pub fn create_server_config_builder(&self, alpns: Vec<Vec<u8>>) -> ServerConfigBuilder {
        let static_config = &self.inner.static_config;
        let server_config = static_config.create_server_config(alpns);
        // The builder starts out with a copy of the endpoint-wide transport config.
        let transport_config = static_config.transport_config.clone();
        ServerConfigBuilder::new(server_config, transport_config)
    }

    // # Remaining private methods

    /// Returns a new handle to the shared [`EndpointInner`], for use in tests.
    ///
    /// Fails with `EndpointError::Closed` if the endpoint is closed.
    #[cfg(test)]
    pub(crate) fn inner(&self) -> Result<Arc<EndpointInner>, EndpointError> {
        if self.is_closed() {
            Err(e!(EndpointError::Closed))
        } else {
            Ok(self.inner.clone())
        }
    }
}

/// Options for the [`Endpoint::connect_with_opts`] function.
#[derive(Default, Debug, Clone)]
pub struct ConnectOptions {
    /// QUIC transport config override; `None` unless set via
    /// [`ConnectOptions::with_transport_config`].
    transport_config: Option<QuicTransportConfig>,
    /// ALPN identifiers set via [`ConnectOptions::with_additional_alpns`]; empty by default.
    additional_alpns: Vec<Vec<u8>>,
}

impl ConnectOptions {
    /// Creates connection options with everything left at its default.
    ///
    /// The defaults are the same options that [`Endpoint::connect`] uses,
    /// e.g. a default [`QuicTransportConfig`].
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the QUIC transport config options used for this connection.
    pub fn with_transport_config(self, transport_config: QuicTransportConfig) -> Self {
        Self {
            transport_config: Some(transport_config),
            ..self
        }
    }

    /// Sets the [ALPN] identifiers that are signaled as supported when connecting, *in
    /// addition* to the main [ALPN] identifier passed to [`Endpoint::connect_with_opts`].
    ///
    /// This is useful for connecting to servers that may only support older versions of
    /// your protocol: pass the older [ALPN] identifiers to this function.
    ///
    /// The [ALPN] identifier that was finally negotiated is available from
    /// [`Connection::alpn`] once the connection is established, or slightly earlier in
    /// the handshake from [`Connecting::alpn`].
    /// It may be any of the [ALPN] identifiers in this list or the main [ALPN] used in
    /// [`Endpoint::connect_with_opts`].
    ///
    /// The order of the [ALPN] identifiers on the connect side doesn't matter, since it's
    /// the accept side that determines the protocol.
    ///
    /// To set the supported [ALPN] identifiers on the accept side, see the endpoint
    /// builder's [`Builder::alpns`] function.
    ///
    /// [ALPN]: https://en.wikipedia.org/wiki/Application-Layer_Protocol_Negotiation
    pub fn with_additional_alpns(self, alpns: Vec<Vec<u8>>) -> Self {
        Self {
            additional_alpns: alpns,
            ..self
        }
    }
}

/// Future returned from [`Endpoint::closed`].
///
/// Its `Debug` output is the fixed string `EndpointClosed`.
#[derive(derive_more::Debug)]
#[pin_project]
#[debug("EndpointClosed")]
pub struct EndpointClosed {
    /// Owned cancellation future this wrapper delegates to; pinned structurally.
    #[pin]
    inner: WaitForCancellationFutureOwned,
}

impl Future for EndpointClosed {
    type Output = ();

    /// Polls the wrapped cancellation future and forwards its result.
    fn poll(
        self: Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        self.project().inner.poll(cx)
    }
}

impl EndpointClosed {
    /// Drives `fut` until it finishes or the [`Endpoint`] is closed.
    ///
    /// Yields `Some` with the output of `fut` when it completes before the endpoint
    /// closes, and `None` otherwise.
    pub async fn run_until<F: Future>(self, fut: F) -> Option<F::Output> {
        let completed = async { Some(fut.await) };
        let closed = async {
            self.await;
            None
        };
        n0_future::future::or(completed, closed).await
    }
}

/// Reads a proxy url from the environment, trying these variables in order:
///
/// - `HTTP_PROXY`
/// - `http_proxy`
/// - `HTTPS_PROXY`
/// - `https_proxy`
///
/// A variable that is unset or does not parse as a [`Url`] is skipped. `HTTP_PROXY` is
/// additionally skipped (with a warning) when running in a CGI context.
fn proxy_url_from_env() -> Option<Url> {
    // An unset or unparsable variable counts as absent.
    let lookup = |name: &str| std::env::var(name).ok().and_then(|raw| raw.parse::<Url>().ok());

    if let Some(url) = lookup("HTTP_PROXY") {
        if !is_cgi() {
            return Some(url);
        }
        warn!("HTTP_PROXY environment variable ignored in CGI");
    }

    // The remaining variables need no CGI check; the first usable one wins.
    ["http_proxy", "HTTPS_PROXY", "https_proxy"]
        .iter()
        .copied()
        .find_map(lookup)
}

/// Configuration of the relay servers for an [`Endpoint`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RelayMode {
    /// Disable relay servers completely.
    /// This means that neither listening nor dialing relays will be available.
    Disabled,
    /// Use the default relay map, with production relay servers from n0.
    ///
    /// See [`crate::defaults::prod`] for the servers used.
    Default,
    /// Use the staging relay servers from n0.
    ///
    /// See [`crate::defaults::staging`] for the servers used.
    Staging,
    /// Use a custom relay map.
    Custom(RelayMap),
}

impl RelayMode {
    /// Builds the [`RelayMap`] that corresponds to this mode.
    pub fn relay_map(&self) -> RelayMap {
        match self {
            Self::Disabled => RelayMap::empty(),
            Self::Default => crate::defaults::prod::default_relay_map(),
            Self::Staging => crate::defaults::staging::default_relay_map(),
            Self::Custom(custom) => custom.clone(),
        }
    }

    /// Creates a [`RelayMode::Custom`] from a list of [`RelayUrl`]s.
    ///
    /// # Example
    ///
    /// ```
    /// # fn main() -> n0_error::Result<()> {
    /// # use iroh::RelayMode;
    /// RelayMode::custom([
    ///     "https://use1-1.relay.n0.iroh-canary.iroh.link.".parse()?,
    ///     "https://euw-1.relay.n0.iroh-canary.iroh.link.".parse()?,
    /// ]);
    /// # Ok(()) }
    /// ```
    pub fn custom(map: impl IntoIterator<Item = RelayUrl>) -> Self {
        Self::Custom(RelayMap::from_iter(map))
    }
}

/// Environment variable to force the use of staging relays.
///
/// Any non-empty value enables it, see [`force_staging_infra`].
pub const ENV_FORCE_STAGING_RELAYS: &str = "IROH_FORCE_STAGING_RELAYS";

/// Returns `true` if the use of staging relays is forced.
///
/// That is the case when [`ENV_FORCE_STAGING_RELAYS`] is set to a non-empty value that can
/// be read as a string.
pub fn force_staging_infra() -> bool {
    std::env::var(ENV_FORCE_STAGING_RELAYS).is_ok_and(|value| !value.is_empty())
}

/// Returns the default relay mode.
///
/// This is `RelayMode::Staging` if the `IROH_FORCE_STAGING_RELAYS` environment variable is
/// non empty, and `RelayMode::Default` otherwise.
pub fn default_relay_mode() -> RelayMode {
    // Use staging in testing
    if force_staging_infra() {
        RelayMode::Staging
    } else {
        RelayMode::Default
    }
}

/// Reports whether we are being executed in a CGI context.
///
/// The check is whether the `REQUEST_METHOD` environment variable is set.
///
/// In a CGI context a malicious client can send the `Proxy:` header, and it will
/// end up in the `HTTP_PROXY` env var. So we don't use it :)
fn is_cgi() -> bool {
    let request_method = std::env::var_os("REQUEST_METHOD");
    request_method.is_some()
}

#[cfg(test)]
mod tests {
    use std::{
        collections::BTreeMap,
        io,
        net::{IpAddr, Ipv4Addr, Ipv6Addr},
        str::FromStr,
        sync::Arc,
        time::{Duration, Instant},
    };

    use iroh_base::{EndpointAddr, EndpointId, RelayUrl, SecretKey, TransportAddr};
    use iroh_relay::endpoint_info::UserData;
    use n0_error::{AnyError as Error, Result, StdResultExt};
    use n0_future::{BufferedStreamExt, StreamExt, future::now_or_never, stream, time};
    use n0_tracing_test::traced_test;
    use n0_watcher::Watcher;
    use quinn::PathStats;
    use rand::SeedableRng;
    use rand_chacha::ChaCha8Rng;
    use tokio::sync::oneshot;
    use tracing::{Instrument, debug_span, error_span, info, info_span, instrument};

    use super::Endpoint;
    use crate::{
        RelayMap, RelayMode,
        address_lookup::memory::MemoryLookup,
        endpoint::{
            ApplicationClose, BindError, BindOpts, ConnectError, ConnectOptions,
            ConnectWithOptsError, Connection, ConnectionError, PathWatcher,
        },
        protocol::{AcceptError, ProtocolHandler, Router},
        test_utils::{QlogFileGroup, run_relay_server, run_relay_server_with},
    };

    /// ALPN identifier shared by the endpoints set up in these tests.
    const TEST_ALPN: &[u8] = b"n0/iroh/test";

    /// Connecting an endpoint to its own address must fail with a descriptive error.
    #[tokio::test]
    #[traced_test]
    async fn test_connect_self() -> Result {
        // Propagate bind failures with `?`, like the other tests in this module.
        let ep = Endpoint::empty_builder(RelayMode::Disabled)
            .alpns(vec![TEST_ALPN.to_vec()])
            .bind()
            .await?;
        let my_addr = ep.addr();
        let Err(err) = ep.connect(my_addr, TEST_ALPN).await else {
            panic!("connecting to ourself must fail");
        };
        assert!(err.to_string().starts_with("Connecting to ourself"));

        Ok(())
    }

    /// Checks how a connection close is observed on both sides.
    ///
    /// The server accepts one connection, reads five bytes from a uni stream and then closes
    /// the connection with error code 7 and reason `bye`.  The server side must then see
    /// `LocallyClosed`, the client side must see the matching `ApplicationClosed` error, both
    /// from `closed()` and from a subsequent `open_uni()`.
    #[tokio::test]
    #[traced_test]
    async fn endpoint_connect_close() -> Result {
        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
        let (relay_map, relay_url, _guard) = run_relay_server().await?;
        let server_secret_key = SecretKey::generate(&mut rng);
        let server_peer_id = server_secret_key.public();

        let qlog = QlogFileGroup::from_env("endpoint_connect_close");

        // Bind the server endpoint first to make sure it's up before the client tries to connect
        let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
            .secret_key(server_secret_key)
            .transport_config(qlog.create("server")?)
            .alpns(vec![TEST_ALPN.to_vec()])
            .insecure_skip_relay_cert_verify(true)
            .bind()
            .await?;
        // Wait for the endpoint to be reachable via relay
        ep.online().await;

        let server = tokio::spawn(
            async move {
                info!("accepting connection");
                let incoming = ep.accept().await.anyerr()?;
                let conn = incoming.await.anyerr()?;
                let mut stream = conn.accept_uni().await.anyerr()?;
                // The client writes exactly b"hello" (5 bytes) before waiting for the close.
                let mut buf = [0u8; 5];
                stream.read_exact(&mut buf).await.anyerr()?;
                info!("Accepted 1 stream, received {buf:?}.  Closing now.");
                // close the connection
                conn.close(7u8.into(), b"bye");

                // After a local close, both accepting streams and reading must fail with
                // `LocallyClosed`.
                let res = conn.accept_uni().await;
                assert_eq!(res.unwrap_err(), ConnectionError::LocallyClosed);

                let res = stream.read_to_end(10).await;
                assert_eq!(
                    res.unwrap_err(),
                    quinn::ReadToEndError::Read(quinn::ReadError::ConnectionLost(
                        ConnectionError::LocallyClosed
                    ))
                );
                info!("Closing the endpoint");
                ep.close().await;
                info!("server test completed");
                Ok::<_, Error>(())
            }
            .instrument(info_span!("test-server")),
        );

        let client = tokio::spawn(
            async move {
                let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
                    .alpns(vec![TEST_ALPN.to_vec()])
                    .insecure_skip_relay_cert_verify(true)
                    .transport_config(qlog.create("client")?)
                    .bind()
                    .await?;
                info!("client connecting");
                // The client is only given the server's id and its relay URL.
                let endpoint_addr = EndpointAddr::new(server_peer_id).with_relay_url(relay_url);
                let conn = ep.connect(endpoint_addr, TEST_ALPN).await?;
                let mut stream = conn.open_uni().await.anyerr()?;

                // First write is accepted by server.  We need this bit of synchronisation
                // because if the server closes after simply accepting the connection we can
                // not be sure our .open_uni() call would succeed as it may already receive
                // the error.
                stream.write_all(b"hello").await.anyerr()?;

                info!("waiting for closed");
                // Remote now closes the connection, we should see an error sometime soon.
                let err = conn.closed().await;
                // Must mirror the code and reason the server passes to `conn.close`.
                let expected_err = ConnectionError::ApplicationClosed(ApplicationClose {
                    error_code: 7u8.into(),
                    reason: b"bye".to_vec().into(),
                });
                assert_eq!(err, expected_err);

                info!("opening new - expect it to fail");
                let res = conn.open_uni().await;
                assert_eq!(res.unwrap_err(), expected_err);
                info!("Closing the client");
                ep.close().await;
                info!("client test completed");
                Ok::<_, Error>(())
            }
            .instrument(info_span!("test-client")),
        );

        // Bound the whole exchange to 30 seconds so a hang fails the test.
        let (server, client) = tokio::time::timeout(
            Duration::from_secs(30),
            n0_future::future::zip(server, client),
        )
        .await
        .anyerr()?;
        // First `?` surfaces a join error (e.g. a panic), the second the task's own result.
        server.anyerr()??;
        client.anyerr()??;
        Ok(())
    }

    /// Runs `n_clients` client rounds one after another against a single server endpoint.
    ///
    /// Every round binds a fresh client endpoint with a new secret key, connects using only
    /// the server's id and relay URL, and echoes `n_chunks_per_client` chunks of `chunk_size`
    /// bytes over a bidirectional stream.  Each round is limited to five seconds on both
    /// sides.
    #[tokio::test]
    #[traced_test]
    async fn endpoint_relay_connect_loop() -> Result {
        let test_start = Instant::now();
        let n_clients = 5;
        let n_chunks_per_client = 2;
        let chunk_size = 100;
        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42);
        let (relay_map, relay_url, _relay_guard) = run_relay_server().await.unwrap();
        let server_secret_key = SecretKey::generate(&mut rng);
        let server_endpoint_id = server_secret_key.public();

        // Make sure the server is bound before having clients connect to it:
        let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
            .insecure_skip_relay_cert_verify(true)
            .secret_key(server_secret_key)
            .alpns(vec![TEST_ALPN.to_vec()])
            .bind()
            .await?;
        // Also make sure the server has a working relay connection
        ep.online().await;

        info!(time = ?test_start.elapsed(), "test setup done");

        // The server accepts the connections of the clients sequentially.
        let server = tokio::spawn(
            async move {
                let eps = ep.bound_sockets();

                info!(me = %ep.id().fmt_short(), eps = ?eps, "server listening on");
                for i in 0..n_clients {
                    let res = tokio::time::timeout(Duration::from_secs(5), async {
                        let round_start = Instant::now();
                        info!("[server] round {i}");
                        let incoming = ep.accept().await.anyerr()?;
                        let conn = incoming.await.anyerr()?;
                        let endpoint_id = conn.remote_id();
                        info!(%i, peer = %endpoint_id.fmt_short(), "accepted connection");
                        let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
                        // Echo every chunk back unchanged.
                        let mut buf = vec![0u8; chunk_size];
                        for _i in 0..n_chunks_per_client {
                            recv.read_exact(&mut buf).await.anyerr()?;
                            send.write_all(&buf).await.anyerr()?;
                        }
                        info!(%i, peer = %endpoint_id.fmt_short(), "finishing");
                        send.finish().anyerr()?;
                        conn.closed().await; // we're the last to send data, so we wait for the other side to close
                        info!(%i, peer = %endpoint_id.fmt_short(), "finished");
                        info!("[server] round {i} done in {:?}", round_start.elapsed());
                        Ok::<_, Error>(())
                    })
                    .await
                    .std_context("timeout");
                    // The outer `Err` is the timeout, the inner one a failure of the round.
                    match res {
                        Err(err) | Ok(Err(err)) => {
                            // ensure we close the endpoint before returning early
                            // on error
                            ep.close().await;
                            return Err(err);
                        }
                        _ => {
                            // if this round went `Ok` don't close the endpoint yet
                        }
                    }
                }
                // close the endpoint before dropping the server task
                ep.close().await;
                Ok::<_, Error>(())
            }
            .instrument(debug_span!("server")),
        );

        let client = tokio::spawn(async move {
            for i in 0..n_clients {
                let round_start = Instant::now();
                info!("[client] round {i}");
                let client_secret_key = SecretKey::generate(&mut rng);
                let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
                    .alpns(vec![TEST_ALPN.to_vec()])
                    .insecure_skip_relay_cert_verify(true)
                    .secret_key(client_secret_key)
                    .bind()
                    .await?;
                // Second handle so the endpoint can be closed after the timed block, whatever
                // its outcome.
                let ep_1 = ep.clone();
                let res = tokio::time::timeout(
                    Duration::from_secs(5),
                    async {
                        info!("client binding");
                        let eps = ep.bound_sockets();

                        info!(me = %ep.id().fmt_short(), eps=?eps, "client bound");
                        let endpoint_addr =
                            EndpointAddr::new(server_endpoint_id).with_relay_url(relay_url.clone());
                        info!(to = ?endpoint_addr, "client connecting");
                        let conn = ep.connect(endpoint_addr, TEST_ALPN).await.anyerr()?;
                        info!("client connected");
                        let (mut send, mut recv) = conn.open_bi().await.anyerr()?;

                        // This `i` is the chunk index; it shadows the round counter and is
                        // also used as the fill byte of the chunk.
                        for i in 0..n_chunks_per_client {
                            let mut buf = vec![i; chunk_size];
                            send.write_all(&buf).await.anyerr()?;
                            recv.read_exact(&mut buf).await.anyerr()?;
                            assert_eq!(buf, vec![i; chunk_size]);
                        }
                        // we're the last to receive data, so we close
                        conn.close(0u32.into(), b"bye!");
                        info!("client finished");
                        Ok::<_, Error>(())
                    }
                    .instrument(debug_span!("client", %i)),
                )
                .await
                .std_context("timeout");
                // Close the endpoint before inspecting the result so it is closed on failure too.
                ep_1.close().await;
                info!("client endpoint closed");
                res??;
                info!("[client] round {i} done in {:?}", round_start.elapsed());
            }
            Ok::<_, Error>(())
        });

        server.await.anyerr()??;
        client.await.anyerr()??;
        Ok(())
    }

    /// Echo round-trip between two endpoints that are configured with the same test relay map.
    ///
    /// The client sends `Hello, world!` on a bidirectional stream, the server echoes whatever
    /// it read, and the client checks that it got the same bytes back.
    #[tokio::test]
    #[traced_test]
    async fn endpoint_send_relay() -> Result {
        let (relay_map, _relay_url, _guard) = run_relay_server().await?;
        let client = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
            .insecure_skip_relay_cert_verify(true)
            .bind()
            .await?;
        let server = Endpoint::empty_builder(RelayMode::Custom(relay_map))
            .insecure_skip_relay_cert_verify(true)
            .alpns(vec![TEST_ALPN.to_vec()])
            .bind()
            .await?;

        let task = tokio::spawn({
            // The task works on a clone; the original handle is still needed below.
            let server = server.clone();
            async move {
                let Some(conn) = server.accept().await else {
                    n0_error::bail_any!("Expected an incoming connection");
                };
                let conn = conn.await.anyerr()?;
                let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
                // Read until the client finishes its send side, then echo everything back.
                let data = recv.read_to_end(1000).await.anyerr()?;
                send.write_all(&data).await.anyerr()?;
                send.finish().anyerr()?;
                // The client closes the connection once it has read the echo.
                conn.closed().await;

                Ok::<_, Error>(())
            }
        });

        let addr = server.addr();
        let conn = client.connect(addr, TEST_ALPN).await?;
        let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
        send.write_all(b"Hello, world!").await.anyerr()?;
        send.finish().anyerr()?;
        let data = recv.read_to_end(1000).await.anyerr()?;
        conn.close(0u32.into(), b"bye!");

        task.await.anyerr()??;

        client.close().await;
        server.close().await;

        assert_eq!(&data, b"Hello, world!");

        Ok(())
    }

    /// Connects two endpoints directly, with relays disabled, and checks that the client
    /// observes an application close after the server drops the connection.
    #[tokio::test]
    #[traced_test]
    async fn endpoint_two_direct_only() -> Result {
        // Connect two endpoints on the same network, without a relay server, without
        // Address Lookup.
        //
        // The spans are attached with `.instrument()`: holding a `Span::enter()` guard
        // across an `.await` produces incorrect traces in async code.
        let ep1 = Endpoint::builder()
            .alpns(vec![TEST_ALPN.to_vec()])
            .relay_mode(RelayMode::Disabled)
            .bind()
            .instrument(info_span!("server"))
            .await?;
        let ep2 = Endpoint::builder()
            .alpns(vec![TEST_ALPN.to_vec()])
            .relay_mode(RelayMode::Disabled)
            .bind()
            .instrument(info_span!("client"))
            .await?;
        let ep1_nodeaddr = ep1.addr();

        /// Connects to `dst`, sends `hello` on a uni stream and returns the reason the
        /// connection was closed with.
        #[instrument(name = "client", skip_all)]
        async fn connect(ep: Endpoint, dst: EndpointAddr) -> Result<ConnectionError> {
            info!(me = %ep.id().fmt_short(), "client starting");
            let conn = ep.connect(dst, TEST_ALPN).await?;
            let mut send = conn.open_uni().await.anyerr()?;
            send.write_all(b"hello").await.anyerr()?;
            send.finish().anyerr()?;
            Ok(conn.closed().await)
        }

        /// Accepts one connection, checks that it comes from `src` and that it carries
        /// `hello` on a uni stream.
        #[instrument(name = "server", skip_all)]
        async fn accept(ep: Endpoint, src: EndpointId) -> Result {
            info!(me = %ep.id().fmt_short(), "server starting");
            let conn = ep.accept().await.anyerr()?.await.anyerr()?;
            let node_id = conn.remote_id();
            assert_eq!(node_id, src);
            let mut recv = conn.accept_uni().await.anyerr()?;
            let msg = recv.read_to_end(100).await.anyerr()?;
            assert_eq!(msg, b"hello");
            // Dropping the connection closes it just fine.
            Ok(())
        }

        let ep1_accept = tokio::spawn(accept(ep1.clone(), ep2.id()));
        let ep2_connect = tokio::spawn(connect(ep2.clone(), ep1_nodeaddr));

        ep1_accept.await.anyerr()??;
        let conn_closed = dbg!(ep2_connect.await.anyerr()??);
        assert!(matches!(
            conn_closed,
            ConnectionError::ApplicationClosed(ApplicationClose { .. })
        ));

        Ok(())
    }

    /// Connects using a relay-only address and waits until the connection has an IP path.
    ///
    /// The server hands the client an address from which everything but the relay addresses
    /// was removed.  The client then watches the connection's paths until one of them is an
    /// IP path and checks the connection metrics.
    #[tokio::test]
    #[traced_test]
    async fn endpoint_two_relay_only_becomes_direct() -> Result {
        // Connect two endpoints on the same network, via a relay server, without
        // Address Lookup.  Wait until there is a direct connection.
        let (relay_map, _relay_url, _relay_server_guard) = run_relay_server().await?;
        // Carries the server's address to the client once the server is online.
        let (node_addr_tx, node_addr_rx) = oneshot::channel();
        let qlog = Arc::new(QlogFileGroup::from_env("two_relay_only_becomes_direct"));

        /// Client side: connects to the received address, waits for an IP path, then asks
        /// the server to close and returns the close reason.
        #[instrument(name = "client", skip_all)]
        async fn connect(
            relay_map: RelayMap,
            node_addr_rx: oneshot::Receiver<EndpointAddr>,
            qlog: Arc<QlogFileGroup>,
        ) -> Result<ConnectionError> {
            let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
            let secret = SecretKey::generate(&mut rng);
            let ep = Endpoint::builder()
                .secret_key(secret)
                .alpns(vec![TEST_ALPN.to_vec()])
                .insecure_skip_relay_cert_verify(true)
                .relay_mode(RelayMode::Custom(relay_map))
                .transport_config(qlog.create("client")?)
                .bind()
                .await?;
            info!(me = %ep.id().fmt_short(), "client starting");
            let dst = node_addr_rx.await.anyerr()?;

            info!(me = %ep.id().fmt_short(), "client connecting");
            let conn = ep.connect(dst, TEST_ALPN).await?;
            let mut send = conn.open_uni().await.anyerr()?;
            send.write_all(b"hello").await.anyerr()?;
            let mut paths = conn.paths().stream();
            info!("Waiting for direct connection");
            while let Some(infos) = paths.next().await {
                info!(?infos, "new PathInfos");
                if infos.iter().any(|info| info.is_ip()) {
                    break;
                }
            }
            info!("Have direct connection");
            // Validate holepunch metrics.
            assert_eq!(ep.metrics().socket.num_conns_opened.get(), 1);
            assert_eq!(ep.metrics().socket.num_conns_direct.get(), 1);

            send.write_all(b"close please").await.anyerr()?;
            send.finish().anyerr()?;

            let res = conn.closed().await;
            ep.close().await;
            Ok(res)
        }

        /// Server side: publishes its relay-only address, accepts one connection, reads
        /// both messages and closes the endpoint.
        #[instrument(name = "server", skip_all)]
        async fn accept(
            relay_map: RelayMap,
            node_addr_tx: oneshot::Sender<EndpointAddr>,
            qlog: Arc<QlogFileGroup>,
        ) -> Result {
            let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(1u64);
            let secret = SecretKey::generate(&mut rng);
            let ep = Endpoint::builder()
                .secret_key(secret)
                .alpns(vec![TEST_ALPN.to_vec()])
                .insecure_skip_relay_cert_verify(true)
                .transport_config(qlog.create("server")?)
                .relay_mode(RelayMode::Custom(relay_map))
                .bind()
                .await?;
            ep.online().await;
            // Keep only the relay addresses so the client has to start out via the relay.
            let mut node_addr = ep.addr();
            node_addr.addrs.retain(|addr| addr.is_relay());
            node_addr_tx.send(node_addr).unwrap();

            info!(me = %ep.id().fmt_short(), "server starting");
            let conn = ep.accept().await.anyerr()?.await.anyerr()?;
            // The remote endpoint id is not checked here: this helper is not given the
            // client's id.
            let mut recv = conn.accept_uni().await.anyerr()?;
            // The first message is exactly b"hello" (5 bytes).
            let mut msg = [0u8; 5];
            recv.read_exact(&mut msg).await.anyerr()?;
            assert_eq!(&msg, b"hello");
            info!("received hello");
            let msg = recv.read_to_end(100).await.anyerr()?;
            assert_eq!(msg, b"close please");
            info!("received 'close please'");
            // Closing the endpoint closes all connections.
            ep.close().await;
            Ok(())
        }

        let server_task = tokio::spawn(accept(relay_map.clone(), node_addr_tx, qlog.clone()));
        let client_task = tokio::spawn(connect(relay_map, node_addr_rx, qlog));

        server_task.await.anyerr()??;
        let conn_closed = dbg!(client_task.await.anyerr()??);
        assert!(matches!(
            conn_closed,
            ConnectionError::ApplicationClosed(ApplicationClose { .. })
        ));

        Ok(())
    }

    /// Connects two endpoints whose IP transports were cleared, so only the relay is left.
    ///
    /// The client watches the connection's paths and panics if an IP path ever shows up; it
    /// proceeds as soon as it sees a relay path.
    #[tokio::test]
    #[traced_test]
    async fn endpoint_two_relay_only_no_ip() -> Result {
        // Connect two endpoints on the same network, via a relay server, without
        // Address Lookup.
        let (relay_map, _relay_url, _relay_server_guard) = run_relay_server().await?;
        // Carries the server's address to the client once the server is online.
        let (node_addr_tx, node_addr_rx) = oneshot::channel();

        /// Client side: connects to the received address, waits for a relay path, then asks
        /// the server to close and returns the close reason.
        #[instrument(name = "client", skip_all)]
        async fn connect(
            relay_map: RelayMap,
            node_addr_rx: oneshot::Receiver<EndpointAddr>,
        ) -> Result<ConnectionError> {
            let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
            let secret = SecretKey::generate(&mut rng);
            let ep = Endpoint::builder()
                .secret_key(secret)
                .alpns(vec![TEST_ALPN.to_vec()])
                .insecure_skip_relay_cert_verify(true)
                .relay_mode(RelayMode::Custom(relay_map))
                .clear_ip_transports() // disable direct
                .bind()
                .await?;
            info!(me = %ep.id().fmt_short(), "client starting");
            let dst = node_addr_rx.await.anyerr()?;

            info!(me = %ep.id().fmt_short(), "client connecting");
            let conn = ep.connect(dst, TEST_ALPN).await?;
            let mut send = conn.open_uni().await.anyerr()?;
            send.write_all(b"hello").await.anyerr()?;
            let mut paths = conn.paths().stream();
            info!("Waiting for connection");
            // Stop at the first relay path; an IP path must never appear in this setup.
            'outer: while let Some(infos) = paths.next().await {
                info!(?infos, "new PathInfos");
                for info in infos.iter() {
                    if info.is_ip() {
                        panic!("should not happen: {:?}", info);
                    }
                    if info.is_relay() {
                        break 'outer;
                    }
                }
            }
            info!("Have relay connection");
            send.write_all(b"close please").await.anyerr()?;
            send.finish().anyerr()?;
            let res = conn.closed().await;
            ep.close().await;
            Ok(res)
        }

        /// Server side: publishes its address, accepts one connection, reads both messages
        /// and closes the endpoint.
        #[instrument(name = "server", skip_all)]
        async fn accept(
            relay_map: RelayMap,
            node_addr_tx: oneshot::Sender<EndpointAddr>,
        ) -> Result {
            let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(1u64);
            let secret = SecretKey::generate(&mut rng);
            let ep = Endpoint::builder()
                .secret_key(secret)
                .alpns(vec![TEST_ALPN.to_vec()])
                .insecure_skip_relay_cert_verify(true)
                .relay_mode(RelayMode::Custom(relay_map))
                .clear_ip_transports()
                .bind()
                .await?;
            ep.online().await;
            let node_addr = ep.addr();
            node_addr_tx.send(node_addr).unwrap();

            info!(me = %ep.id().fmt_short(), "server starting");
            let conn = ep.accept().await.anyerr()?.await.anyerr()?;
            // The remote endpoint id is not checked here: this helper is not given the
            // client's id.
            let mut recv = conn.accept_uni().await.anyerr()?;
            // The first message is exactly b"hello" (5 bytes).
            let mut msg = [0u8; 5];
            recv.read_exact(&mut msg).await.anyerr()?;
            assert_eq!(&msg, b"hello");
            info!("received hello");
            let msg = recv.read_to_end(100).await.anyerr()?;
            assert_eq!(msg, b"close please");
            info!("received 'close please'");
            // Closing the endpoint closes all connections.
            ep.close().await;
            Ok(())
        }

        let server_task = tokio::spawn(accept(relay_map.clone(), node_addr_tx));
        let client_task = tokio::spawn(connect(relay_map, node_addr_rx));

        server_task.await.anyerr()??;
        let conn_closed = dbg!(client_task.await.anyerr()??);
        assert!(matches!(
            conn_closed,
            ConnectionError::ApplicationClosed(ApplicationClose { .. })
        ));

        Ok(())
    }

    /// Checks that a relay path gets added to a connection that starts on a direct path.
    ///
    /// Both endpoints are configured with the same test relay server.  The client asserts
    /// that the freshly established connection has exactly one path and that it is an IP
    /// path.  Both sides then watch their paths, for at most 5 seconds, until a relay path
    /// shows up.  Finally the server signals over a uni stream, the client closes the
    /// connection and the server must observe this as an application close.
    #[tokio::test]
    #[traced_test]
    async fn endpoint_two_direct_add_relay() -> Result {
        let (relay_map, _relay_url, _relay_server_guard) = run_relay_server().await?;
        // Carries the server's address to the client once the server is online.
        let (node_addr_tx, node_addr_rx) = oneshot::channel();

        /// Client side: connects to the address received on `node_addr_rx` and does all the
        /// asserting on the connection's paths.
        #[instrument(name = "client", skip_all)]
        async fn connect(
            relay_map: RelayMap,
            node_addr_rx: oneshot::Receiver<EndpointAddr>,
        ) -> Result<()> {
            let secret = SecretKey::from([0u8; 32]);
            let ep = Endpoint::builder()
                .secret_key(secret)
                .alpns(vec![TEST_ALPN.to_vec()])
                .insecure_skip_relay_cert_verify(true)
                .relay_mode(RelayMode::Custom(relay_map))
                .bind()
                .await?;
            info!(me = %ep.id().fmt_short(), "client starting");
            let dst = node_addr_rx.await.anyerr()?;

            info!(me = %ep.id().fmt_short(), "client connecting");
            let conn = ep.connect(dst, TEST_ALPN).await?;
            info!(me = %ep.id().fmt_short(), "client connected");

            // We should be connected via IP, because it is faster than the relay server.
            // TODO: Maybe not panic if this is not true?
            let path_info = conn.paths().get();
            assert_eq!(path_info.len(), 1);
            assert!(path_info.iter().next().unwrap().is_ip());

            // Watch path updates until one of the paths is a relay path.
            let mut paths = conn.paths().stream();
            time::timeout(Duration::from_secs(5), async move {
                while let Some(infos) = paths.next().await {
                    info!(?infos, "new PathInfos");
                    if infos.iter().any(|info| info.is_relay()) {
                        info!("client has a relay path");
                        break;
                    }
                }
            })
            .await
            .anyerr()?;

            // wait for the server to signal it has the relay connection
            let mut stream = conn.accept_uni().await.anyerr()?;
            stream.read_to_end(100).await.anyerr()?;

            info!("client closing");
            conn.close(0u8.into(), b"");
            ep.close().await;
            Ok(())
        }

        /// Server side: publishes its address on `node_addr_tx`, accepts one connection
        /// and returns the reason the connection was closed with.
        #[instrument(name = "server", skip_all)]
        async fn accept(
            relay_map: RelayMap,
            node_addr_tx: oneshot::Sender<EndpointAddr>,
        ) -> Result<ConnectionError> {
            let secret = SecretKey::from([1u8; 32]);
            let ep = Endpoint::builder()
                .secret_key(secret)
                .alpns(vec![TEST_ALPN.to_vec()])
                .insecure_skip_relay_cert_verify(true)
                .relay_mode(RelayMode::Custom(relay_map))
                .bind()
                .await?;
            ep.online().await;
            let node_addr = ep.addr();
            node_addr_tx.send(node_addr).unwrap();

            info!(me = %ep.id().fmt_short(), "server starting");
            let conn = ep.accept().await.anyerr()?.await.anyerr()?;
            info!(me = %ep.id().fmt_short(), "server accepted connection");

            // Wait for a relay connection to be added.  Client does all the asserting here,
            // we just want to wait so we get to see all the mechanics of the connection
            // being added on this side too.
            let mut paths = conn.paths().stream();
            time::timeout(Duration::from_secs(5), async move {
                while let Some(infos) = paths.next().await {
                    info!(?infos, "new PathInfos");
                    if infos.iter().any(|path| path.is_relay()) {
                        info!("server has a relay path");
                        break;
                    }
                }
            })
            .await
            .anyerr()?;

            // Tell the client that this side has seen the relay path.
            let mut stream = conn.open_uni().await.anyerr()?;
            stream.write_all(b"have relay").await.anyerr()?;
            stream.finish().anyerr()?;
            info!("waiting conn.closed()");

            Ok(conn.closed().await)
        }

        let server_task = tokio::spawn(accept(relay_map.clone(), node_addr_tx));
        let client_task = tokio::spawn(connect(relay_map, node_addr_rx));

        client_task.await.anyerr()??;
        let conn_closed = server_task.await.anyerr()??;
        // Report the actual close reason on failure instead of dumping it unconditionally.
        assert!(
            matches!(
                conn_closed,
                ConnectionError::ApplicationClosed(ApplicationClose { .. })
            ),
            "unexpected close reason: {conn_closed:?}"
        );

        Ok(())
    }

    #[tokio::test]
    #[traced_test]
    async fn endpoint_relay_map_change() -> Result {
        let (relay_map, relay_url, _guard1) = run_relay_server().await?;
        let client = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
            .insecure_skip_relay_cert_verify(true)
            .bind()
            .await?;
        let server = Endpoint::empty_builder(RelayMode::Custom(relay_map))
            .insecure_skip_relay_cert_verify(true)
            .alpns(vec![TEST_ALPN.to_vec()])
            .bind()
            .await?;

        let task = tokio::spawn({
            let server = server.clone();
            async move {
                for i in 0..2 {
                    println!("accept: round {i}");
                    let Some(conn) = server.accept().await else {
                        n0_error::bail_any!("Expected an incoming connection");
                    };
                    let conn = conn.await.anyerr()?;
                    let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
                    let data = recv.read_to_end(1000).await.anyerr()?;
                    send.write_all(&data).await.anyerr()?;
                    send.finish().anyerr()?;
                    conn.closed().await;
                }
                Ok::<_, Error>(())
            }
        });

        server.online().await;

        let mut addr = server.addr();
        println!("round1: {:?}", addr);

        // remove direct addrs to force relay usage
        addr.addrs
            .retain(|addr| !matches!(addr, TransportAddr::Ip(_)));

        let conn = client.connect(addr, TEST_ALPN).await?;
        let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
        send.write_all(b"Hello, world!").await.anyerr()?;
        send.finish().anyerr()?;
        let data = recv.read_to_end(1000).await.anyerr()?;
        conn.close(0u32.into(), b"bye!");

        assert_eq!(&data, b"Hello, world!");

        // setup a second relay server
        let (new_relay_map, new_relay_url, _guard2) = run_relay_server().await?;
        let new_endpoint = new_relay_map
            .get(&new_relay_url)
            .expect("missing endpoint")
            .clone();
        dbg!(&new_relay_map);

        let addr_watcher = server.watch_addr();

        // add new new relay
        assert!(
            server
                .insert_relay(new_relay_url.clone(), new_endpoint.clone())
                .await
                .is_none()
        );
        // remove the old relay
        assert!(server.remove_relay(&relay_url).await.is_some());

        println!("------- changed ----- ");

        let mut addr = tokio::time::timeout(Duration::from_secs(10), async move {
            let mut stream = addr_watcher.stream();
            while let Some(addr) = stream.next().await {
                if addr.relay_urls().next() != Some(&relay_url) {
                    return addr;
                }
            }
            panic!("failed to change relay");
        })
        .await
        .anyerr()?;

        println!("round2: {:?}", addr);
        assert_eq!(addr.relay_urls().next(), Some(&new_relay_url));

        // remove direct addrs to force relay usage
        addr.addrs
            .retain(|addr| !matches!(addr, TransportAddr::Ip(_)));

        let conn = client.connect(addr, TEST_ALPN).await?;
        let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
        send.write_all(b"Hello, world!").await.anyerr()?;
        send.finish().anyerr()?;
        let data = recv.read_to_end(1000).await.anyerr()?;
        conn.close(0u32.into(), b"bye!");

        task.await.anyerr()??;

        client.close().await;
        server.close().await;

        assert_eq!(&data, b"Hello, world!");

        Ok(())
    }

    /// Has two endpoints connect to each other at the same time.
    ///
    /// Each endpoint runs one accepting and one connecting task.  The connecting side sends
    /// `hello`, expects `world` back and closes with error code 1.  The accepting side
    /// checks the remote id and ALPN and expects exactly that application close.
    #[tokio::test]
    #[traced_test]
    async fn endpoint_bidi_send_recv() -> Result {
        // Shared in-memory address lookup so the endpoints can find each other by id.
        let disco = MemoryLookup::new();
        let ep1 = Endpoint::empty_builder(RelayMode::Disabled)
            .address_lookup(disco.clone())
            .alpns(vec![TEST_ALPN.to_vec()])
            .bind()
            .await?;

        let ep2 = Endpoint::empty_builder(RelayMode::Disabled)
            .address_lookup(disco.clone())
            .alpns(vec![TEST_ALPN.to_vec()])
            .bind()
            .await?;

        disco.add_endpoint_info(ep1.addr());
        disco.add_endpoint_info(ep2.addr());

        let ep1_endpointid = ep1.id();
        let ep2_endpointid = ep2.id();
        eprintln!("endpoint id 1 {ep1_endpointid}");
        eprintln!("endpoint id 2 {ep2_endpointid}");

        /// Connects to `dst`, sends `hello`, expects `world` and closes with code 1.
        async fn connect_hello(ep: Endpoint, dst: EndpointId) -> Result {
            let conn = ep.connect(dst, TEST_ALPN).await?;
            let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
            info!("sending hello");
            send.write_all(b"hello").await.anyerr()?;
            send.finish().anyerr()?;
            info!("receiving world");
            let m = recv.read_to_end(100).await.anyerr()?;
            assert_eq!(m, b"world");
            conn.close(1u8.into(), b"done");
            Ok(())
        }

        /// Accepts one connection from `src`, expects `hello`, answers `world` and waits
        /// for the application close with code 1.
        async fn accept_world(ep: Endpoint, src: EndpointId) -> Result {
            let incoming = ep.accept().await.anyerr()?;
            let mut iconn = incoming.accept().anyerr()?;
            let alpn = iconn.alpn().await?;
            let conn = iconn.await.anyerr()?;
            let endpoint_id = conn.remote_id();
            assert_eq!(endpoint_id, src);
            assert_eq!(alpn, TEST_ALPN);
            let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
            info!("receiving hello");
            let m = recv.read_to_end(100).await.anyerr()?;
            assert_eq!(m, b"hello");
            info!("sending world");
            send.write_all(b"world").await.anyerr()?;
            send.finish().anyerr()?;
            match conn.closed().await {
                ConnectionError::ApplicationClosed(closed) => {
                    assert_eq!(closed.error_code, 1u8.into());
                    Ok(())
                }
                other => panic!("wrong close error: {other:?}"),
            }
        }

        let p1_accept = tokio::spawn(accept_world(ep1.clone(), ep2_endpointid).instrument(
            info_span!(
                "p1_accept",
                ep1 = %ep1.id().fmt_short(),
                dst = %ep2_endpointid.fmt_short(),
            ),
        ));
        let p2_accept = tokio::spawn(accept_world(ep2.clone(), ep1_endpointid).instrument(
            info_span!(
                "p2_accept",
                ep2 = %ep2.id().fmt_short(),
                dst = %ep1_endpointid.fmt_short(),
            ),
        ));
        let p1_connect = tokio::spawn(connect_hello(ep1.clone(), ep2_endpointid).instrument(
            info_span!(
                "p1_connect",
                ep1 = %ep1.id().fmt_short(),
                dst = %ep2_endpointid.fmt_short(),
            ),
        ));
        let p2_connect = tokio::spawn(connect_hello(ep2.clone(), ep1_endpointid).instrument(
            info_span!(
                "p2_connect",
                ep2 = %ep2.id().fmt_short(),
                dst = %ep1_endpointid.fmt_short(),
            ),
        ));

        p1_accept.await.anyerr()??;
        p2_accept.await.anyerr()??;
        p1_connect.await.anyerr()??;
        p2_connect.await.anyerr()??;

        Ok(())
    }

    /// Checks that an endpoint still reports at least one IP address when its relay server
    /// is started via `run_relay_server_with(false)`.
    ///
    /// NOTE(review): judging by the test name, the `false` flag presumably disables QUIC
    /// address discovery (QAD) on the relay — confirm against `run_relay_server_with`.
    #[tokio::test]
    #[traced_test]
    async fn test_direct_addresses_no_qad_relay() -> Result {
        // Propagate setup failures with `?` like the other relay tests instead of panicking.
        let (relay_map, _, _guard) = run_relay_server_with(false).await?;

        let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
            .alpns(vec![TEST_ALPN.to_vec()])
            .insecure_skip_relay_cert_verify(true)
            .bind()
            .await?;

        assert!(ep.addr().ip_addrs().count() > 0);

        Ok(())
    }

    #[cfg_attr(target_os = "windows", ignore = "flaky")]
    #[tokio::test]
    #[traced_test]
    async fn graceful_close() -> Result {
        let client = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
        let server = Endpoint::empty_builder(RelayMode::Disabled)
            .alpns(vec![TEST_ALPN.to_vec()])
            .bind()
            .await?;
        let server_addr = server.addr();
        let server_task = tokio::spawn(async move {
            let incoming = server.accept().await.anyerr()?;
            let conn = incoming.await.anyerr()?;
            let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
            let msg = recv.read_to_end(1_000).await.anyerr()?;
            send.write_all(&msg).await.anyerr()?;
            send.finish().anyerr()?;
            let close_reason = conn.closed().await;
            Ok::<_, Error>(close_reason)
        });

        let conn = client.connect(server_addr, TEST_ALPN).await?;
        let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
        send.write_all(b"Hello, world!").await.anyerr()?;
        send.finish().anyerr()?;
        recv.read_to_end(1_000).await.anyerr()?;
        conn.close(42u32.into(), b"thanks, bye!");
        client.close().await;

        let close_err = server_task.await.anyerr()??;
        let ConnectionError::ApplicationClosed(app_close) = close_err else {
            panic!("Unexpected close reason: {close_err:?}");
        };

        assert_eq!(app_close.error_code, 42u32.into());
        assert_eq!(app_close.reason.as_ref(), b"thanks, bye!");

        Ok(())
    }

    /// Smoke test for the endpoint metrics.
    ///
    /// Sends one uni stream from a client to a server, checks that both endpoints counted
    /// at least one received datagram and registers both metric sets in a registry with a
    /// per-endpoint label.
    #[cfg(feature = "metrics")]
    #[tokio::test]
    #[traced_test]
    async fn metrics_smoke() -> Result {
        use iroh_metrics::Registry;

        /// Registers the metrics of `endpoint` in a sub-registry labeled with its short id.
        fn register_endpoint(registry: &mut Registry, endpoint: &Endpoint) {
            let short_id = endpoint.id().fmt_short().to_string();
            registry
                .sub_registry_with_label("id", short_id)
                .register_all(endpoint.metrics());
        }

        let client = Endpoint::empty_builder(RelayMode::Disabled)
            .secret_key(SecretKey::from_bytes(&[0u8; 32]))
            .bind()
            .await?;
        let server = Endpoint::empty_builder(RelayMode::Disabled)
            .secret_key(SecretKey::from_bytes(&[1u8; 32]))
            .alpns(vec![TEST_ALPN.to_vec()])
            .bind()
            .await?;
        let server_addr = server.addr();

        // The task hands the server endpoint back so its metrics can be inspected below.
        let server_task = tokio::task::spawn(async move {
            let incoming = server.accept().await.anyerr()?;
            let conn = incoming.await.anyerr()?;
            let mut recv = conn.accept_uni().await.anyerr()?;
            recv.read_to_end(10).await.anyerr()?;
            drop(conn);
            Ok::<_, Error>(server)
        });

        let conn = client.connect(server_addr, TEST_ALPN).await?;
        let mut send = conn.open_uni().await.anyerr()?;
        send.write_all(b"helloworld").await.anyerr()?;
        send.finish().anyerr()?;
        conn.closed().await;
        drop(conn);
        let server = server_task.await.anyerr()??;

        // NOTE(review): the counter checks below are disabled; presumably they are meant to
        // be re-enabled once these metrics are reported again — confirm.
        // assert_eq!(m.socket.num_direct_conns_added.get(), 1);
        // assert_eq!(m.socket.connection_became_direct.get(), 1);
        // assert_eq!(m.socket.connection_handshake_success.get(), 1);
        // assert_eq!(m.socket.endpoints_contacted_directly.get(), 1);
        assert!(client.metrics().socket.recv_datagrams.get() > 0);

        // assert_eq!(m.socket.num_direct_conns_added.get(), 1);
        // assert_eq!(m.socket.connection_became_direct.get(), 1);
        // assert_eq!(m.socket.endpoints_contacted_directly.get(), 1);
        // assert_eq!(m.socket.connection_handshake_success.get(), 1);
        assert!(server.metrics().socket.recv_datagrams.get() > 0);

        // test openmetrics encoding with labeled subregistries per endpoint
        let mut registry = Registry::default();
        for endpoint in [&client, &server] {
            register_endpoint(&mut registry, endpoint);
        }
        // let s = registry.encode_openmetrics_to_string().anyerr()?;
        // assert!(s.contains(r#"socket_endpoints_contacted_directly_total{id="3b6a27bcce"} 1"#));
        // assert!(s.contains(r#"socket_endpoints_contacted_directly_total{id="8a88e3dd74"} 1"#));
        Ok(())
    }

    /// Runs a single connection and returns the ALPN that was negotiated.
    ///
    /// The accept side is configured with `accept_alpns`.  The connect side offers
    /// `primary_connect_alpn` and additionally `secondary_connect_alpns`.  The ALPN seen by
    /// the connecting and the accepting side is asserted to be equal before it is returned.
    async fn alpn_connection_test(
        accept_alpns: Vec<Vec<u8>>,
        primary_connect_alpn: &[u8],
        secondary_connect_alpns: Vec<Vec<u8>>,
    ) -> Result<Vec<u8>> {
        let client = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
        let server = Endpoint::empty_builder(RelayMode::Disabled)
            .alpns(accept_alpns)
            .bind()
            .await?;
        let server_addr = server.addr();
        // Accept one connection, close it right away and report the ALPN the server saw.
        let server_task = tokio::spawn({
            let server = server.clone();
            async move {
                let incoming = server.accept().await.anyerr()?;
                let conn = incoming.await.anyerr()?;
                conn.close(0u32.into(), b"bye!");
                n0_error::Ok(conn.alpn().to_vec())
            }
        });

        let conn = client
            .connect_with_opts(
                server_addr,
                primary_connect_alpn,
                ConnectOptions::new().with_additional_alpns(secondary_connect_alpns),
            )
            .await?;
        let conn = conn.await.anyerr()?;
        let client_alpn = conn.alpn();
        conn.closed().await;
        client.close().await;
        server.close().await;

        let server_alpn = server_task.await.anyerr()??;

        assert_eq!(client_alpn, server_alpn);

        // `server_alpn` is already an owned `Vec<u8>`, no extra copy needed.
        Ok(server_alpn)
    }

    /// Exercises ALPN negotiation for several combinations of accepted and offered ALPNs.
    ///
    /// Each case lists the ALPNs of the accept side, the primary and the additional ALPNs
    /// of the connect side, and the ALPN that is expected to be negotiated.
    #[tokio::test]
    #[traced_test]
    async fn connect_multiple_alpn_negotiated() -> Result {
        const ALPN_ONE: &[u8] = b"alpn/1";
        const ALPN_TWO: &[u8] = b"alpn/2";

        // (accept ALPNs, primary connect ALPN, additional connect ALPNs, expected, message)
        let cases: Vec<(Vec<Vec<u8>>, &[u8], Vec<Vec<u8>>, &[u8], &str)> = vec![
            (
                // Prefer version 2 over version 1 on the accept side
                vec![ALPN_TWO.to_vec(), ALPN_ONE.to_vec()],
                ALPN_TWO,
                vec![ALPN_ONE.to_vec()],
                ALPN_TWO,
                "accept side prefers version 2 over 1",
            ),
            (
                // Only support the old version
                vec![ALPN_ONE.to_vec()],
                ALPN_TWO,
                vec![ALPN_ONE.to_vec()],
                ALPN_ONE,
                "accept side only supports the old version",
            ),
            (
                vec![ALPN_TWO.to_vec(), ALPN_ONE.to_vec()],
                ALPN_ONE,
                vec![ALPN_TWO.to_vec()],
                ALPN_TWO,
                "connect side ALPN order doesn't matter",
            ),
            (
                vec![ALPN_TWO.to_vec(), ALPN_ONE.to_vec()],
                ALPN_ONE,
                vec![],
                ALPN_ONE,
                "connect side only supports the old version",
            ),
        ];

        for (accept_alpns, primary, additional, expected, message) in cases {
            let negotiated = alpn_connection_test(accept_alpns, primary, additional).await?;
            assert_eq!(negotiated, expected.to_vec(), "{message}");
        }

        Ok(())
    }

    /// Checks that the net report watcher of a freshly bound endpoint yields an update.
    #[tokio::test]
    #[traced_test]
    async fn watch_net_report() -> Result {
        let endpoint = Endpoint::empty_builder(RelayMode::Staging).bind().await?;
        let mut report_watcher = endpoint.net_report();

        // Waiting for the first update must succeed.
        report_watcher.updated().await.anyerr()?;

        Ok(())
    }

    /// Tests that initial connection establishment isn't extremely slow compared
    /// to subsequent connections.
    ///
    /// This is a time based test, but uses a very large ratio to reduce flakiness.
    /// It also does a number of connections to average out any anomalies.
    #[tokio::test]
    #[traced_test]
    async fn connect_multi_time() -> Result {
        let n = 32;

        const NOOP_ALPN: &[u8] = b"noop";

        #[derive(Debug, Clone)]
        struct Noop;

        impl ProtocolHandler for Noop {
            async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
                connection.closed().await;
                Ok(())
            }
        }

        async fn noop_server() -> Result<(Router, EndpointAddr)> {
            let endpoint = Endpoint::empty_builder(RelayMode::Disabled)
                .bind()
                .await
                .anyerr()?;
            let addr = endpoint.addr();
            let router = Router::builder(endpoint).accept(NOOP_ALPN, Noop).spawn();
            Ok((router, addr))
        }

        let routers = stream::iter(0..n)
            .map(|_| noop_server())
            .buffered_unordered(32)
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>, _>>()
            .anyerr()?;

        let addrs = routers
            .iter()
            .map(|(_, addr)| addr.clone())
            .collect::<Vec<_>>();
        let ids = addrs.iter().map(|addr| addr.id).collect::<Vec<_>>();
        let address_lookup = MemoryLookup::from_endpoint_info(addrs);
        let endpoint = Endpoint::empty_builder(RelayMode::Disabled)
            .address_lookup(address_lookup)
            .bind()
            .await
            .anyerr()?;
        // wait for the endpoint to be initialized. This should not be needed,
        // but we don't want to measure endpoint init time but connection time
        // from a fully initialized endpoint.
        endpoint.addr();
        let t0 = Instant::now();
        for id in &ids {
            let conn = endpoint.connect(*id, NOOP_ALPN).await?;
            conn.close(0u32.into(), b"done");
        }
        let dt0 = t0.elapsed().as_secs_f64();
        let t1 = Instant::now();
        for id in &ids {
            let conn = endpoint.connect(*id, NOOP_ALPN).await?;
            conn.close(0u32.into(), b"done");
        }
        let dt1 = t1.elapsed().as_secs_f64();

        assert!(dt0 / dt1 < 20.0, "First round: {dt0}s, second round {dt1}s");
        Ok(())
    }

    /// Checks that endpoints can be bound with custom relay configurations, built both
    /// from a single [`RelayUrl`] and from a [`RelayMap`] with several URLs.
    #[tokio::test]
    async fn test_custom_relay() -> Result {
        // Custom relay mode from a list of relay URLs.
        let canary_url = RelayUrl::from_str("https://use1-1.relay.n0.iroh-canary.iroh.link.")?;
        let _ep = Endpoint::empty_builder(RelayMode::custom([canary_url]))
            .bind()
            .await?;

        // Custom relay mode from a prebuilt relay map.
        let relay_map = RelayMap::try_from_iter([
            "https://use1-1.relay.n0.iroh.iroh.link/",
            "https://euc1-1.relay.n0.iroh.iroh.link/",
        ])?;
        let _ep = Endpoint::empty_builder(RelayMode::Custom(relay_map))
            .bind()
            .await?;

        Ok(())
    }

    /// Testing bind_addr: with the IP transports cleared, adding a single IPv4 bind
    /// results in exactly one bound socket on that address.
    #[tokio::test]
    #[traced_test]
    async fn test_bind_addr_clear() -> Result {
        let builder = Endpoint::empty_builder(RelayMode::Disabled)
            .clear_ip_transports()
            .bind_addr((Ipv4Addr::LOCALHOST, 0))?;
        let ep = builder.bind().await?;

        let sockets = ep.bound_sockets();
        assert_eq!(sockets.len(), 1);
        assert_eq!(sockets[0].ip(), IpAddr::V4(Ipv4Addr::LOCALHOST));

        ep.close().await;
        Ok(())
    }

    /// Testing bind_addr: keep the IP transports and add a single non-default IPv4 bind.
    ///
    /// Two sockets are expected: one IPv6 socket and our manually added IPv4 bind on
    /// localhost.
    #[tokio::test]
    #[traced_test]
    async fn test_bind_addr_no_clear() -> Result {
        let builder =
            Endpoint::empty_builder(RelayMode::Disabled).bind_addr((Ipv4Addr::LOCALHOST, 0))?;
        let ep = builder.bind().await?;

        let sockets = ep.bound_sockets();
        assert_eq!(sockets.len(), 2);
        let ipv4_count = sockets.iter().filter(|addr| addr.is_ipv4()).count();
        assert_eq!(ipv4_count, 1);
        let ipv6_count = sockets.iter().filter(|addr| addr.is_ipv6()).count();
        assert_eq!(ipv6_count, 1);

        // Our manually added socket must be among the bound ones.
        let localhost = IpAddr::V4(Ipv4Addr::LOCALHOST);
        assert!(sockets.iter().any(|addr| addr.ip() == localhost));

        ep.close().await;
        Ok(())
    }

    /// Testing bind_addr: keep the IP transports and add a single default IPv4 bind.
    ///
    /// The bind is marked as default route, so one IPv4 socket on localhost and one IPv6
    /// socket are expected.
    #[tokio::test]
    #[traced_test]
    async fn test_bind_addr_default() -> Result {
        let opts = BindOpts::default().set_is_default_route(true);
        let ep = Endpoint::empty_builder(RelayMode::Disabled)
            .bind_addr_with_opts((Ipv4Addr::LOCALHOST, 0), opts)?
            .bind()
            .await?;

        let sockets = ep.bound_sockets();
        assert_eq!(sockets.len(), 2);
        let ipv4_count = sockets.iter().filter(|addr| addr.is_ipv4()).count();
        assert_eq!(ipv4_count, 1);
        let ipv6_count = sockets.iter().filter(|addr| addr.is_ipv6()).count();
        assert_eq!(ipv6_count, 1);

        let localhost = IpAddr::V4(Ipv4Addr::LOCALHOST);
        assert!(sockets.iter().any(|addr| addr.ip() == localhost));

        ep.close().await;
        drop(ep);

        Ok(())
    }

    /// Testing bind_addr: keep the IP transports and add a single IPv4 bind with a
    /// non-zero prefix length.
    ///
    /// Three sockets are expected: the wildcard IPv4 socket, one IPv6 socket and our
    /// manually added IPv4 bind on localhost.
    #[tokio::test]
    #[traced_test]
    async fn test_bind_addr_nonzero_prefix() -> Result {
        let opts = BindOpts::default().set_prefix_len(32);
        let ep = Endpoint::empty_builder(RelayMode::Disabled)
            .bind_addr_with_opts((Ipv4Addr::LOCALHOST, 0), opts)?
            .bind()
            .await?;

        let sockets = ep.bound_sockets();
        assert_eq!(sockets.len(), 3);
        let ipv4_count = sockets.iter().filter(|addr| addr.is_ipv4()).count();
        assert_eq!(ipv4_count, 2);
        let ipv6_count = sockets.iter().filter(|addr| addr.is_ipv6()).count();
        assert_eq!(ipv6_count, 1);

        // The default wildcard socket and our manually added socket must both be there.
        let wildcard = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
        assert!(sockets.iter().any(|addr| addr.ip() == wildcard));
        let localhost = IpAddr::V4(Ipv4Addr::LOCALHOST);
        assert!(sockets.iter().any(|addr| addr.ip() == localhost));

        ep.close().await;
        Ok(())
    }

    /// Bind on a port that is already taken, using the default opts.
    ///
    /// Binding the endpoint must fail with an `AddrInUse` socket error.
    #[tokio::test]
    #[traced_test]
    async fn test_bind_addr_badport() -> Result {
        // Keep this socket bound for the whole test so its port stays occupied.
        let socket = std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))?;
        let port = socket.local_addr()?.port();

        let res = Endpoint::empty_builder(RelayMode::Disabled)
            .clear_ip_transports()
            .bind_addr((Ipv4Addr::LOCALHOST, port))?
            .bind()
            .await;

        let addr_in_use = match res {
            Err(BindError::Sockets { source, .. }) => source.kind() == io::ErrorKind::AddrInUse,
            _ => false,
        };
        assert!(addr_in_use);
        Ok(())
    }

    /// Bind a non-default route on an unusable port, but set is_required = false.
    ///
    /// Binding the endpoint succeeds, and the failed bind is simply missing from the
    /// bound sockets.
    #[tokio::test]
    #[traced_test]
    async fn test_bind_addr_badport_notrequired() -> Result {
        // Keep this socket bound for the whole test so its port stays occupied.
        let socket = std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))?;
        let port = socket.local_addr()?.port();

        let ep = Endpoint::empty_builder(RelayMode::Disabled)
            .bind_addr_with_opts(
                (Ipv4Addr::LOCALHOST, port),
                BindOpts::default()
                    .set_prefix_len(32)
                    .set_is_required(false),
            )?
            .bind()
            .await?;
        let bound_sockets = ep.bound_sockets();
        // just the default wildcard binds
        assert_eq!(bound_sockets.len(), 2);
        // our requested bind addr is not included because it failed to bind
        assert!(
            !bound_sockets
                .iter()
                .any(|x| x.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST))
        );
        // Shut the endpoint down explicitly, like the other bind_addr tests do.
        ep.close().await;
        Ok(())
    }

    /// Bind on a default route on an unusable port, but set is_required = false.
    ///
    /// Binding the endpoint succeeds, but without any IPv4 socket.
    #[tokio::test]
    #[traced_test]
    async fn test_bind_addr_badport_default_notrequired() -> Result {
        // Keep this socket bound for the whole test so its port stays occupied.
        let socket = std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))?;
        let port = socket.local_addr()?.port();

        let ep = Endpoint::empty_builder(RelayMode::Disabled)
            .bind_addr_with_opts(
                (Ipv4Addr::LOCALHOST, port),
                BindOpts::default().set_is_required(false),
            )?
            .bind()
            .await?;
        let bound_sockets = ep.bound_sockets();
        // just the IPv6 default, but no IPv4 bind at all because we replaced the default
        // with a bind with an unusable port and set it to not be required.
        assert_eq!(bound_sockets.len(), 1);
        assert!(bound_sockets[0].is_ipv6());
        // Shut the endpoint down explicitly, like the other bind_addr tests do.
        ep.close().await;
        Ok(())
    }

    /// Bind on an unusable port, with is_required = false, and no other transports.
    ///
    /// With nothing left to bind to, creating the endpoint must fail with
    /// "no valid address available".
    #[tokio::test]
    #[traced_test]
    async fn test_bind_addr_badport_notrequired_no_other_transports() -> Result {
        // Keep this socket bound for the whole test so its port stays occupied.
        let socket = std::net::UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))?;
        let port = socket.local_addr()?.port();

        let opts = BindOpts::default().set_is_required(false);
        let res = Endpoint::empty_builder(RelayMode::Disabled)
            .clear_ip_transports()
            .bind_addr_with_opts((Ipv4Addr::LOCALHOST, port), opts)?
            .bind()
            .await;

        let no_valid_address = match res {
            Err(BindError::CreateQuicEndpoint { source, .. }) => {
                source.kind() == io::ErrorKind::Other
                    && source.to_string() == "no valid address available"
            }
            _ => false,
        };
        assert!(no_valid_address);
        Ok(())
    }

    /// Bind with prefix len 0 but set the route as non-default.
    ///
    /// The additional bind must not replace the default IPv4 bind, so three sockets are
    /// expected.
    #[tokio::test]
    #[traced_test]
    async fn test_bind_addr_prefix_len_0_not_default() -> Result {
        let ep = Endpoint::empty_builder(RelayMode::Disabled)
            .bind_addr_with_opts(
                (Ipv4Addr::LOCALHOST, 0),
                BindOpts::default().set_is_default_route(false),
            )?
            .bind()
            .await?;
        let bound_sockets = ep.bound_sockets();
        // The two default wildcard binds plus our additional route (which does not replace the default route
        // because we set is_default_route to false explicitly).
        assert_eq!(bound_sockets.len(), 3);
        assert!(
            bound_sockets
                .iter()
                .any(|x| x.ip() == IpAddr::V6(Ipv6Addr::UNSPECIFIED))
        );
        assert!(
            bound_sockets
                .iter()
                .any(|x| x.ip() == IpAddr::V4(Ipv4Addr::UNSPECIFIED))
        );
        assert!(
            bound_sockets
                .iter()
                .any(|x| x.ip() == IpAddr::V4(Ipv4Addr::LOCALHOST))
        );
        // Shut the endpoint down explicitly, like the other bind_addr tests do.
        ep.close().await;
        Ok(())
    }

    /// Checks that the bulk of a transfer does not travel over the relay.
    ///
    /// The client is only given the server's relay URL.  It then sends `transfer_size`
    /// bytes which the server echoes back.  Afterwards the per-path stats of both sides
    /// are inspected: on every relay path combined, each side must have sent and received
    /// fewer than `transfer_size / 2` bytes.
    #[ignore = "flaky"]
    #[tokio::test]
    #[traced_test]
    async fn connect_via_relay_becomes_direct_and_sends_direct() -> Result {
        let (relay_map, relay_url, _relay_server_guard) = run_relay_server().await?;
        let qlog = Arc::new(QlogFileGroup::from_env(
            "connect_via_relay_becomes_direct_and_sends_direct",
        ));
        let transfer_size = 1_000_000;

        /// Snapshots the current path list of a connection as a map from remote address
        /// to that path's stats.
        fn collect_stats(mut watcher: PathWatcher) -> BTreeMap<TransportAddr, PathStats> {
            watcher
                .get()
                .iter()
                .map(|info| {
                    (
                        info.remote_addr().clone(),
                        info.stats().expect("conn is not yet dropped"),
                    )
                })
                .collect()
        }

        /// Adds up `bytes_of(stats)` over all paths whose remote address is a relay.
        fn relay_bytes(
            stats: &BTreeMap<TransportAddr, PathStats>,
            bytes_of: impl Fn(&PathStats) -> u64,
        ) -> u64 {
            stats
                .iter()
                .filter(|(remote, _stats)| remote.is_relay())
                .map(|(_, stats)| bytes_of(stats))
                .sum()
        }

        let client = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
            .insecure_skip_relay_cert_verify(true)
            .transport_config(qlog.create("client")?)
            .bind()
            .await?;
        let server = Endpoint::empty_builder(RelayMode::Custom(relay_map))
            .insecure_skip_relay_cert_verify(true)
            .transport_config(qlog.create("server")?)
            .alpns(vec![TEST_ALPN.to_vec()])
            .bind()
            .await?;
        // Only the relay URL is handed to the client, no direct addresses.
        let server_addr = EndpointAddr::new(server.id()).with_relay_url(relay_url);
        let server_task = tokio::spawn(async move {
            let incoming = server.accept().await.anyerr()?;
            let conn = incoming.await.anyerr()?;
            let watcher = conn.paths();
            // Echo everything the client sends back to it.
            let (mut send, mut recv) = conn.accept_bi().await.anyerr()?;
            let msg = recv.read_to_end(transfer_size).await.anyerr()?;
            send.write_all(&msg).await.anyerr()?;
            send.finish().anyerr()?;
            // Wait for the client to close so the stats cover the whole transfer.
            conn.closed().await;
            let stats = collect_stats(watcher);
            Ok::<_, Error>(stats)
        });

        let conn = client.connect(server_addr, TEST_ALPN).await?;
        let watcher = conn.paths();
        let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
        send.write_all(&vec![42u8; transfer_size]).await.anyerr()?;
        send.finish().anyerr()?;
        recv.read_to_end(transfer_size).await.anyerr()?;
        conn.close(0u32.into(), b"thanks, bye!");
        client.close().await;
        // `conn` is still alive here, which `collect_stats` relies on.
        let client_stats = collect_stats(watcher);
        let server_stats = server_task.await.anyerr()??;

        info!("client stats: {client_stats:#?}");
        info!("server stats: {server_stats:#?}");

        let client_total_relay_tx = relay_bytes(&client_stats, |stats| stats.udp_tx.bytes);
        let client_total_relay_rx = relay_bytes(&client_stats, |stats| stats.udp_rx.bytes);
        let server_total_relay_tx = relay_bytes(&server_stats, |stats| stats.udp_tx.bytes);
        let server_total_relay_rx = relay_bytes(&server_stats, |stats| stats.udp_rx.bytes);

        info!(?client_total_relay_tx, "total");
        info!(?client_total_relay_rx, "total");
        info!(?server_total_relay_tx, "total");
        info!(?server_total_relay_rx, "total");

        // We should send/receive only the minority of traffic via the relay.
        let relay_budget = transfer_size as u64 / 2;
        for (what, relayed) in [
            ("client tx", client_total_relay_tx),
            ("client rx", client_total_relay_rx),
            ("server tx", server_total_relay_tx),
            ("server rx", server_total_relay_rx),
        ] {
            assert!(
                relayed < relay_budget,
                "{what}: {relayed} bytes went via the relay, expected fewer than {relay_budget}"
            );
        }

        Ok(())
    }

    /// Tests that correct logs are emitted when connecting two endpoints with same secret keys to a relay.
    ///
    /// Steps:
    /// 1. `ep1` binds with a fixed secret key and a client connects to it via the relay.
    /// 2. `ep2` binds with the *same* secret key; once its relay connection is reported
    ///    as connected, the client connects again and must reach `ep2`, not `ep1`.
    /// 3. The captured logs must contain the "another endpoint connected with the same
    ///    endpoint id" warning exactly once for `ep1` and never for `ep2`.
    #[tokio::test]
    #[traced_test]
    async fn same_endpoint_id_relay() -> Result {
        let (relay_map, relay_url, _relay_server_guard) = run_relay_server().await?;
        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(1u64);
        let secret_key = SecretKey::generate(&mut rng);

        let client = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
            .insecure_skip_relay_cert_verify(true)
            .bind()
            .instrument(error_span!("ep-client"))
            .await?;

        info!("client {}", client.id());

        // bind ep1 and wait until connected to relay.
        let ep1 = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
            .secret_key(secret_key.clone())
            .insecure_skip_relay_cert_verify(true)
            .alpns(vec![TEST_ALPN.to_vec()])
            .bind()
            .instrument(error_span!("ep1"))
            .await?;
        info!("ep1 bound {:?}", ep1.id());
        ep1.online().await;
        info!("ep1 online");

        // Both ep1 and ep2 share this address: same public key, same relay URL.
        let addr = EndpointAddr::new(secret_key.public()).with_relay_url(relay_url.clone());

        // The accepting side closes with code 1; the client checks it observes exactly that.
        tokio::try_join!(
            async {
                let conn = client.connect(addr.clone(), TEST_ALPN).await?;
                let reason = conn.closed().await;
                assert!(is_application_closed(&reason, 1));
                n0_error::Ok(())
            },
            async {
                let conn = ep1.accept().await.unwrap().await?;
                conn.close(1u32.into(), b"bye");
                n0_error::Ok(())
            }
        )?;
        info!("client connected to ep1");

        // now start second endpoint with same secret key
        let ep2 = Endpoint::empty_builder(RelayMode::Custom(relay_map))
            .secret_key(secret_key.clone())
            .insecure_skip_relay_cert_verify(true)
            .alpns(vec![TEST_ALPN.to_vec()])
            .bind()
            .instrument(error_span!("ep2"))
            .await?;
        info!("ep2 bound {:?}", ep2.id());
        ep2.online().await;
        info!("ep2 online");

        // `online` does not mean that the connection to the home relay was *established*,
        // only that the home relay was *chosen* based on the net report probes.
        // We need to wait for the connection to be established though, to be sure that new packets
        // will be routed to the new endpoint and not to the old endpoint anymore.
        // We don't expose being connected to the home relay on the endpoint currently,
        // so we resort to log assertions.
        // TODO(Frando): Replace once we add a proper API for this.
        let expected_log_line = format!(
            "ep2:relay-actor:active-relay{{url={relay_url}}}:connected: iroh::_events::relay::connected"
        );
        // Poll the captured logs until the line shows up, giving up after 5 seconds.
        tokio::time::timeout(Duration::from_secs(5), async {
            while !logs_contain(&expected_log_line) {
                tokio::time::sleep(Duration::from_millis(10)).await
            }
        })
        .await
        .std_context("relay connection did not establish in time")?;

        // Same exchange as before, but this time ep2 must be the one that accepts.
        tokio::try_join!(
            async {
                let conn = client.connect(addr.clone(), TEST_ALPN).await?;
                let reason = conn.closed().await;
                assert!(is_application_closed(&reason, 1));
                n0_error::Ok(())
            },
            async {
                let conn = ep2.accept().await.unwrap().await?;
                conn.close(1u32.into(), b"bye");
                n0_error::Ok(())
            }
        )?;
        info!("client connected to ep2");

        // assert that ep1 did not receive a connection
        assert!(now_or_never(ep1.accept()).is_none());

        // We assert that we get the warn log once for endpoint 1, and not at all for endpoint 2.
        logs_assert(|logs| {
            let expected_line = |line: &str| {
                line.contains("WARN") && line.contains("Another endpoint connected with the same endpoint id. No more messages will be received")
            };
            let count_line_ep1 = logs
                .iter()
                .filter(|line| line.contains(":ep1:") && expected_line(line))
                .count();
            let count_line_ep2 = logs
                .iter()
                .filter(|line| line.contains(":ep2:") && expected_line(line))
                .count();
            if count_line_ep1 == 1 && count_line_ep2 == 0 {
                Ok(())
            } else {
                Err("Logs don't match expectations".to_string())
            }
        });
        tokio::join!(ep1.close(), ep2.close(), client.close());
        Ok(())
    }

    /// Returns `true` if `close_reason` is an application-level close whose error code
    /// equals `code`, and `false` for every other close reason.
    fn is_application_closed(close_reason: &ConnectionError, code: u32) -> bool {
        match close_reason {
            ConnectionError::ApplicationClosed(frame) => frame.error_code == code.into(),
            _ => false,
        }
    }

    /// Exercises the public `Endpoint` API after `close()` has completed.
    ///
    /// Every call must return promptly: mutating calls are expected to be no-ops, lookups
    /// are expected to come back empty or with an error, and the watcher streams must end
    /// once the endpoint handle is dropped.
    #[tokio::test]
    #[traced_test]
    async fn test_closed_endpoint_behaviour() -> Result {
        info!("Creating endpoint");
        let ep = Endpoint::builder().bind().await?;
        // Grab the `closed` future up front so it can be checked after closing.
        let closed_fut = ep.closed();
        info!("Closing endpoint");
        let started = Instant::now();
        ep.close().await;
        info!("Endpoint closed in {:?}", started.elapsed());

        // After `close()` returned, the `closed` future must already be ready.
        assert_eq!(now_or_never(closed_fut), Some(()));

        info!("Set ALPNS fails silently");
        ep.set_alpns(vec![b"test".into()]);

        info!("Insert Relay returns None");
        let relay_config = crate::defaults::staging::default_na_east_relay();
        let inserted = ep
            .insert_relay("localhost:300".parse()?, Arc::new(relay_config))
            .await;
        assert!(inserted.is_none());

        info!("Remove Relay returns None");
        let removed = ep.remove_relay(&"localhost:300".parse()?).await;
        assert!(removed.is_none());

        info!("Connecting");
        let ep_id = SecretKey::generate(&mut ChaCha8Rng::seed_from_u64(41)).public();

        // Connecting from a closed endpoint must fail with `EndpointClosed` as the
        // source of a `ConnectError::Connect`.
        let connect_err = ep.connect(ep_id, b"test").await.unwrap_err();
        let ConnectError::Connect { source, .. } = connect_err else {
            panic!("unexpected error for connect");
        };
        assert!(matches!(
            source,
            ConnectWithOptsError::EndpointClosed { .. }
        ));

        info!("Accepting!");
        let incoming = ep.accept().await;
        assert!(incoming.is_none());

        // Reading the address must still be possible.
        info!("Addr: {:?}", ep.addr());

        // Set up watcher streams now; they are drained after the endpoint is dropped.
        let mut addrs = ep.watch_addr().stream();
        let mut net_reports = ep.net_report().stream();

        // Only logged, not asserted on.
        let net_report = ep.last_net_report();
        info!("last Net report {net_report:?}");

        // Listing the bound sockets must still be possible.
        let sockets = ep.bound_sockets();
        info!("Sockets: {sockets:?}");

        // Both of these accessors must report an error on a closed endpoint.
        assert!(ep.dns_resolver().is_err());
        assert!(ep.address_lookup().is_err());

        // Metrics must still be readable.
        let metrics = ep.metrics();
        info!("Metrics: {metrics:?}");

        // No remote info is available any more.
        let remote = ep.remote_info(ep_id).await;
        assert!(remote.is_none());

        // Must complete without doing anything observable.
        ep.network_change().await;

        // Must be accepted without doing anything observable.
        let user_data = UserData::try_from("TEST".to_string()).expect("valid string");
        ep.set_user_data_for_address_lookup(Some(user_data));
        drop(ep);

        // With the endpoint gone, both watcher streams have to end within a second.
        let drain_watchers = async {
            while let Some(addr) = addrs.next().await {
                info!("Addrs stream: {addr:?}");
            }
            while let Some(net_report) = net_reports.next().await {
                info!("Net report stream: {net_report:?}");
            }
        };
        tokio::time::timeout(Duration::from_secs(1), drain_watchers)
            .await
            .expect("watchers not closed");

        info!("Done!");
        Ok(())
    }

    /// An accept future created before `close()` but never polled until afterwards must
    /// resolve to `None`, and `close()` itself must finish within five seconds.
    #[tokio::test]
    #[traced_test]
    async fn test_closed_endpoint_unpolled_accept_fut() -> Result {
        info!("Creating endpoint");
        let ep = Endpoint::builder().bind().await?;

        info!("Get accept future");
        // Deliberately not awaited yet: the future stays unpolled while the endpoint closes.
        let accept_fut = ep.accept();

        info!("Closing endpoint");
        let started = Instant::now();
        let close_outcome = tokio::time::timeout(Duration::from_secs(5), ep.close()).await;
        close_outcome.expect("Endpoint closes in a reasonable time");
        info!("Endpoint closed in {:?}", started.elapsed());

        info!("Accept future returns None after the endpoint has closed");
        assert!(accept_fut.await.is_none());
        Ok(())
    }

    /// An accept future that is already being awaited in another task when the endpoint
    /// closes must resolve to `None`, and `close()` must finish within five seconds.
    #[tokio::test]
    #[traced_test]
    async fn test_closed_endpoint_polled_accept_fut() -> Result {
        info!("Creating endpoint");
        let ep = Endpoint::builder().bind().await?;

        info!("Run an accept task");
        let accepting_ep = ep.clone();
        let accept_task = tokio::spawn(async move {
            info!("Waiting on Accept");
            let accepted = accepting_ep.accept().await;
            info!("Accept await has returned");
            accepted
        });

        // Best effort only: give the spawned task a moment so its accept future has
        // most likely been polled before we close.
        tokio::time::sleep(Duration::from_millis(10)).await;

        info!("Closing the endpoint");
        let close_outcome = tokio::time::timeout(Duration::from_secs(5), ep.close()).await;
        close_outcome.expect("Endpoint closes in a reasonable time");
        info!("Endpoint closed");

        info!("Await the accept task");
        let accepted = accept_task.await.expect("accept task panicked");
        assert!(accepted.is_none());

        Ok(())
    }
}

Synonyms

radio/iroh-ffi/src/endpoint.rs
radio/iroh-ffi/iroh-js/src/endpoint.rs

Neighbours