radio/iroh-willow/src/engine.rs

//! Engine for driving a willow store and synchronisation sessions.

use std::sync::{Arc, OnceLock};

use anyhow::Result;
use futures_util::{
    future::{MapErr, Shared},
    FutureExt, TryFutureExt,
};
use iroh::{endpoint::Connection, protocol::{AcceptError, ProtocolHandler}, Endpoint, EndpointId};
use tokio::{
    sync::{mpsc, oneshot},
    task::JoinError,
};
use tokio_util::task::AbortOnDropHandle;
use tracing::{debug, error, error_span, Instrument};

use crate::{
    rpc::{client::MemClient, handler::RpcHandler},
    session::{
        intents::{Intent, IntentHandle},
        SessionInit,
    },
    store::traits::Storage,
};

mod actor;
mod peer_manager;

use self::peer_manager::PeerManager;
pub use self::{actor::ActorHandle, peer_manager::AcceptOpts};

const PEER_MANAGER_INBOX_CAP: usize = 128;

/// The [`Engine`] is the main handle onto a Willow store with networking.
///
/// It runs a dedicated thread for all storage operations, and a peer manager to coordinate network
/// connections to other peers.
///
/// The engine does not establish any peer connections on its own. Synchronisation sessions can be
/// started with [`Engine::sync_with_peer`].
#[derive(Debug, Clone)]
pub struct Engine {
    actor_handle: ActorHandle,
    pub(crate) endpoint: Endpoint,
    peer_manager_inbox: mpsc::Sender<peer_manager::Input>,
    // `Engine` needs to be `Clone + Send`, and we need to `task.await` in its `shutdown()` impl.
    // So we need
    // - `Shared` so we can `task.await` from all `Node` clones
    // - `MapErr` to map the `JoinError` to a `String`, because `JoinError` is `!Clone`
    // - `AbortOnDropHandle` to make sure that the `task` is cancelled when all `Node`s are dropped
    //   (`Shared` acts like an `Arc` around its inner future).
    peer_manager_task: Shared<MapErr<AbortOnDropHandle<Result<(), String>>, JoinErrToStr>>,
    rpc_handler: Arc<OnceLock<crate::rpc::handler::RpcHandler>>,
}

pub(crate) type JoinErrToStr = Box<dyn Fn(JoinError) -> String + Send + Sync + 'static>;

impl Engine {
    /// Get an in memory client to interact with the willow engine.
    pub fn client(&self) -> &MemClient {
        &self
            .rpc_handler
            .get_or_init(|| RpcHandler::new(self.clone()))
            .client
    }

    /// Start the Willow engine.
    ///
    /// This needs an `endpoint` to connect to other peers, and a `create_store` closure which
    /// returns a [`Storage`] instance.
    ///
    /// You also need to pass [`AcceptOpts`] to configure what to do with incoming connections.
    /// Its default implementation will accept all connections and run sync with all our interests.
    ///
    /// To actually accept connections, an [`Endpoint::accept`] loop has to be run outside of the
    /// engine, passing all connections that match [`crate::net::ALPN`] to the engine with
    /// [`Engine::handle_connection`].
    ///
    /// The engine will spawn a dedicated storage thread, and the `create_store` closure will be called on
    /// this thread, so that the [`Storage`] does not have to be `Send`.
    pub fn spawn<S: Storage>(
        endpoint: Endpoint,
        create_store: impl 'static + Send + FnOnce() -> S,
        accept_opts: AcceptOpts,
    ) -> Self {
        let me = endpoint.id();
        let actor_handle = ActorHandle::spawn(create_store, me);
        let (pm_inbox_tx, pm_inbox_rx) = mpsc::channel(PEER_MANAGER_INBOX_CAP);
        let peer_manager = PeerManager::new(
            actor_handle.clone(),
            endpoint.clone(),
            pm_inbox_rx,
            accept_opts,
        );
        let peer_manager_task = tokio::task::spawn(
            async move { peer_manager.run().await.map_err(|e| e.to_string()) }
                .instrument(error_span!("peer_manager", me=%me.fmt_short())),
        );
        let peer_manager_task = AbortOnDropHandle::new(peer_manager_task)
            .map_err(Box::new(|e: JoinError| e.to_string()) as JoinErrToStr)
            .shared();
        Engine {
            actor_handle,
            endpoint,
            peer_manager_inbox: pm_inbox_tx,
            peer_manager_task,
            rpc_handler: Default::default(),
        }
    }

    /// Handle an incoming connection.
    pub async fn handle_connection(&self, conn: Connection) -> Result<()> {
        self.peer_manager_inbox
            .send(peer_manager::Input::HandleConnection { conn })
            .await?;
        Ok(())
    }

    /// Synchronises with a peer.
    ///
    /// Will try to establish a connection to `peer` if there is none already, and then open a
    /// synchronisation session.
    ///
    /// `init` contains the initialisation options for this synchronisation intent.
    ///
    /// Returns an [`IntentHandle`] which receives events and can submit updates into the session.
    ///
    /// This can freely be called multiple times for the same peer. The engine will merge the
    /// intents and make sure that only a single session is opened per peer.
    pub async fn sync_with_peer(&self, peer: EndpointId, init: SessionInit) -> Result<IntentHandle> {
        let (intent, handle) = Intent::new(init);
        self.peer_manager_inbox
            .send(peer_manager::Input::SubmitIntent { peer, intent })
            .await?;
        Ok(handle)
    }

    /// Shutdown the engine.
    ///
    /// This will try to close all connections gracefully for up to 10 seconds,
    /// and abort them otherwise.
    pub async fn shutdown(&self) -> Result<()> {
        debug!("shutdown engine");
        let (reply, reply_rx) = oneshot::channel();
        self.peer_manager_inbox
            .send(peer_manager::Input::Shutdown { reply })
            .await?;
        reply_rx.await?;
        let res = self.peer_manager_task.clone().await;
        match res {
            Err(err) => error!(?err, "peer manager task panicked"),
            Ok(Err(err)) => error!(?err, "peer manager task failed"),
            Ok(Ok(())) => {}
        };
        debug!("shutdown engine: peer manager terminated");
        self.actor_handle.shutdown().await?;
        debug!("shutdown engine: willow actor terminated");
        Ok(())
    }
}

impl std::ops::Deref for Engine {
    type Target = ActorHandle;

    fn deref(&self) -> &Self::Target {
        &self.actor_handle
    }
}

impl ProtocolHandler for Engine {
    async fn accept(&self, conn: Connection) -> Result<(), AcceptError> {
        self.handle_connection(conn).await.map_err(|e| AcceptError::from(std::io::Error::other(e)))
    }

    async fn shutdown(&self) {
        crate::engine::Engine::shutdown(self).await.ok();
    }
}

Synonyms

radio/iroh-docs/src/engine.rs

Neighbours