use std::{collections::HashMap, fmt::Debug, path::PathBuf, sync::Arc, time::Duration};
use iroh_blobs::{
downloader::Downloader, net_protocol::Blobs, provider::EventSender, store::GcConfig,
util::local_pool::LocalPool,
};
use iroh_docs::protocol::Docs;
use iroh_gossip::net::Gossip;
use iroh_node_util::rpc::server::AbstractNode;
use quic_rpc::{transport::flume::FlumeConnector, RpcClient, RpcServer};
use tokio_util::task::AbortOnDropHandle;
use crate::{
BlobProvideEventCallback, CallbackError, Connection, Endpoint, IrohError, NodeAddr, PublicKey,
};
/// A single named metric counter, exposed over FFI as a record.
#[derive(Debug, uniffi::Record)]
pub struct CounterStats {
    /// Current counter value (narrowed from the node's internal `u64`).
    pub value: u32,
    /// Human-readable description of what this counter measures.
    pub description: String,
}
/// FFI wrapper around [`iroh::endpoint::DirectAddrInfo`]: information about a
/// single direct (non-relay) address known for a remote node.
#[derive(Debug, Clone, uniffi::Object)]
pub struct DirectAddrInfo(pub(crate) iroh::endpoint::DirectAddrInfo);
#[uniffi::export]
impl DirectAddrInfo {
    /// The direct address, rendered as a string.
    pub fn addr(&self) -> String {
        self.0.addr.to_string()
    }

    /// Latency reported for this address, if one has been measured.
    pub fn latency(&self) -> Option<Duration> {
        self.0.latency
    }

    /// Latency and kind of the last control message seen on this address,
    /// if any. The control message is rendered as a string for FFI.
    pub fn last_control(&self) -> Option<LatencyAndControlMsg> {
        let (latency, control_msg) = self.0.last_control?;
        Some(LatencyAndControlMsg {
            latency,
            control_msg: control_msg.to_string(),
        })
    }

