use std::{
any::Any,
future::{Future, IntoFuture},
net::{IpAddr, SocketAddr},
pin::Pin,
sync::Arc,
task::Poll,
};
use ed25519_dalek::{VerifyingKey, pkcs8::DecodePublicKey};
use futures_util::{FutureExt, future::Shared};
use iroh_base::EndpointId;
use n0_error::{e, stack_error};
use n0_future::{TryFutureExt, future::Boxed as BoxFuture, time::Duration};
use pin_project::pin_project;
use quinn::WeakConnectionHandle;
use tracing::warn;
use crate::{
Endpoint,
endpoint::{
AfterHandshakeOutcome,
quic::{
AcceptBi, AcceptUni, ConnectionError, ConnectionStats, Controller,
ExportKeyingMaterialError, OpenBi, OpenUni, PathId, ReadDatagram, SendDatagram,
SendDatagramError, ServerConfig, Side, VarInt,
},
},
socket::{
RemoteStateActorStoppedError,
remote_map::{PathInfo, PathWatchable, PathWatcher},
},
};
#[derive(derive_more::Debug)]
#[pin_project]
pub struct Accept<'a> {
#[pin]
#[debug("quinn::Accept")]
pub(crate) inner: quinn::Accept<'a>,
pub(crate) ep: Endpoint,
}
impl Future for Accept<'_> {
type Output = Option<Incoming>;
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.inner.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(inner)) => Poll::Ready(Some(Incoming {
inner,
ep: this.ep.clone(),
})),
}
}
}
#[derive(Debug)]
pub struct Incoming {
inner: quinn::Incoming,
ep: Endpoint,
}
impl Incoming {
pub fn accept(self) -> Result<Accepting, ConnectionError> {
self.inner
.accept()
.map(|conn| Accepting::new(conn, self.ep))
}
pub fn accept_with(
self,
server_config: Arc<ServerConfig>,
) -> Result<Accepting, ConnectionError> {
self.inner
.accept_with(server_config.to_inner_arc())
.map(|conn| Accepting::new(conn, self.ep))
}
pub fn refuse(self) {
self.inner.refuse()
}
#[allow(clippy::result_large_err)]
pub fn retry(self) -> Result<(), RetryError> {
self.inner
.retry()
.map_err(|err| e!(RetryError { err, ep: self.ep }))
}
pub fn ignore(self) {
self.inner.ignore()
}
pub fn local_ip(&self) -> Option<IpAddr> {
self.inner.local_ip()
}
pub fn remote_address(&self) -> SocketAddr {
self.inner.remote_address()
}
pub fn remote_address_validated(&self) -> bool {
self.inner.remote_address_validated()
}
}
impl IntoFuture for Incoming {
type Output = Result<Connection, ConnectingError>;
type IntoFuture = IncomingFuture;
fn into_future(self) -> Self::IntoFuture {
IncomingFuture(Box::pin(async move {
let quinn_conn = self.inner.into_future().await?;
let conn = conn_from_quinn_conn(quinn_conn, &self.ep)?.await?;
Ok(conn)
}))
}
}
#[stack_error(derive, add_meta, from_sources)]
#[error("retry() with validated Incoming")]
pub struct RetryError {
err: quinn::RetryError,
ep: Endpoint,
}
impl RetryError {
pub fn into_incoming(self) -> Incoming {
Incoming {
inner: self.err.into_incoming(),
ep: self.ep,
}
}
}
#[derive(derive_more::Debug)]
#[debug("IncomingFuture")]
pub struct IncomingFuture(BoxFuture<Result<Connection, ConnectingError>>);
impl Future for IncomingFuture {
type Output = Result<Connection, ConnectingError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
self.0.poll_unpin(cx)
}
}
fn alpn_from_quinn_conn(conn: &quinn::Connection) -> Option<Vec<u8>> {
let data = conn.handshake_data()?;
match data.downcast::<quinn::crypto::rustls::HandshakeData>() {
Ok(data) => data.protocol,
Err(_) => None,
}
}
async fn alpn_from_quinn_connecting(conn: &mut quinn::Connecting) -> Result<Vec<u8>, AlpnError> {
let data = conn.handshake_data().await?;
match data.downcast::<quinn::crypto::rustls::HandshakeData>() {
Ok(data) => match data.protocol {
Some(protocol) => Ok(protocol),
None => Err(e!(AlpnError::Unavailable)),
},
Err(_) => Err(e!(AlpnError::UnknownHandshake)),
}
}
#[stack_error(add_meta, derive, from_sources)]
#[allow(missing_docs)]
#[non_exhaustive]
#[derive(Clone)]
pub enum AuthenticationError {
#[error(transparent)]
RemoteId { source: RemoteEndpointIdError },
#[error("no ALPN provided")]
NoAlpn {},
}
fn conn_from_quinn_conn(
conn: quinn::Connection,
ep: &Endpoint,
) -> Result<
impl Future<Output = Result<Connection, ConnectingError>> + Send + 'static,
ConnectingError,
> {
let info = match static_info_from_conn(&conn) {
Ok(val) => val,
Err(auth_err) => {
if let Some(conn_err) = conn.close_reason() {
return Err(e!(ConnectingError::ConnectionError { source: conn_err }));
} else {
return Err(e!(ConnectingError::HandshakeFailure { source: auth_err }));
}
}
};
let fut = ep
.inner
.register_connection(info.endpoint_id, conn.weak_handle());
let inner = ep.inner.clone();
Ok(async move {
let paths = fut.await?;
let conn = Connection {
data: HandshakeCompletedData { info, paths },
inner: conn,
};
if let AfterHandshakeOutcome::Reject { error_code, reason } =
inner.hooks.after_handshake(&conn.to_info()).await
{
conn.close(error_code, &reason);
return Err(e!(ConnectingError::LocallyRejected));
}
Ok(conn)
})
}
fn static_info_from_conn(conn: &quinn::Connection) -> Result<StaticInfo, AuthenticationError> {
let endpoint_id = remote_id_from_quinn_conn(conn)?;
let alpn = alpn_from_quinn_conn(conn).ok_or_else(|| e!(AuthenticationError::NoAlpn))?;
Ok(StaticInfo { endpoint_id, alpn })
}
fn remote_id_from_quinn_conn(
conn: &quinn::Connection,
) -> Result<EndpointId, RemoteEndpointIdError> {
let data = conn.peer_identity();
match data {
None => {
warn!("no peer certificate found");
Err(RemoteEndpointIdError::new())
}
Some(data) => match data.downcast::<Vec<rustls::pki_types::CertificateDer>>() {
Ok(certs) => {
if certs.len() != 1 {
warn!(
"expected a single peer certificate, but {} found",
certs.len()
);
return Err(RemoteEndpointIdError::new());
}
let peer_id = EndpointId::from_verifying_key(
VerifyingKey::from_public_key_der(&certs[0])
.map_err(|_| RemoteEndpointIdError::new())?,
);
Ok(peer_id)
}
Err(err) => {
warn!("invalid peer certificate: {:?}", err);
Err(RemoteEndpointIdError::new())
}
},
}
}
#[derive(derive_more::Debug)]
pub struct Connecting {
inner: quinn::Connecting,
#[debug("{}", register_with_socket.as_ref().map(|_| "Some(RegisterWithSocketFut)").unwrap_or("None"))]
register_with_socket: Option<RegisterWithSocketFut>,
ep: Endpoint,
/// `Some(remote_id)` if this is an outgoing connection, `None` if this is an incoming conn
remote_endpoint_id: EndpointId,
}
type RegisterWithSocketFut = BoxFuture<Result<Connection, ConnectingError>>;
/// In-progress connection attempt future
#[derive(derive_more::Debug)]
pub struct Accepting {
inner: quinn::Connecting,
/// Future to register the connection with the socket.
///
/// This is set and polled after `inner` completes. We are using an option instead of an enum
/// because we need infallible access to `inner` in some methods.
#[debug("{}", register_with_socket.as_ref().map(|_| "Some(RegisterWithSocketFut)").unwrap_or("None"))]
register_with_socket: Option<RegisterWithSocketFut>,
ep: Endpoint,
}
#[stack_error(add_meta, derive, from_sources)]
#[allow(missing_docs)]
#[non_exhaustive]
pub enum AlpnError {
#[error(transparent)]
ConnectionError {
#[error(std_err)]
source: ConnectionError,
},
#[error("No ALPN available")]
Unavailable,
#[error("Unknown handshake type")]
UnknownHandshake,
}
#[stack_error(add_meta, derive, from_sources)]
#[allow(missing_docs)]
#[non_exhaustive]
#[derive(Clone)]
#[allow(private_interfaces)]
pub enum ConnectingError {
#[error(transparent)]
ConnectionError {
#[error(std_err)]
source: ConnectionError,
},
#[error("Failure finalizing the handshake")]
HandshakeFailure { source: AuthenticationError },
#[error("internal consistency error")]
InternalConsistencyError {
/// Private source type, cannot be created publicly.
source: RemoteStateActorStoppedError,
},
#[error("Connection was rejected locally")]
LocallyRejected,
}
impl Connecting {
pub(crate) fn new(
inner: quinn::Connecting,
ep: Endpoint,
remote_endpoint_id: EndpointId,
) -> Self {
Self {
inner,
ep,
remote_endpoint_id,
register_with_socket: None,
}
}
/// Converts this [`Connecting`] into a 0-RTT connection at the cost of weakened security.
///
/// If 0-RTT can be attempted, returns a [`OutgoingZeroRttConnection`], which represents
/// outgoing 0-RTT connection.
///
/// If the 0-RTT cannot even be attempted, returns back the same [`Connecting`] without
/// changes. You can still `.await` this [`Connecting`] to get a normal [`Connection`].
///
/// The [`OutgoingZeroRttConnection`] will attempt to resume a previous TLS session. However,
/// **the remote endpoint may actually _reject_ the 0-RTT data--yet still accept
/// the connection attempt in general**, once the handshake has completed.
///
/// This possibility of whether the 0-RTT data was accepted or rejected is conveyed
/// through the [`ZeroRttStatus`] after calling [`OutgoingZeroRttConnection::handshake_completed`].
/// When the handshake completes, it returns [`ZeroRttStatus::Accepted`] if the 0-RTT data
/// was accepted and [`ZeroRttStatus::Rejected`] if it was rejected. If it was rejected, the
/// existence of any streams opened and application data sent prior to the handshake
/// completing will not be conveyed to the remote application, and local operations on them
/// will return `ZeroRttRejected` errors.
///
/// A server may reject 0-RTT data at its discretion, but accepting 0-RTT data requires the
/// relevant resumption state to be stored in the server, which servers may limit or lose for
/// various reasons including not persisting resumption state across server restarts.
///
/// ## Security
///
/// This enables transmission of 0-RTT data, which is vulnerable to replay attacks, and
/// should therefore never invoke non-idempotent operations.
///
/// You can use [`RecvStream::is_0rtt`] to check whether a stream has been opened in 0-RTT
/// and thus whether parts of the stream are operating under this reduced security level.
///
/// See also documentation for [`Accepting::into_0rtt`].
///
/// [`RecvStream::is_0rtt`]: quinn::RecvStream::is_0rtt
#[allow(clippy::result_large_err)]
pub fn into_0rtt(self) -> Result<OutgoingZeroRttConnection, Connecting> {
match self.inner.into_0rtt() {
Ok((quinn_conn, zrtt_accepted)) => {
let accepted: BoxFuture<_> = Box::pin({
let quinn_conn = quinn_conn.clone();
async move {
let accepted = zrtt_accepted.await;
let conn = conn_from_quinn_conn(quinn_conn, &self.ep)?.await?;
Ok(match accepted {
true => ZeroRttStatus::Accepted(conn),
false => ZeroRttStatus::Rejected(conn),
})
}
});
let accepted = accepted.shared();
Ok(Connection {
inner: quinn_conn,
data: OutgoingZeroRttData { accepted },
})
}
Err(inner) => Err(Self { inner, ..self }),
}
}
/// Parameters negotiated during the handshake
pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
self.inner.handshake_data().await
}
/// Extracts the ALPN protocol from the peer's handshake data.
pub async fn alpn(&mut self) -> Result<Vec<u8>, AlpnError> {
alpn_from_quinn_connecting(&mut self.inner).await
}
/// Returns the [`EndpointId`] of the endpoint that this connection attempt tries to connect to.
pub fn remote_id(&self) -> EndpointId {
self.remote_endpoint_id
}
}
impl Future for Connecting {
type Output = Result<Connection, ConnectingError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
loop {
if let Some(fut) = &mut self.register_with_socket {
return fut.poll_unpin(cx).map_err(Into::into);
} else {
let quinn_conn = std::task::ready!(self.inner.poll_unpin(cx)?);
let fut = conn_from_quinn_conn(quinn_conn, &self.ep)?;
self.register_with_socket = Some(Box::pin(fut.err_into()));
}
}
}
}
impl Accepting {
pub(crate) fn new(inner: quinn::Connecting, ep: Endpoint) -> Self {
Self {
inner,
ep,
register_with_socket: None,
}
}
/// Converts this [`Accepting`] into a 0-RTT or 0.5-RTT connection at the cost of weakened
/// security.
///
/// Returns a [`IncomingZeroRttConnection`], which represents an incoming 0-RTT or 0.5-RTT connection.
///
/// If the connection was initiated with 0-RTT by the remote endpoint, the local endpoint
/// might accept the 0-RTT attempt, allowing the local endpoint to receive application streams
/// and data before the handshake finishes.
///
/// Otherwise this will enable 0.5-RTT, allowing the [`IncomingZeroRttConnection`] to open streams and send
/// data before the handshake finishes.
///
/// ## Security
///
/// Transmitted 0-RTT data from the client is vulnerable to replay attacks, and should
/// therefore never invoke non-idempotent operations.
///
/// Transmission of 0.5-RTT data from the server may be sent before TLS client authentication
/// has occurred, and should therefore not be used to send data for which client
/// authentication is required.
///
/// You can use [`RecvStream::is_0rtt`] to check whether a stream has been opened in 0-RTT
/// and thus whether parts of the stream are operating under this reduced security level.
///
/// See also documentation for [`Connecting::into_0rtt`].
///
/// [`RecvStream::is_0rtt`]: crate::endpoint::RecvStream::is_0rtt
pub fn into_0rtt(self) -> IncomingZeroRttConnection {
let (quinn_conn, zrtt_accepted) = self
.inner
.into_0rtt()
.expect("incoming connections can always be converted to 0-RTT");
let accepted: BoxFuture<_> = Box::pin({
let quinn_conn = quinn_conn.clone();
async move {
let _ = zrtt_accepted.await;
let conn = conn_from_quinn_conn(quinn_conn, &self.ep)?.await?;
Ok(conn)
}
});
let accepted = accepted.shared();
IncomingZeroRttConnection {
inner: quinn_conn,
data: IncomingZeroRttData { accepted },
}
}
/// Parameters negotiated during the handshake
pub async fn handshake_data(&mut self) -> Result<Box<dyn Any>, ConnectionError> {
self.inner.handshake_data().await
}
/// Extracts the ALPN protocol from the peer's handshake data.
pub async fn alpn(&mut self) -> Result<Vec<u8>, AlpnError> {
alpn_from_quinn_connecting(&mut self.inner).await
}
}
impl Future for Accepting {
type Output = Result<Connection, ConnectingError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
loop {
if let Some(fut) = &mut self.register_with_socket {
return fut.poll_unpin(cx).map_err(Into::into);
} else {
let quinn_conn = std::task::ready!(self.inner.poll_unpin(cx)?);
match conn_from_quinn_conn(quinn_conn, &self.ep) {
Err(err) => return Poll::Ready(Err(err)),
Ok(fut) => self.register_with_socket = Some(Box::pin(fut.err_into())),
};
}
}
}
}
/// The client side of a 0-RTT connection.
///
/// This is created using [`Connecting::into_0rtt`].
///
/// Creating a `OutgoingZeroRttConnection` means that the endpoint is capable
/// of attempting a 0-RTT connection with the remote. The remote may still
/// reject the 0-RTT connection. In which case, any data sent before the
/// handshake has completed may need to be resent.
///
/// Look at the [`OutgoingZeroRttConnection::handshake_completed`] method for
/// more details.
pub type OutgoingZeroRttConnection = Connection<OutgoingZeroRtt>;
/// Returned from [`OutgoingZeroRttConnection::handshake_completed`].
#[derive(Debug, Clone)]
pub enum ZeroRttStatus {
/// If the 0-RTT data was accepted, you can continue to use any streams
/// that were created before the handshake was completed.
Accepted(Connection),
/// If the 0-RTT data was rejected, any streams that were created before
/// the handshake was completed will error and any data that was
/// previously sent on those streams will need to be resent.
Rejected(Connection),
}
/// A QUIC connection on the server-side that can possibly accept 0-RTT data.
///
/// It is very similar to a `Connection`, but the `IncomingZeroRttConnection::remote_id`
/// and `IncomingZeroRttConnection::alpn` may not be set yet, since the handshake has
/// not necessarily occurred yet.
///
/// If the `IncomingZeroRttConnection` has rejected 0-RTT or does not have enough information
/// to accept 0-RTT, any received 0-RTT packets will simply be dropped before
/// reaching any receive streams.
///
/// Any streams that are created to send or receive data can continue to be used
/// even after the handshake has completed and we are no longer in a 0-RTT
/// situation.
///
/// Use the [`IncomingZeroRttConnection::handshake_completed`] method to get a [`Connection`] from a
/// `IncomingZeroRttConnection`. This waits until 0-RTT connection has completed
/// the handshake and can now confidently derive the ALPN and the
/// [`EndpointId`] of the remote endpoint.
pub type IncomingZeroRttConnection = Connection<IncomingZeroRtt>;
/// A QUIC connection.
///
/// If all references to a connection (including every clone of the Connection handle,
/// streams of incoming streams, and the various stream types) have been dropped, then the
/// connection will be automatically closed with an error_code of 0 and an empty reason. You
/// can also close the connection explicitly by calling [`Connection::close`].
///
/// Closing the connection immediately abandons efforts to deliver data to the peer. Upon
/// receiving CONNECTION_CLOSE the peer may drop any stream data not yet delivered to the
/// application. [`Connection::close`] describes in more detail how to gracefully close a
/// connection without losing application data.
///
/// May be cloned to obtain another handle to the same connection.
#[derive(Debug, Clone)]
pub struct Connection<State: ConnectionState = HandshakeCompleted> {
inner: quinn::Connection,
/// State-specific information
data: State::Data,
}
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct HandshakeCompletedData {
info: StaticInfo,
paths: PathWatchable,
}
/// Static info from a completed TLS handshake.
#[derive(Debug, Clone)]
struct StaticInfo {
endpoint_id: EndpointId,
alpn: Vec<u8>,
}
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct IncomingZeroRttData {
accepted: Shared<BoxFuture<Result<Connection, ConnectingError>>>,
}
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct OutgoingZeroRttData {
accepted: Shared<BoxFuture<Result<ZeroRttStatus, ConnectingError>>>,
}
mod sealed {
pub trait Sealed {}
}
/// Trait to track the state of a [`Connection`] at compile time.
pub trait ConnectionState: sealed::Sealed {
/// State-specific data stored in the [`Connection`].
type Data: std::fmt::Debug + Clone;
}
/// Marker type for a connection that has completed the handshake.
#[derive(Debug, Clone)]
pub struct HandshakeCompleted;
/// Marker type for a connection that is in the incoming 0-RTT state.
#[derive(Debug, Clone)]
pub struct IncomingZeroRtt;
/// Marker type for a connection that is in the outgoing 0-RTT state.
#[derive(Debug, Clone)]
pub struct OutgoingZeroRtt;
impl sealed::Sealed for HandshakeCompleted {}
impl ConnectionState for HandshakeCompleted {
type Data = HandshakeCompletedData;
}
impl sealed::Sealed for IncomingZeroRtt {}
impl ConnectionState for IncomingZeroRtt {
type Data = IncomingZeroRttData;
}
impl sealed::Sealed for OutgoingZeroRtt {}
impl ConnectionState for OutgoingZeroRtt {
type Data = OutgoingZeroRttData;
}
#[allow(missing_docs)]
#[stack_error(add_meta, derive)]
#[error("Protocol error: no remote id available")]
#[derive(Clone)]
pub struct RemoteEndpointIdError;
impl<T: ConnectionState> Connection<T> {
/// Initiates a new outgoing unidirectional stream.
///
/// Streams are cheap and instantaneous to open unless blocked by flow control. As a
/// consequence, the peer wonβt be notified that a stream has been opened until the
/// stream is actually used.
#[inline]
pub fn open_uni(&self) -> OpenUni<'_> {
self.inner.open_uni()
}
/// Initiates a new outgoing bidirectional stream.
///
/// Streams are cheap and instantaneous to open unless blocked by flow control. As a
/// consequence, the peer won't be notified that a stream has been opened until the
/// stream is actually used. Calling [`open_bi`] then waiting on the [`RecvStream`]
/// without writing anything to [`SendStream`] will never succeed.
///
/// [`open_bi`]: Connection::open_bi
/// [`SendStream`]: crate::endpoint::SendStream
/// [`RecvStream`]: crate::endpoint::RecvStream
#[inline]
pub fn open_bi(&self) -> OpenBi<'_> {
self.inner.open_bi()
}
/// Accepts the next incoming uni-directional stream.
#[inline]
pub fn accept_uni(&self) -> AcceptUni<'_> {
self.inner.accept_uni()
}
/// Accept the next incoming bidirectional stream.
///
/// **Important Note**: The peer that calls [`open_bi`] must write to its [`SendStream`]
/// before the peer `Connection` is able to accept the stream using
/// `accept_bi()`. Calling [`open_bi`] then waiting on the [`RecvStream`] without
/// writing anything to the connected [`SendStream`] will never succeed.
///
/// [`open_bi`]: Connection::open_bi
/// [`SendStream`]: crate::endpoint::SendStream
/// [`RecvStream`]: crate::endpoint::RecvStream
#[inline]
pub fn accept_bi(&self) -> AcceptBi<'_> {
self.inner.accept_bi()
}
/// Receives an application datagram.
#[inline]
pub fn read_datagram(&self) -> ReadDatagram<'_> {
self.inner.read_datagram()
}
/// Wait for the connection to be closed for any reason.
///
/// Despite the return type's name, closed connections are often not an error condition
/// at the application layer. Cases that might be routine include
/// [`ConnectionError::LocallyClosed`] and [`ConnectionError::ApplicationClosed`].
#[inline]
pub async fn closed(&self) -> ConnectionError {
self.inner.closed().await
}
/// If the connection is closed, the reason why.
///
/// Returns `None` if the connection is still open.
#[inline]
pub fn close_reason(&self) -> Option<ConnectionError> {
self.inner.close_reason()
}
/// Closes the connection immediately.
///
/// Pending operations will fail immediately with [`ConnectionError::LocallyClosed`]. No
/// more data is sent to the peer and the peer may drop buffered data upon receiving the
/// CONNECTION_CLOSE frame.
///
/// `error_code` and `reason` are not interpreted, and are provided directly to the
/// peer.
///
/// `reason` will be truncated to fit in a single packet with overhead; to improve odds
/// that it is preserved in full, it should be kept under 1KiB.
///
/// # Gracefully closing a connection
///
/// Only the peer last receiving application data can be certain that all data is
/// delivered. The only reliable action it can then take is to close the connection,
/// potentially with a custom error code. The delivery of the final CONNECTION_CLOSE
/// frame is very likely if both endpoints stay online long enough, calling
/// [`Endpoint::close`] will wait to provide sufficient time. Otherwise, the remote peer
/// will time out the connection, provided that the idle timeout is not disabled.
///
/// The sending side can not guarantee all stream data is delivered to the remote
/// application. It only knows the data is delivered to the QUIC stack of the remote
/// endpoint. Once the local side sends a CONNECTION_CLOSE frame in response to calling
/// [`close`] the remote endpoint may drop any data it received but is as yet
/// undelivered to the application, including data that was acknowledged as received to
/// the local endpoint.
///
/// [`close`]: Connection::close
#[inline]
pub fn close(&self, error_code: VarInt, reason: &[u8]) {
self.inner.close(error_code, reason)
}
/// Transmits `data` as an unreliable, unordered application datagram.
///
/// Application datagrams are a low-level primitive. They may be lost or delivered out
/// of order, and `data` must both fit inside a single QUIC packet and be smaller than
/// the maximum dictated by the peer.
#[inline]
pub fn send_datagram(&self, data: bytes::Bytes) -> Result<(), SendDatagramError> {
self.inner.send_datagram(data)
}
/// Transmits `data` as an unreliable, unordered application datagram
///
/// Unlike [`send_datagram()`], this method will wait for buffer space during congestion
/// conditions, which effectively prioritizes old datagrams over new datagrams.
///
/// See [`send_datagram()`] for details.
///
/// [`send_datagram()`]: Connection::send_datagram
#[inline]
pub fn send_datagram_wait(&self, data: bytes::Bytes) -> SendDatagram<'_> {
self.inner.send_datagram_wait(data)
}
/// Computes the maximum size of datagrams that may be passed to [`send_datagram`].
///
/// Returns `None` if datagrams are unsupported by the peer or disabled locally.
///
/// This may change over the lifetime of a connection according to variation in the path
/// MTU estimate. The peer can also enforce an arbitrarily small fixed limit, but if the
/// peer's limit is large this is guaranteed to be a little over a kilobyte at minimum.
///
/// Not necessarily the maximum size of received datagrams.
///
/// [`send_datagram`]: Self::send_datagram
#[inline]
pub fn max_datagram_size(&self) -> Option<usize> {
self.inner.max_datagram_size()
}
/// Bytes available in the outgoing datagram buffer.
///
/// When greater than zero, calling [`send_datagram`] with a
/// datagram of at most this size is guaranteed not to cause older datagrams to be
/// dropped.
///
/// [`send_datagram`]: Self::send_datagram
#[inline]
pub fn datagram_send_buffer_space(&self) -> usize {
self.inner.datagram_send_buffer_space()
}
/// Current best estimate of this connection's latency (round-trip-time).
#[inline]
pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
self.inner.rtt(path_id)
}
/// Returns connection statistics.
#[inline]
pub fn stats(&self) -> ConnectionStats {
self.inner.stats()
}
/// Current state of the congestion control algorithm, for debugging purposes.
#[inline]
pub fn congestion_state(&self, path_id: PathId) -> Option<Box<dyn Controller>> {
self.inner.congestion_state(path_id)
}
/// Parameters negotiated during the handshake.
///
/// Guaranteed to return `Some` on fully established connections or after
/// [`Connecting::handshake_data()`] succeeds. See that method's documentations for
/// details on the returned value.
///
/// [`Connection::handshake_data()`]: crate::endpoint::Connecting::handshake_data
#[inline]
pub fn handshake_data(&self) -> Option<Box<dyn Any>> {
self.inner.handshake_data()
}
/// Cryptographic identity of the peer.
///
/// The dynamic type returned is determined by the configured [`Session`]. For the
/// default `rustls` session, the return value can be [`downcast`] to a
/// <code>Vec<[rustls::pki_types::CertificateDer]></code>
///
/// [`Session`]: quinn_proto::crypto::Session
/// [`downcast`]: Box::downcast
#[inline]
pub fn peer_identity(&self) -> Option<Box<dyn Any>> {
self.inner.peer_identity()
}
/// A stable identifier for this connection.
///
/// Peer addresses and connection IDs can change, but this value will remain fixed for
/// the lifetime of the connection.
#[inline]
pub fn stable_id(&self) -> usize {
self.inner.stable_id()
}
/// Derives keying material from this connection's TLS session secrets.
///
/// When both peers call this method with the same `label` and `context`
/// arguments and `output` buffers of equal length, they will get the
/// same sequence of bytes in `output`. These bytes are cryptographically
/// strong and pseudorandom, and are suitable for use as keying material.
///
/// See [RFC5705](https://tools.ietf.org/html/rfc5705) for more information.
#[inline]
pub fn export_keying_material(
&self,
output: &mut [u8],
label: &[u8],
context: &[u8],
) -> Result<(), ExportKeyingMaterialError> {
self.inner.export_keying_material(output, label, context)
}
/// Modifies the number of unidirectional streams that may be concurrently opened.
///
/// No streams may be opened by the peer unless fewer than `count` are already
/// open. Large `count`s increase both minimum and worst-case memory consumption.
#[inline]
pub fn set_max_concurrent_uni_streams(&self, count: VarInt) {
self.inner.set_max_concurrent_uni_streams(count)
}
/// See [`quinn_proto::TransportConfig::receive_window`].
#[inline]
pub fn set_receive_window(&self, receive_window: VarInt) {
self.inner.set_receive_window(receive_window)
}
/// Modifies the number of bidirectional streams that may be concurrently opened.
///
/// No streams may be opened by the peer unless fewer than `count` are already
/// open. Large `count`s increase both minimum and worst-case memory consumption.
#[inline]
pub fn set_max_concurrent_bi_streams(&self, count: VarInt) {
self.inner.set_max_concurrent_bi_streams(count)
}
}
impl Connection<HandshakeCompleted> {
/// Extracts the ALPN protocol from the peer's handshake data.
pub fn alpn(&self) -> &[u8] {
&self.data.info.alpn
}
/// Returns the [`EndpointId`] from the peer's TLS certificate.
///
/// The [`PublicKey`] of an endpoint is also known as an [`EndpointId`]. This [`PublicKey`] is
/// included in the TLS certificate presented during the handshake when connecting.
/// This function allows you to get the [`EndpointId`] of the remote endpoint of this
/// connection.
///
/// [`PublicKey`]: iroh_base::PublicKey
pub fn remote_id(&self) -> EndpointId {
self.data.info.endpoint_id
}
/// Returns a [`Watcher`] for the network paths of this connection.
///
/// A connection can have several network paths to the remote endpoint, commonly there
/// will be a path via the relay server and a holepunched path.
///
/// Returns a [`PathWatcher`], which implements the [`Watcher`] trait. The watcher is updated
/// whenever a path is opened or closed, or when the path selected for transmission changes
/// (see [`PathInfo::is_selected`]).
///
/// The [`PathInfoList`] returned from the watcher contains a [`PathInfo`] for each
/// network path.
///
/// As long as a [`PathWatcher`] is alive, the list of paths will only grow. If paths
/// are closed, they will be marked as closed (see [`PathInfo::is_closed`]) but will
/// not be removed from the list of paths. This allows to reliably retrieve stats for
/// closed paths.
///
/// A [`PathWatcher`] does not keep the [`Connection`] itself alive. If all references to
/// a connection are dropped, the [`PathWatcher`] will start to return an error when
/// updating. Its last value may still be used - note however that accessing
/// stats for a path via [`PathInfo::stats`] returns `None` if all references to a
/// [`Connection`] have been dropped. To reliably access path stats when a connection closes,
/// wait for [`Connection::closed`] and then call [`Connection::paths`] and directly
/// iterate over the path stats while the [`Connection`] struct is still in scope.
///
/// [`PathInfoList`]: crate::endpoint::PathInfoList
/// [`Watcher`]: crate::Watcher
pub fn paths(&self) -> PathWatcher {
self.data.paths.watch()
}
/// Returns the side of the connection (client or server).
pub fn side(&self) -> Side {
self.inner.side()
}
/// Returns a connection info struct.
///
/// A [`ConnectionInfo`] is a weak handle to the connection that does not keep the connection alive,
/// but does allow to access some information about the connection and to wait for the connection to be closed.
pub fn to_info(&self) -> ConnectionInfo {
ConnectionInfo {
data: self.data.clone(),
inner: self.inner.weak_handle(),
side: self.side(),
}
}
}
impl Connection<IncomingZeroRtt> {
/// Extracts the ALPN protocol from the peer's handshake data.
pub fn alpn(&self) -> Option<Vec<u8>> {
alpn_from_quinn_conn(&self.inner)
}
/// Waits until the full handshake occurs and then returns a [`Connection`].
///
/// This may fail with [`ConnectingError::ConnectionError`], if there was
/// some general failure with the connection, such as a network timeout since
/// we accepted the connection.
///
/// This may fail with [`ConnectingError::HandshakeFailure`], if the other side
/// doesn't use the right TLS authentication, which usually every iroh endpoint
/// uses and requires.
///
/// Thus, those errors should only occur if someone connects to you with a
/// modified iroh endpoint or with a plain QUIC client.
pub async fn handshake_completed(&self) -> Result<Connection, ConnectingError> {
self.data.accepted.clone().await
}
/// Returns the [`EndpointId`] from the peer's TLS certificate.
///
/// The [`PublicKey`] of an endpoint is also known as an [`EndpointId`]. This [`PublicKey`] is
/// included in the TLS certificate presented during the handshake when connecting.
/// This function allows you to get the [`EndpointId`] of the remote endpoint of this
/// connection.
///
/// [`PublicKey`]: iroh_base::PublicKey
pub fn remote_id(&self) -> Result<EndpointId, RemoteEndpointIdError> {
remote_id_from_quinn_conn(&self.inner)
}
}
impl Connection<OutgoingZeroRtt> {
/// Extracts the ALPN protocol from the peer's handshake data.
pub fn alpn(&self) -> Option<Vec<u8>> {
alpn_from_quinn_conn(&self.inner)
}
/// Waits until the full handshake occurs and returns a [`ZeroRttStatus`].
///
/// If `ZeroRttStatus::Accepted` is returned, than any streams created before
/// the handshake has completed can still be used.
///
/// If `ZeroRttStatus::Rejected` is returned, than any streams created before
/// the handshake will error and any data sent should be re-sent on a
/// new stream.
///
/// This may fail with [`ConnectingError::ConnectionError`], if there was
/// some general failure with the connection, such as a network timeout since
/// we initiated the connection.
///
/// This may fail with [`ConnectingError::HandshakeFailure`], if the other side
/// doesn't use the right TLS authentication, which usually every iroh endpoint
/// uses and requires.
///
/// Thus, those errors should only occur if someone connects to you with a
/// modified iroh endpoint or with a plain QUIC client.
pub async fn handshake_completed(&self) -> Result<ZeroRttStatus, ConnectingError> {
self.data.accepted.clone().await
}
/// Returns the [`EndpointId`] from the peer's TLS certificate.
///
/// The [`PublicKey`] of an endpoint is also known as an [`EndpointId`]. This [`PublicKey`] is
/// included in the TLS certificate presented during the handshake when connecting.
/// This function allows you to get the [`EndpointId`] of the remote endpoint of this
/// connection.
///
/// [`PublicKey`]: iroh_base::PublicKey
pub fn remote_id(&self) -> Result<EndpointId, RemoteEndpointIdError> {
remote_id_from_quinn_conn(&self.inner)
}
}
/// Information about a connection.
///
/// A [`ConnectionInfo`] is a weak handle to a connection that exposes some information about the connection,
/// but does not keep the connection alive.
#[derive(Debug, Clone)]
pub struct ConnectionInfo {
side: Side,
data: HandshakeCompletedData,
inner: WeakConnectionHandle,
}
#[allow(missing_docs)]
impl ConnectionInfo {
pub fn alpn(&self) -> &[u8] {
&self.data.info.alpn
}
pub fn remote_id(&self) -> EndpointId {
self.data.info.endpoint_id
}
pub fn is_alive(&self) -> bool {
self.inner.upgrade().is_some()
}
/// Returns a watcher for the network paths of this connection.
///
/// See [`Connection::paths`] for details.
pub fn paths(&self) -> PathWatcher {
self.data.paths.watch()
}
/// Returns connection statistics.
///
/// Returns `None` if the connection has been dropped.
pub fn stats(&self) -> Option<ConnectionStats> {
self.inner.upgrade().map(|conn| conn.stats())
}
/// Returns the side of the connection (client or server).
pub fn side(&self) -> Side {
self.side
}
/// Waits for the connection to be closed, and returns the close reason and final connection stats.
///
/// Returns `None` if the connection has been dropped already before this call.
pub async fn closed(&self) -> Option<(ConnectionError, ConnectionStats)> {
let fut = self.inner.upgrade()?.on_closed();
Some(fut.await)
}
/// Returns the currently selected path.
pub fn selected_path(&self) -> Option<PathInfo> {
self.paths().into_iter().find(|path| path.is_selected())
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use iroh_base::{EndpointAddr, SecretKey};
use n0_error::{Result, StackResultExt, StdResultExt};
use n0_future::StreamExt;
use n0_tracing_test::traced_test;
use n0_watcher::Watcher;
use rand::SeedableRng;
use tracing::{Instrument, error_span, info, info_span, trace_span};
use super::Endpoint;
use crate::{
RelayMode,
endpoint::{ConnectOptions, Incoming, PathInfo, PathInfoList, ZeroRttStatus},
test_utils::run_relay_server,
};
const TEST_ALPN: &[u8] = b"n0/iroh/test";
async fn spawn_0rtt_server(secret_key: SecretKey, log_span: tracing::Span) -> Result<Endpoint> {
let server = Endpoint::empty_builder(RelayMode::Disabled)
.secret_key(secret_key)
.alpns(vec![TEST_ALPN.to_vec()])
.bind()
.instrument(log_span.clone())
.await?;
async fn handle_incoming(incoming: Incoming) -> Result {
let accepting = incoming
.accept()
.std_context("Failed to accept incoming connection")?;
// accept a possible 0-RTT connection
let zrtt_conn = accepting.into_0rtt();
let (mut send, mut recv) = zrtt_conn
.accept_bi()
.await
.std_context("failed to accept bi stream")?;
let data = recv
.read_to_end(10_000_000)
.await
.std_context("Failed to read data")?;
send.write_all(&data)
.await
.std_context("Failed to write data")?;
send.finish().std_context("Failed to finish send")?;
// Stay alive until the other side closes the connection.
zrtt_conn.closed().await;
Ok(())
}
// Gets aborted via the endpoint closing causing an `Err`
// a simple echo server
tokio::spawn({
let server = server.clone();
async move {
tracing::trace!("Server accept loop started");
while let Some(incoming) = server.accept().await {
tracing::trace!("Server received incoming connection");
// Handle connection errors gracefully instead of exiting the task
if let Err(e) = handle_incoming(incoming).await {
tracing::warn!("Failure while handling connection: {e:#}");
}
tracing::trace!("Connection closed, ready for next");
}
tracing::trace!("Server accept loop exiting");
n0_error::Ok(())
}
.instrument(log_span)
});
Ok(server)
}
async fn connect_client_0rtt_expect_err(
client: &Endpoint,
server_addr: EndpointAddr,
) -> Result {
let conn = client
.connect_with_opts(server_addr, TEST_ALPN, ConnectOptions::new())
.await?
.into_0rtt()
.expect_err("expected 0-RTT to fail")
.await?;
let (mut send, mut recv) = conn.open_bi().await.anyerr()?;
send.write_all(b"hello").await.anyerr()?;
send.finish().anyerr()?;
let received = recv.read_to_end(1_000).await.anyerr()?;
assert_eq!(&received, b"hello");
conn.close(0u32.into(), b"thx");
Ok(())
}
async fn connect_client_0rtt_expect_ok(
client: &Endpoint,
server_addr: EndpointAddr,
expect_server_accepts: bool,
) -> Result {
tracing::trace!(?server_addr, "Client connecting with 0-RTT");
let zrtt_conn = client
.connect_with_opts(server_addr, TEST_ALPN, ConnectOptions::new())
.await
.context("connect")?
.into_0rtt()
.ok()
.context("into_0rtt")?;
tracing::trace!("Client established 0-RTT connection");
// This is how we send data in 0-RTT:
let (mut send, mut recv) = zrtt_conn.open_bi().await.anyerr()?;
send.write_all(b"hello").await.anyerr()?;
send.finish().anyerr()?;
tracing::trace!("Client sent 0-RTT data, waiting for server response");
// When this resolves, we've gotten a response from the server about whether the 0-RTT data above was accepted:
let zrtt_res = zrtt_conn.handshake_completed().await;
tracing::trace!(?zrtt_res, "Server responded to 0-RTT");
let zrtt_res = zrtt_res.context("handshake completed")?;
let conn = match zrtt_res {
ZeroRttStatus::Accepted(conn) => {
assert!(expect_server_accepts);
conn
}
ZeroRttStatus::Rejected(conn) => {
assert!(!expect_server_accepts);
// in this case we need to re-send data by re-creating the stream.
let (mut send, r) = conn.open_bi().await.anyerr()?;
send.write_all(b"hello").await.anyerr()?;
send.finish().anyerr()?;
recv = r;
conn
}
};
let received = recv.read_to_end(1_000).await.anyerr()?;
assert_eq!(&received, b"hello");
conn.close(0u32.into(), b"thx");
Ok(())
}
#[tokio::test]
#[traced_test]
async fn test_0rtt() -> Result {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42);
let client = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
let server = spawn_0rtt_server(SecretKey::generate(&mut rng), info_span!("server")).await?;
connect_client_0rtt_expect_err(&client, server.addr()).await?;
// The second 0rtt attempt should work
connect_client_0rtt_expect_ok(&client, server.addr(), true).await?;
client.close().await;
server.close().await;
Ok(())
}
// We have this test, as this would've failed at some point.
// This effectively tests that we correctly categorize the TLS session tickets we
// receive into the respective "bucket" for the recipient.
#[tokio::test]
#[traced_test]
async fn test_0rtt_non_consecutive() -> Result {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42);
let client = Endpoint::empty_builder(RelayMode::Disabled).bind().await?;
let server = spawn_0rtt_server(SecretKey::generate(&mut rng), info_span!("server")).await?;
connect_client_0rtt_expect_err(&client, server.addr()).await?;
// connecting with another endpoint should not interfere with our
// TLS session ticket cache for the first endpoint:
let another =
spawn_0rtt_server(SecretKey::generate(&mut rng), info_span!("another")).await?;
connect_client_0rtt_expect_err(&client, another.addr()).await?;
another.close().await;
connect_client_0rtt_expect_ok(&client, server.addr(), true).await?;
client.close().await;
server.close().await;
Ok(())
}
// Test whether 0-RTT is possible after a restart:
#[tokio::test]
#[traced_test]
async fn test_0rtt_after_server_restart() -> Result {
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42);
let client = Endpoint::empty_builder(RelayMode::Disabled)
.bind()
.instrument(info_span!("client"))
.await?;
let server_key = SecretKey::generate(&mut rng);
let server = spawn_0rtt_server(server_key.clone(), info_span!("server-initial")).await?;
connect_client_0rtt_expect_err(&client, server.addr())
.instrument(trace_span!("connect1"))
.await
.context("client connect 1")?;
connect_client_0rtt_expect_ok(&client, server.addr(), true)
.instrument(trace_span!("connect2"))
.await
.context("client connect 2")?;
// adds time to the test, but we need to ensure the server is fully closed before spawning the next one.
server.close().await;
let server = spawn_0rtt_server(server_key, info_span!("server-restart")).await?;
// we expect the client to *believe* it can 0-RTT connect to the server (hence expect_ok),
// but the server will reject the early data because it discarded necessary state
// to decrypt it when restarting.
connect_client_0rtt_expect_ok(&client, server.addr(), false)
.instrument(trace_span!("connect3"))
.await
.context("client connect 3")?;
tokio::join!(client.close(), server.close());
Ok(())
}
#[tokio::test]
#[traced_test]
async fn test_paths_watcher() -> Result {
const ALPN: &[u8] = b"test";
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
let (relay_map, _relay_map, _guard) = run_relay_server().await?;
let server = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
.secret_key(SecretKey::generate(&mut rng))
.insecure_skip_relay_cert_verify(true)
.alpns(vec![ALPN.to_vec()])
.bind()
.await?;
let client = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
.secret_key(SecretKey::generate(&mut rng))
.insecure_skip_relay_cert_verify(true)
.bind()
.await?;
server.online().await;
let server_addr = server.addr();
info!("server addr: {server_addr:?}");
let (conn_client, conn_server) = tokio::join!(
async { client.connect(server_addr, ALPN).await.unwrap() },
async { server.accept().await.unwrap().await.unwrap() }
);
info!("connected");
let mut paths_client = conn_client.paths().stream();
let mut paths_server = conn_server.paths().stream();
/// Advances the path stream until at least one IP and one relay paths are available.
///
/// Panics if the path stream finishes before that happens.
async fn wait_for_paths(
stream: &mut n0_watcher::Stream<impl n0_watcher::Watcher<Value = PathInfoList> + Unpin>,
) {
loop {
let paths = stream.next().await.expect("paths stream ended");
info!(?paths, "paths");
if paths.len() >= 2
&& paths.iter().any(PathInfo::is_relay)
&& paths.iter().any(PathInfo::is_ip)
{
info!("break");
return;
}
}
}
// Verify that both connections are notified of path changes and get an IP and a relay path.
tokio::join!(
async {
tokio::time::timeout(Duration::from_secs(1), wait_for_paths(&mut paths_server))
.instrument(error_span!("paths-server"))
.await
.unwrap()
},
async {
tokio::time::timeout(Duration::from_secs(1), wait_for_paths(&mut paths_client))
.instrument(error_span!("paths-client"))
.await
.unwrap()
}
);
tokio::time::pause();
// Close the client connection.
info!("close client conn");
conn_client.close(0u32.into(), b"");
// Verify that the path watch streams close shortly after the connection is closed
tokio::time::timeout(Duration::from_nanos(1), async {
while paths_client.next().await.is_some() {}
})
.await
.expect("client paths watcher did not close within 1s of connection close");
tokio::time::timeout(Duration::from_nanos(1), async {
while paths_client.next().await.is_some() {}
})
.await
.expect("server paths watcher did not close within 1s of connection close");
server.close().await;
client.close().await;
Ok(())
}
}