radio/iroh/examples/echo-no-router.rs

//! Very basic example showing how to implement a basic echo protocol,
//! without using the `Router` API. (For the router version, check out the echo.rs example.)
//!
//! The echo protocol echos any data sent to it in the first stream.
//!
//! ## Running the Example
//!
//!     cargo run --example echo-no-router --features=examples

use iroh::{Endpoint, EndpointAddr};
use n0_error::{AnyError as Error, Result, StdResultExt};

/// Each protocol is identified by its ALPN string.
///
/// The ALPN, or application-layer protocol negotiation, is exchanged in the connection handshake,
/// and the connection is aborted unless both endpoints pass the same bytestring.
const ALPN: &[u8] = b"iroh-example/echo/0";

#[tokio::main]
async fn main() -> Result<()> {
    let endpoint = start_accept_side().await?;

    // wait for the endpoint to be online
    endpoint.online().await;

    connect_side(endpoint.addr()).await?;

    // This makes sure the endpoint is closed properly and connections close gracefully
    // and will indirectly close the tasks spawned by `start_accept_side`.
    endpoint.close().await;

    Ok(())
}

async fn connect_side(addr: EndpointAddr) -> Result<()> {
    let endpoint = Endpoint::bind().await?;

    // Open a connection to the accepting endpoint
    let conn = endpoint.connect(addr, ALPN).await?;

    // Open a bidirectional QUIC stream
    let (mut send, mut recv) = conn.open_bi().await.anyerr()?;

    // Send some data to be echoed
    send.write_all(b"Hello, world!").await.anyerr()?;

    // Signal the end of data for this particular stream
    send.finish().anyerr()?;

    // Receive the echo, but limit reading up to maximum 1000 bytes
    let response = recv.read_to_end(1000).await.anyerr()?;
    assert_eq!(&response, b"Hello, world!");

    // Explicitly close the whole connection.
    conn.close(0u32.into(), b"bye!");

    // The above call only queues a close message to be sent (see how it's not async!).
    // We need to actually call this to make sure this message is sent out.
    endpoint.close().await;
    // If we don't call this, but continue using the endpoint, then the queued
    // close call will eventually be picked up and sent.
    // But always try to wait for endpoint.close().await to go through before dropping
    // the endpoint to ensure any queued messages are sent through and connections are
    // closed gracefully.

    Ok(())
}

async fn start_accept_side() -> Result<Endpoint> {
    let endpoint = Endpoint::builder()
        // The accept side needs to opt-in to the protocols it accepts,
        // as any connection attempts that can't be found with a matching ALPN
        // will be rejected.
        .alpns(vec![ALPN.to_vec()])
        .bind()
        .await?;

    // spawn a task so that `start_accept_side` returns immediately and we can continue in main().
    tokio::spawn({
        let endpoint = endpoint.clone();
        async move {
            // This task won't leak, because we call `endpoint.close()` in `main()`,
            // which causes `endpoint.accept().await` to return `None`.
            // In a more serious environment, we recommend avoiding `tokio::spawn` and use either a `TaskTracker` or
            // `JoinSet` instead to make sure you're not accidentally leaking tasks.
            while let Some(incoming) = endpoint.accept().await {
                // spawn a task for each incoming connection, so we can serve multiple connections asynchronously
                tokio::spawn(async move {
                    let connection = incoming.await.anyerr()?;

                    // We can get the remote's endpoint id from the connection.
                    let endpoint_id = connection.remote_id();
                    println!("accepted connection from {endpoint_id}");

                    // Our protocol is a simple request-response protocol, so we expect the
                    // connecting peer to open a single bi-directional stream.
                    let (mut send, mut recv) = connection.accept_bi().await.anyerr()?;

                    // Echo any bytes received back directly.
                    // This will keep copying until the sender signals the end of data on the stream.
                    let bytes_sent = tokio::io::copy(&mut recv, &mut send).await.anyerr()?;
                    println!("Copied over {bytes_sent} byte(s)");

                    // By calling `finish` on the send stream we signal that we will not send anything
                    // further, which makes the receive stream on the other end terminate.
                    send.finish().anyerr()?;

                    // Wait until the remote closes the connection, which it does once it
                    // received the response.
                    connection.closed().await;

                    Ok::<_, Error>(())
                });
            }

            Ok::<_, Error>(())
        }
    });

    Ok(endpoint)
}

Neighbours