    /// The `last_payload` duration reported by iroh for this address, if any.
    pub fn last_payload(&self) -> Option<Duration> {
        self.0.last_payload
    }
}
/// Pairs a measured latency with the stringified control message it was
/// derived from; returned by [`DirectAddrInfo::last_control`].
#[derive(Debug, uniffi::Record)]
pub struct LatencyAndControlMsg {
    /// The measured round-trip latency.
    pub latency: Duration,
    /// The control message, rendered as a string.
    pub control_msg: String,
}
/// FFI view of what this node knows about a remote node.
#[derive(Debug, uniffi::Record)]
pub struct RemoteInfo {
    /// Public key identifying the remote node.
    pub node_id: Arc<PublicKey>,
    /// Relay URL for the remote node, if one is known.
    pub relay_url: Option<String>,
    /// All known direct addresses for the remote node.
    pub addrs: Vec<Arc<DirectAddrInfo>>,
    /// How we are currently connected (direct, relay, mixed, or none).
    pub conn_type: Arc<ConnectionType>,
    /// Best-known latency to the remote node, if measured.
    pub latency: Option<Duration>,
    /// Time since this connection was last used, if ever.
    pub last_used: Option<Duration>,
}
impl From<iroh::endpoint::RemoteInfo> for RemoteInfo {
fn from(value: iroh::endpoint::RemoteInfo) -> Self {
RemoteInfo {
node_id: Arc::new(value.node_id.into()),
relay_url: value.relay_url.map(|info| info.relay_url.to_string()),
addrs: value
.addrs
.iter()
.map(|a| Arc::new(DirectAddrInfo(a.clone())))
.collect(),
conn_type: Arc::new(value.conn_type.into()),
latency: value.latency,
last_used: value.last_used,
}
}
}
/// Plain discriminant for [`ConnectionType`], usable as a simple FFI enum
/// (uniffi enums with payloads are exposed as objects, hence this mirror).
#[derive(Debug, uniffi::Enum)]
pub enum ConnType {
    Direct,
    Relay,
    Mixed,
    None,
}
/// How we are connected to a remote node, with stringified payloads:
/// a direct address, a relay URL, both, or nothing.
#[derive(Debug, uniffi::Object)]
pub enum ConnectionType {
    /// Connected directly; payload is the socket address as a string.
    Direct(String),
    /// Connected via a relay; payload is the relay URL as a string.
    Relay(String),
    /// Both a direct address and a relay URL are in use.
    Mixed(String, String),
    /// No connection.
    None,
}
#[uniffi::export]
impl ConnectionType {
    /// Returns the variant discriminant; call this before any of the
    /// `as_*` accessors below to avoid their panics.
    pub fn r#type(&self) -> ConnType {
        match self {
            ConnectionType::Direct(_) => ConnType::Direct,
            ConnectionType::Relay(_) => ConnType::Relay,
            ConnectionType::Mixed(..) => ConnType::Mixed,
            ConnectionType::None => ConnType::None,
        }
    }

    /// Returns the direct address.
    ///
    /// # Panics
    /// Panics if this is not `ConnectionType::Direct`.
    pub fn as_direct(&self) -> String {
        match self {
            ConnectionType::Direct(addr) => addr.clone(),
            _ => panic!("ConnectionType is not `Direct`"),
        }
    }

    /// Returns the relay URL.
    ///
    /// # Panics
    /// Panics if this is not `ConnectionType::Relay`.
    pub fn as_relay(&self) -> String {
        match self {
            ConnectionType::Relay(url) => url.clone(),
            _ => panic!("ConnectionType is not `Relay`"),
        }
    }

    /// Returns both the direct address and the relay URL.
    ///
    /// # Panics
    /// Panics if this is not `ConnectionType::Mixed`.
    pub fn as_mixed(&self) -> ConnectionTypeMixed {
        match self {
            ConnectionType::Mixed(addr, url) => ConnectionTypeMixed {
                addr: addr.clone(),
                relay_url: url.clone(),
            },
            // Fixed: the panic message previously claimed "not `Relay`",
            // copied from `as_relay`, which misreported the actual variant.
            _ => panic!("ConnectionType is not `Mixed`"),
        }
    }
}
/// Payload of a mixed connection: the direct address plus the relay URL,
/// returned by [`ConnectionType::as_mixed`].
#[derive(Debug, uniffi::Record)]
pub struct ConnectionTypeMixed {
    /// The direct socket address, as a string.
    pub addr: String,
    /// The relay URL, as a string.
    pub relay_url: String,
}
impl From<iroh::endpoint::ConnectionType> for ConnectionType {
    /// Maps iroh's native connection type onto the stringified FFI variant.
    fn from(value: iroh::endpoint::ConnectionType) -> Self {
        use iroh::endpoint::ConnectionType as Src;
        match value {
            Src::Direct(addr) => Self::Direct(addr.to_string()),
            Src::Relay(url) => Self::Relay(url.to_string()),
            Src::Mixed(addr, url) => Self::Mixed(addr.to_string(), url.to_string()),
            Src::None => Self::None,
        }
    }
}
/// Configuration for constructing an [`Iroh`] node; every field has a
/// uniffi default so foreign callers can omit any of them.
#[derive(derive_more::Debug, uniffi::Record)]
pub struct NodeOptions {
    /// Blob garbage-collection interval in milliseconds.
    /// `Some(0)` disables GC (see `apply_options`, which maps 0 to no period).
    #[uniffi(default = None)]
    pub gc_interval_millis: Option<u64>,
    /// Optional callback invoked for blob provider events.
    #[debug("BlobProvideEventCallback")]
    #[uniffi(default = None)]
    pub blob_events: Option<Arc<dyn BlobProvideEventCallback>>,
    /// Whether to spawn the docs engine (off by default).
    #[uniffi(default = false)]
    pub enable_docs: bool,
    /// Optional IPv4 bind address, parsed from a string.
    #[uniffi(default = None)]
    pub ipv4_addr: Option<String>,
    /// Optional IPv6 bind address, parsed from a string.
    #[uniffi(default = None)]
    pub ipv6_addr: Option<String>,
    /// Node discovery mode; `None` here means the default (n0 discovery).
    #[uniffi(default = None)]
    pub node_discovery: Option<NodeDiscoveryConfig>,
    /// Optional 32-byte secret key; a random key is used when absent.
    #[uniffi(default = None)]
    pub secret_key: Option<Vec<u8>>,
    /// Custom protocols keyed by ALPN bytes; each creator is invoked once
    /// with the node's endpoint to build its handler.
    #[uniffi(default = None)]
    pub protocols: Option<HashMap<Vec<u8>, Arc<dyn ProtocolCreator>>>,
}
/// Factory for custom protocol handlers, implementable from the foreign
/// language side; called once per registered ALPN with the node's endpoint.
#[uniffi::export(with_foreign)]
pub trait ProtocolCreator: std::fmt::Debug + Send + Sync + 'static {
    /// Builds the handler that will accept connections for this protocol.
    fn create(&self, endpoint: Arc<Endpoint>) -> Arc<dyn ProtocolHandler>;
}
/// Foreign-implementable protocol handler, adapted onto
/// `iroh::protocol::ProtocolHandler` via [`ProtocolWrapper`].
#[uniffi::export(with_foreign)]
#[async_trait::async_trait]
pub trait ProtocolHandler: Send + Sync + 'static {
    /// Handles one inbound connection for this protocol.
    async fn accept(&self, conn: Arc<Connection>) -> Result<(), CallbackError>;
    /// Called when the node shuts the protocol down.
    async fn shutdown(&self);
}
/// Adapter that lets a foreign [`ProtocolHandler`] plug into iroh's router.
#[derive(derive_more::Debug, Clone)]
struct ProtocolWrapper {
    // Debug output elides the handler since foreign trait objects
    // have no meaningful Debug of their own.
    #[debug("handler")]
    handler: Arc<dyn ProtocolHandler>,
}
impl iroh::protocol::ProtocolHandler for ProtocolWrapper {
    /// Boxes the foreign handler's async `accept`, converting the iroh
    /// connection into the FFI wrapper and callback errors into `anyhow`.
    fn accept(
        &self,
        conn: iroh::endpoint::Connection,
    ) -> futures_lite::future::Boxed<anyhow::Result<()>> {
        // Only the Arc'd handler needs to move into the future.
        let handler = self.handler.clone();
        Box::pin(async move {
            handler.accept(Arc::new(conn.into())).await?;
            Ok(())
        })
    }

    /// Boxes the foreign handler's async `shutdown`.
    fn shutdown(&self) -> futures_lite::future::Boxed<()> {
        let handler = self.handler.clone();
        Box::pin(async move { handler.shutdown().await })
    }
}
impl Default for NodeOptions {
fn default() -> Self {
NodeOptions {
gc_interval_millis: Some(0),
blob_events: None,
enable_docs: false,
ipv4_addr: None,
ipv6_addr: None,
node_discovery: None,
secret_key: None,
protocols: None,
}
}
}
/// Node discovery modes selectable from the foreign side.
#[derive(Debug, Default, uniffi::Enum)]
pub enum NodeDiscoveryConfig {
    /// Disable discovery entirely.
    None,
    /// Use the default (n0) discovery service.
    #[default]
    Default,
}
/// An iroh node handle exposed over FFI: the protocol router plus
/// in-process RPC clients for each subsystem.
#[derive(uniffi::Object, Debug, Clone)]
pub struct Iroh {
    // Protocol router driving the endpoint; shut down via `Node::shutdown`.
    router: iroh::protocol::Router,
    // Kept alive for the docs engine / downloader tasks; dropped with the node.
    _local_pool: Arc<LocalPool>,
    // Raw RPC client over the in-process flume channel.
    pub(crate) client: RpcClient<
        iroh_node_util::rpc::proto::RpcService,
        FlumeConnector<iroh_node_util::rpc::proto::Response, iroh_node_util::rpc::proto::Request>,
    >,
    // Aborts the RPC accept loop when the last clone of this node drops.
    _handler: Arc<AbortOnDropHandle<()>>,
    pub(crate) blobs_client: BlobsClient,
    pub(crate) tags_client: TagsClient,
    pub(crate) net_client: NetClient,
    // Present only when the node was built with `enable_docs`.
    pub(crate) authors_client: Option<AuthorsClient>,
    pub(crate) docs_client: Option<DocsClient>,
    pub(crate) gossip: Gossip,
}
/// Net RPC client (concrete type re-exported by iroh-node-util).
pub(crate) type NetClient = iroh_node_util::rpc::client::net::Client;
/// Blobs RPC client over the in-process flume transport.
pub(crate) type BlobsClient = iroh_blobs::rpc::client::blobs::Client<
    FlumeConnector<iroh_blobs::rpc::proto::Response, iroh_blobs::rpc::proto::Request>,
>;
/// Tags RPC client over the in-process flume transport.
pub(crate) type TagsClient = iroh_blobs::rpc::client::tags::Client<
    FlumeConnector<iroh_blobs::rpc::proto::Response, iroh_blobs::rpc::proto::Request>,
>;
/// Authors RPC client over the in-process flume transport.
pub(crate) type AuthorsClient = iroh_docs::rpc::client::authors::Client<
    FlumeConnector<iroh_docs::rpc::proto::Response, iroh_docs::rpc::proto::Request>,
>;
/// Docs RPC client over the in-process flume transport.
pub(crate) type DocsClient = iroh_docs::rpc::client::docs::Client<
    FlumeConnector<iroh_docs::rpc::proto::Response, iroh_docs::rpc::proto::Request>,
>;
/// Minimal `AbstractNode` wrapper around an endpoint, used to serve
/// node RPC requests without a full node implementation.
#[derive(Debug, Clone)]
struct NetNode(iroh::Endpoint);
impl AbstractNode for NetNode {
    /// Exposes the wrapped endpoint to the RPC server.
    fn endpoint(&self) -> &iroh::Endpoint {
        &self.0
    }
    // Shutdown is handled by `Iroh`'s router, not through this wrapper.
    fn shutdown(&self) {}
}
#[uniffi::export]
impl Iroh {
    /// Creates a persistent node rooted at `path` with default options.
    #[uniffi::constructor(async_runtime = "tokio")]
    pub async fn persistent(path: String) -> Result<Self, IrohError> {
        let options = NodeOptions::default();
        Self::persistent_with_options(path, options).await
    }

    /// Creates a fully in-memory node with default options.
    #[uniffi::constructor(async_runtime = "tokio")]
    pub async fn memory() -> Result<Self, IrohError> {
        let options = NodeOptions::default();
        Self::memory_with_options(options).await
    }

    /// Creates a persistent node rooted at `path`.
    ///
    /// Stores blobs under `path/blobs`, and — when `options.enable_docs` is
    /// set — docs in `path/docs.redb` with the default author recorded in
    /// `path/default-author`. The directory is created if missing.
    #[uniffi::constructor(async_runtime = "tokio")]
    pub async fn persistent_with_options(
        path: String,
        options: NodeOptions,
    ) -> Result<Self, IrohError> {
        let path = PathBuf::from(path);
        tokio::fs::create_dir_all(&path)
            .await
            .map_err(|err| anyhow::anyhow!(err))?;
        let builder = iroh::Endpoint::builder();
        // Docs storage is only set up when docs are enabled.
        let (docs_store, author_store) = if options.enable_docs {
            let docs_store = iroh_docs::store::Store::persistent(path.join("docs.redb"))?;
            let author_store =
                iroh_docs::engine::DefaultAuthorStorage::Persistent(path.join("default-author"));
            (Some(docs_store), Some(author_store))
        } else {
            (None, None)
        };
        let blobs_store = iroh_blobs::store::fs::Store::load(path.join("blobs"))
            .await
            .map_err(|err| anyhow::anyhow!(err))?;
        let local_pool = LocalPool::default();
        let (builder, gossip, blobs, docs) = apply_options(
            builder,
            options,
            blobs_store,
            docs_store,
            author_store,
            &local_pool,
        )
        .await?;
        let router = builder.spawn();
        // In-process RPC: a flume channel (capacity 1) links server and client.
        let (listener, connector) = quic_rpc::transport::flume::channel(1);
        let listener = RpcServer::new(listener);
        let client = RpcClient::new(connector);
        let nn = Arc::new(NetNode(router.endpoint().clone()));
        // Accept loop is tied to `_handler`; dropping the node aborts it.
        let handler = listener.spawn_accept_loop(move |req, chan| {
            iroh_node_util::rpc::server::handle_rpc_request(nn.clone(), req, chan)
        });
        let blobs_client = blobs.client().clone();
        let net_client = iroh_node_util::rpc::client::net::Client::new(client.clone().boxed());
        let docs_client = docs.map(|d| d.client().clone());
        Ok(Iroh {
            router,
            _local_pool: Arc::new(local_pool),
            client,
            _handler: Arc::new(handler),
            tags_client: blobs_client.tags(),
            blobs_client,
            net_client,
            authors_client: docs_client.as_ref().map(|d| d.authors()),
            docs_client,
            gossip,
        })
    }

    /// Creates a fully in-memory node: memory blob store and, when docs are
    /// enabled, a memory docs store. Otherwise mirrors
    /// [`Self::persistent_with_options`].
    #[uniffi::constructor(async_runtime = "tokio")]
    pub async fn memory_with_options(options: NodeOptions) -> Result<Self, IrohError> {
        let builder = iroh::Endpoint::builder();
        let (docs_store, author_store) = if options.enable_docs {
            let docs_store = iroh_docs::store::Store::memory();
            let author_store = iroh_docs::engine::DefaultAuthorStorage::Mem;
            (Some(docs_store), Some(author_store))
        } else {
            (None, None)
        };
        let blobs_store = iroh_blobs::store::mem::Store::default();
        let local_pool = LocalPool::default();
        let (builder, gossip, blobs, docs) = apply_options(
            builder,
            options,
            blobs_store,
            docs_store,
            author_store,
            &local_pool,
        )
        .await?;
        let router = builder.spawn();
        let (listener, connector) = quic_rpc::transport::flume::channel(1);
        let listener = RpcServer::new(listener);
        let client = RpcClient::new(connector);
        let nn: Arc<dyn AbstractNode> = Arc::new(NetNode(router.endpoint().clone()));
        let handler = listener.spawn_accept_loop(move |req, chan| {
            iroh_node_util::rpc::server::handle_rpc_request(nn.clone(), req, chan)
        });
        let blobs_client = blobs.client().clone();
        let net_client = iroh_node_util::rpc::client::net::Client::new(client.clone().boxed());
        let docs_client = docs.map(|d| d.client().clone());
        Ok(Iroh {
            router,
            _local_pool: Arc::new(local_pool),
            client,
            _handler: Arc::new(handler),
            net_client,
            tags_client: blobs_client.tags(),
            blobs_client,
            authors_client: docs_client.as_ref().map(|d| d.authors()),
            docs_client,
            gossip,
        })
    }

    /// Returns a [`Node`] handle (router + node RPC client) for status,
    /// stats, and shutdown operations.
    pub fn node(&self) -> Node {
        let router = self.router.clone();
        let client = self.client.clone().boxed();
        let client = iroh_node_util::rpc::client::node::Client::new(client);
        Node { router, client }
    }
}
/// Applies [`NodeOptions`] to an endpoint builder and wires up the protocol
/// stack: gossip, blobs (with optional GC and provide-event callback),
/// optionally docs, and any custom foreign protocols.
///
/// Returns the router builder (with all protocols accepted), the gossip
/// handle, the blobs protocol, and the docs protocol when enabled.
async fn apply_options<S: iroh_blobs::store::Store>(
    mut builder: iroh::endpoint::Builder,
    options: NodeOptions,
    blob_store: S,
    docs_store: Option<iroh_docs::store::Store>,
    author_store: Option<iroh_docs::engine::DefaultAuthorStorage>,
    local_pool: &LocalPool,
) -> anyhow::Result<(
    iroh::protocol::RouterBuilder,
    Gossip,
    Blobs<S>,
    Option<Docs<S>>,
)> {
    // `Some(0)` (the NodeOptions default) and `None` both disable GC.
    let gc_period = if let Some(millis) = options.gc_interval_millis {
        match millis {
            0 => None,
            millis => Some(Duration::from_millis(millis)),
        }
    } else {
        None
    };
    let blob_events = if let Some(blob_events_cb) = options.blob_events {
        BlobProvideEvents::new(blob_events_cb).into()
    } else {
        EventSender::default()
    };
    if let Some(addr) = options.ipv4_addr {
        builder = builder.bind_addr_v4(addr.parse()?);
    }
    if let Some(addr) = options.ipv6_addr {
        builder = builder.bind_addr_v6(addr.parse()?);
    }
    // Absent discovery config falls back to the default n0 discovery.
    builder = match options.node_discovery {
        Some(NodeDiscoveryConfig::None) => builder.clear_discovery(),
        Some(NodeDiscoveryConfig::Default) | None => builder.discovery_n0(),
    };
    if let Some(secret_key) = options.secret_key {
        // Errors if the provided key is not exactly 32 bytes.
        let key: [u8; 32] = AsRef::<[u8]>::as_ref(&secret_key).try_into()?;
        let key = iroh::SecretKey::from_bytes(&key);
        builder = builder.secret_key(key);
    }
    let endpoint = builder.bind().await?;
    let mut builder = iroh::protocol::Router::builder(endpoint);
    let endpoint = Arc::new(Endpoint::new(builder.endpoint().clone()));
    let gossip = Gossip::builder().spawn(builder.endpoint().clone()).await?;
    builder = builder.accept(iroh_gossip::ALPN, gossip.clone());
    let downloader = Downloader::new(
        blob_store.clone(),
        builder.endpoint().clone(),
        local_pool.handle().clone(),
    );
    let blobs = Blobs::builder(blob_store.clone())
        .events(blob_events)
        .build(builder.endpoint());
    builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
    let docs = if options.enable_docs {
        let engine = iroh_docs::engine::Engine::spawn(
            builder.endpoint().clone(),
            gossip.clone(),
            docs_store.expect("docs enabled"),
            blob_store.clone(),
            downloader,
            author_store.expect("docs enabled"),
            local_pool.handle().clone(),
        )
        .await?;
        let docs = Docs::new(engine);
        builder = builder.accept(iroh_docs::ALPN, docs.clone());
        // Protect doc-referenced blobs from garbage collection.
        blobs.add_protected(docs.protect_cb())?;
        Some(docs)
    } else {
        None
    };
    // GC must start after the docs protection callback is registered above.
    if let Some(period) = gc_period {
        blobs.start_gc(GcConfig {
            period,
            done_callback: None,
        })?;
    }
    if let Some(protocols) = options.protocols {
        for (alpn, protocol) in protocols {
            let handler = protocol.create(endpoint.clone());
            builder = builder.accept(alpn, ProtocolWrapper { handler });
        }
    }
    Ok((builder, gossip, blobs, docs))
}
/// Lightweight node-management handle: the router plus a node RPC client;
/// obtained via [`Iroh::node`].
#[derive(uniffi::Object)]
pub struct Node {
    router: iroh::protocol::Router,
    client: iroh_node_util::rpc::client::node::Client,
}
#[uniffi::export]
impl Node {
    /// Returns the node's metric counters keyed by name.
    ///
    /// NOTE(review): counter values are narrowed from `u64` to `u32` and
    /// `expect` panics if a counter exceeds `u32::MAX` — confirm whether a
    /// saturating conversion would be preferable for long-running nodes.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn stats(&self) -> Result<HashMap<String, CounterStats>, IrohError> {
        let stats = self.client.stats().await?;
        let stats = stats
            .into_iter()
            .map(|(k, v)| {
                (
                    k,
                    CounterStats {
                        value: u32::try_from(v.value).expect("value too large"),
                        description: v.description,
                    },
                )
            })
            .collect();
        Ok(stats)
    }

    /// Fetches the node's current status (address, listen addrs, version).
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn status(&self) -> Result<Arc<NodeStatus>, IrohError> {
        let res = self.client.status().await.map(|n| Arc::new(n.into()))?;
        Ok(res)
    }

    /// Gracefully shuts down the router (and with it the node's protocols).
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn shutdown(&self) -> Result<(), IrohError> {
        self.router.shutdown().await?;
        Ok(())
    }

    /// Returns an FFI wrapper around the node's endpoint.
    #[uniffi::method]
    pub fn endpoint(&self) -> Endpoint {
        Endpoint::new(self.router.endpoint().clone())
    }
}
/// FFI wrapper around the node status returned by the node RPC client.
#[derive(Debug, uniffi::Object)]
pub struct NodeStatus(iroh_node_util::rpc::client::net::NodeStatus);
impl From<iroh_node_util::rpc::client::net::NodeStatus> for NodeStatus {
fn from(n: iroh_node_util::rpc::client::net::NodeStatus) -> Self {
NodeStatus(n)
}
}
#[uniffi::export]
impl NodeStatus {
    /// The node's own address (id, relay, direct addresses).
    pub fn node_addr(&self) -> Arc<NodeAddr> {
        let addr = self.0.addr.clone();
        Arc::new(addr.into())
    }

    /// The bound listening addresses, rendered as strings.
    pub fn listen_addrs(&self) -> Vec<String> {
        self.0.listen_addrs.iter().map(ToString::to_string).collect()
    }

    /// The node's version string.
    pub fn version(&self) -> String {
        self.0.version.to_string()
    }

    /// The RPC address, as a string, if one is set.
    pub fn rpc_addr(&self) -> Option<String> {
        self.0.rpc_addr.map(|addr| addr.to_string())
    }
}
/// Bridges blob provider events to a foreign [`BlobProvideEventCallback`].
#[derive(Clone)]
struct BlobProvideEvents {
    callback: Arc<dyn BlobProvideEventCallback>,
}
impl Debug for BlobProvideEvents {
    /// Fixed representation — the foreign callback has no Debug impl.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("BlobProvideEvents()")
    }
}
impl BlobProvideEvents {
    /// Wraps the foreign callback for use as a custom event sender.
    fn new(callback: Arc<dyn BlobProvideEventCallback>) -> Self {
        Self { callback }
    }
}
impl iroh_blobs::provider::CustomEventSender for BlobProvideEvents {
    /// Forwards the event to the foreign callback, awaiting completion.
    /// Callback errors are deliberately dropped (`.ok()`): delivery is best-effort.
    fn send(&self, event: iroh_blobs::provider::Event) -> futures_lite::future::Boxed<()> {
        let cb = self.callback.clone();
        Box::pin(async move {
            cb.blob_event(Arc::new(event.into())).await.ok();
        })
    }
    /// Fire-and-forget delivery: spawns a task so the caller never waits.
    /// NOTE(review): `tokio::task::spawn` panics outside a tokio runtime —
    /// confirm this is only invoked from runtime context.
    fn try_send(&self, event: iroh_blobs::provider::Event) {
        let cb = self.callback.clone();
        tokio::task::spawn(async move {
            cb.blob_event(Arc::new(event.into())).await.ok();
        });
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: an in-memory node starts and reports its node id.
    /// NOTE(review): `Iroh::net()` is not defined in this file — presumably
    /// provided by another impl block elsewhere in the crate.
    #[tokio::test]
    async fn test_memory() {
        let node = Iroh::memory().await.unwrap();
        let id = node.net().node_id().await.unwrap();
        println!("{id}");
    }
}