use std::{
collections::HashMap, future::Future, path::PathBuf, pin::Pin, sync::Arc, time::Duration,
};
use iroh_blobs::{
downloader::Downloader, net_protocol::Blobs, provider::EventSender, store::GcConfig,
util::local_pool::LocalPool,
};
use iroh_docs::protocol::Docs;
use iroh_gossip::net::Gossip;
use iroh_node_util::rpc::server::AbstractNode;
use napi::{
bindgen_prelude::*,
threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},
};
use napi_derive::napi;
use quic_rpc::{transport::flume::FlumeConnector, RpcClient, RpcServer};
use tokio_util::task::AbortOnDropHandle;
use tracing::warn;
use crate::{BlobProvideEvent, Connection, CounterStats, Endpoint, NodeAddr};
/// Options accepted by the node constructors (`Iroh.persistent` / `Iroh.memory`).
///
/// Every field is optional; unset fields fall back to the behavior described below.
#[napi(object, object_to_js = false)]
pub struct NodeOptions {
/// Blob garbage-collection interval in milliseconds.
/// When unset or `0`, garbage collection is not started.
pub gc_interval_millis: Option<u32>,
/// Callback that receives blob provide events.
/// When unset, a default event sender is used instead.
pub blob_events: Option<ThreadsafeFunction<BlobProvideEvent, ()>>,
/// Whether to set up the docs engine and accept the docs protocol.
/// Treated as `false` when unset.
pub enable_docs: Option<bool>,
/// IPv4 socket address to bind to, as a string that is parsed when the node is built.
pub ipv4_addr: Option<String>,
/// IPv6 socket address to bind to, as a string that is parsed when the node is built.
pub ipv6_addr: Option<String>,
/// Node discovery configuration; unset behaves like `NodeDiscoveryConfig::Default`.
pub node_discovery: Option<NodeDiscoveryConfig>,
/// Secret key bytes for the node; converted into a 32-byte array when the node is built.
pub secret_key: Option<Vec<u8>>,
/// Custom protocols keyed by ALPN bytes. Each callback is invoked once with the
/// node's `Endpoint` and must return the `ProtocolHandler` to register for that ALPN.
pub protocols: Option<HashMap<Vec<u8>, ThreadsafeFunction<Endpoint, ProtocolHandler>>>,
}
/// A custom protocol handler backed by JavaScript callbacks.
///
/// The callbacks are printed as fixed placeholder strings in `Debug` output.
#[derive(derive_more::Debug)]
#[napi(object, object_to_js = false)]
pub struct ProtocolHandler {
/// Callback invoked with each incoming connection for this protocol.
#[debug("accept")]
pub accept: Arc<ThreadsafeFunction<Connection, ()>>,
/// Optional callback invoked when the handler is asked to shut down.
#[debug("shutdown")]
pub shutdown: Option<Arc<ThreadsafeFunction<(), ()>>>,
}
impl iroh::protocol::ProtocolHandler for ProtocolHandler {
fn accept(
&self,
conn: iroh::endpoint::Connection,
) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
let accept = self.accept.clone();
Box::pin(async move {
accept.call_async(Ok(conn.into())).await?;
Ok(())
})
}
fn shutdown(&self) -> Pin<Box<dyn Future<Output = ()> + Send>> {
let shutdown = self.shutdown.clone();
Box::pin(async move {
if let Some(ref cb) = shutdown {
if let Err(err) = cb.call_async(Ok(())).await {
warn!("shutdown failed: {:?}", err);
}
}
})
}
}
impl Default for NodeOptions {
fn default() -> Self {
NodeOptions {
gc_interval_millis: None,
blob_events: None,
enable_docs: None,
ipv4_addr: None,
ipv6_addr: None,
node_discovery: None,
secret_key: None,
protocols: None,
}
}
}
/// Selects how node discovery is configured on the endpoint builder.
#[derive(Debug, Default)]
#[napi(string_enum)]
pub enum NodeDiscoveryConfig {
/// Clear all discovery services from the endpoint builder.
None,
/// Use the builder's `discovery_n0()` configuration. This is the default variant.
#[default]
Default,
}
/// An iroh node exposed to JavaScript.
///
/// Bundles the protocol router with the RPC clients for the individual subsystems.
#[derive(Debug, Clone)]
#[napi]
pub struct Iroh {
/// Protocol router the node was spawned with; also gives access to the endpoint.
pub(crate) router: iroh::protocol::Router,
// Not read anywhere in this file; presumably held so the local task pool
// lives as long as the node — TODO confirm.
_local_pool: Arc<LocalPool>,
/// Client side of the flume channel served by the node-util RPC accept loop.
pub(crate) client: RpcClient<
iroh_node_util::rpc::proto::RpcService,
FlumeConnector<iroh_node_util::rpc::proto::Response, iroh_node_util::rpc::proto::Request>,
>,
// Handle of the spawned RPC accept loop; not read anywhere in this file.
_handler: Arc<AbortOnDropHandle<()>>,
/// Client for the blobs RPC API.
pub(crate) blobs_client: BlobsClient,
/// Client for the tags RPC API, derived from the blobs client.
pub(crate) tags_client: TagsClient,
/// Client for the net RPC API.
pub(crate) net_client: NetClient,
/// Client for the authors RPC API; `None` when docs are not enabled.
pub(crate) authors_client: Option<AuthorsClient>,
/// Client for the docs RPC API; `None` when docs are not enabled.
pub(crate) docs_client: Option<DocsClient>,
/// Gossip protocol instance registered on the router.
pub(crate) gossip: Gossip,
}
/// Net RPC client from `iroh_node_util`.
pub(crate) type NetClient = iroh_node_util::rpc::client::net::Client;
/// Blobs RPC client using the flume connector.
pub(crate) type BlobsClient = iroh_blobs::rpc::client::blobs::Client<
FlumeConnector<iroh_blobs::rpc::proto::Response, iroh_blobs::rpc::proto::Request>,
>;
/// Tags RPC client using the flume connector.
pub(crate) type TagsClient = iroh_blobs::rpc::client::tags::Client<
FlumeConnector<iroh_blobs::rpc::proto::Response, iroh_blobs::rpc::proto::Request>,
>;
/// Authors RPC client using the flume connector.
pub(crate) type AuthorsClient = iroh_docs::rpc::client::authors::Client<
FlumeConnector<iroh_docs::rpc::proto::Response, iroh_docs::rpc::proto::Request>,
>;
/// Docs RPC client using the flume connector.
pub(crate) type DocsClient = iroh_docs::rpc::client::docs::Client<
FlumeConnector<iroh_docs::rpc::proto::Response, iroh_docs::rpc::proto::Request>,
>;
/// Minimal [`AbstractNode`] implementation that only exposes an endpoint.
#[derive(Debug, Clone)]
struct NetNode(iroh::Endpoint);

impl AbstractNode for NetNode {
    /// Returns the wrapped endpoint.
    fn endpoint(&self) -> &iroh::Endpoint {
        let Self(endpoint) = self;
        endpoint
    }

    /// Intentionally a no-op: this implementation performs no work on shutdown.
    fn shutdown(&self) {}
}
#[napi]
impl Iroh {
#[napi(factory)]
pub async fn persistent(path: String, opts: Option<NodeOptions>) -> Result<Self> {
let options = opts.unwrap_or_default();
let path = PathBuf::from(path);
tokio::fs::create_dir_all(&path)
.await
.map_err(|err| anyhow::anyhow!(err))?;
let builder = iroh::Endpoint::builder();
let (docs_store, author_store) = if options.enable_docs.unwrap_or_default() {
let docs_store = iroh_docs::store::Store::persistent(path.join("docs.redb"))?;
let author_store =
iroh_docs::engine::DefaultAuthorStorage::Persistent(path.join("default-author"));
(Some(docs_store), Some(author_store))
} else {
(None, None)
};
let blobs_store = iroh_blobs::store::fs::Store::load(path.join("blobs"))
.await
.map_err(|err| anyhow::anyhow!(err))?;
let local_pool = LocalPool::default();
let (builder, gossip, blobs, docs) = apply_options(
builder,
options,
blobs_store,
docs_store,
author_store,
&local_pool,
)
.await?;
let router = builder.spawn();
let (listener, connector) = quic_rpc::transport::flume::channel(1);
let listener = RpcServer::new(listener);
let client = RpcClient::new(connector);
let nn = Arc::new(NetNode(router.endpoint().clone()));
let handler = listener.spawn_accept_loop(move |req, chan| {
iroh_node_util::rpc::server::handle_rpc_request(nn.clone(), req, chan)
});
let blobs_client = blobs.client().clone();
let net_client = iroh_node_util::rpc::client::net::Client::new(client.clone().boxed());
let docs_client = docs.map(|d| d.client().clone());
Ok(Iroh {
router,
_local_pool: Arc::new(local_pool),
client,
_handler: Arc::new(handler),
tags_client: blobs_client.tags(),
blobs_client,
net_client,
authors_client: docs_client.as_ref().map(|d| d.authors()),
docs_client,
gossip,
})
}
#[napi(factory)]
pub async fn memory(opts: Option<NodeOptions>) -> Result<Self> {
let options = opts.unwrap_or_default();
let builder = iroh::Endpoint::builder();
let (docs_store, author_store) = if options.enable_docs.unwrap_or_default() {
let docs_store = iroh_docs::store::Store::memory();
let author_store = iroh_docs::engine::DefaultAuthorStorage::Mem;
(Some(docs_store), Some(author_store))
} else {
(None, None)
};
let blobs_store = iroh_blobs::store::mem::Store::default();
let local_pool = LocalPool::default();
let (builder, gossip, blobs, docs) = apply_options(
builder,
options,
blobs_store,
docs_store,
author_store,
&local_pool,
)
.await?;
let router = builder.spawn();
let (listener, connector) = quic_rpc::transport::flume::channel(1);
let listener = RpcServer::new(listener);
let client = RpcClient::new(connector);
let nn: Arc<dyn AbstractNode> = Arc::new(NetNode(router.endpoint().clone()));
let handler = listener.spawn_accept_loop(move |req, chan| {
iroh_node_util::rpc::server::handle_rpc_request(nn.clone(), req, chan)
});
let blobs_client = blobs.client().clone();
let net_client = iroh_node_util::rpc::client::net::Client::new(client.clone().boxed());
let docs_client = docs.map(|d| d.client().clone());
Ok(Iroh {
router,
_local_pool: Arc::new(local_pool),
client,
_handler: Arc::new(handler),
net_client,
tags_client: blobs_client.tags(),
blobs_client,
authors_client: docs_client.as_ref().map(|d| d.authors()),
docs_client,
gossip,
})
}
#[napi(getter)]
pub fn node(&self) -> Node {
let router = self.router.clone();
let client = self.client.clone().boxed();
let client = iroh_node_util::rpc::client::node::Client::new(client);
Node { router, client }
}
}
async fn apply_options<S: iroh_blobs::store::Store>(
mut builder: iroh::endpoint::Builder,
options: NodeOptions,
blob_store: S,
docs_store: Option<iroh_docs::store::Store>,
author_store: Option<iroh_docs::engine::DefaultAuthorStorage>,
local_pool: &LocalPool,
) -> anyhow::Result<(
iroh::protocol::RouterBuilder,
Gossip,
Blobs<S>,
Option<Docs<S>>,
)> {
let gc_period = if let Some(millis) = options.gc_interval_millis {
match millis {
0 => None,
millis => Some(Duration::from_millis(millis as _)),
}
} else {
None
};
let blob_events = if let Some(blob_events_cb) = options.blob_events {
BlobProvideEvents::new(blob_events_cb).into()
} else {
EventSender::default()
};
if let Some(addr) = options.ipv4_addr {
builder = builder.bind_addr_v4(addr.parse()?);
}
if let Some(addr) = options.ipv6_addr {
builder = builder.bind_addr_v6(addr.parse()?);
}
builder = match options.node_discovery {
Some(NodeDiscoveryConfig::None) => builder.clear_discovery(),
Some(NodeDiscoveryConfig::Default) | None => builder.discovery_n0(),
};
if let Some(secret_key) = options.secret_key {
let key: [u8; 32] = AsRef::<[u8]>::as_ref(&secret_key).try_into()?;
let key = iroh::SecretKey::from_bytes(&key);
builder = builder.secret_key(key);
}
let endpoint = builder.bind().await?;
let mut builder = iroh::protocol::Router::builder(endpoint);
let endpoint = Endpoint::new(builder.endpoint().clone());
let gossip = iroh_gossip::net::Gossip::builder()
.spawn(builder.endpoint().clone())
.await?;
builder = builder.accept(iroh_gossip::ALPN, gossip.clone());
let downloader = Downloader::new(
blob_store.clone(),
builder.endpoint().clone(),
local_pool.handle().clone(),
);
let blobs = Blobs::builder(blob_store.clone())
.events(blob_events)
.build(builder.endpoint());
builder = builder.accept(iroh_blobs::ALPN, blobs.clone());
let docs = if options.enable_docs.unwrap_or_default() {
let engine = iroh_docs::engine::Engine::spawn(
builder.endpoint().clone(),
gossip.clone(),
docs_store.expect("docs enabled"),
blob_store.clone(),
downloader,
author_store.expect("docs enabled"),
local_pool.handle().clone(),
)
.await?;
let docs = Docs::new(engine);
builder = builder.accept(iroh_docs::ALPN, docs.clone());
blobs.add_protected(docs.protect_cb())?;
Some(docs)
} else {
None
};
if let Some(period) = gc_period {
blobs.start_gc(GcConfig {
period,
done_callback: None,
})?;
}
if let Some(protocols) = options.protocols {
for (alpn, protocol) in protocols {
let handler = protocol.call_async(Ok(endpoint.clone())).await?;
builder = builder.accept(alpn, handler);
}
}
Ok((builder, gossip, blobs, docs))
}
/// Handle for node-level operations, obtained from the `Iroh::node` getter.
#[napi]
pub struct Node {
// Router of the owning node; used for shutdown and endpoint access.
router: iroh::protocol::Router,
// Node RPC client; used for stats and status.
client: iroh_node_util::rpc::client::node::Client,
}
#[napi]
impl Node {
    /// Get statistics of the running node, keyed by counter name.
    ///
    /// Counter values are reported as `u32`. A value that does not fit is clamped
    /// to `u32::MAX` instead of aborting the call.
    #[napi]
    pub async fn stats(&self) -> Result<HashMap<String, CounterStats>> {
        let stats = self.client.stats().await?;
        let stats = stats
            .into_iter()
            .map(|(name, counter)| {
                // Saturate rather than panic: large counters must not take down
                // the whole stats call.
                let value = u32::try_from(counter.value).unwrap_or(u32::MAX);
                (
                    name,
                    CounterStats {
                        value,
                        description: counter.description,
                    },
                )
            })
            .collect();
        Ok(stats)
    }

    /// Get status information about the node.
    #[napi]
    pub async fn status(&self) -> Result<NodeStatus> {
        let status = self.client.status().await?;
        Ok(status.into())
    }

    /// Shut down the node's protocol router.
    #[napi]
    pub async fn shutdown(&self) -> Result<()> {
        self.router.shutdown().await?;
        Ok(())
    }

    /// Returns a handle to the node's endpoint.
    #[napi]
    pub fn endpoint(&self) -> Endpoint {
        Endpoint::new(self.router.endpoint().clone())
    }
}
/// Status information about a node, converted from the node-util RPC response.
#[derive(Debug)]
#[napi(object)]
pub struct NodeStatus {
/// The node's address information.
pub addr: NodeAddr,
/// The reported listen addresses, each rendered as a string.
pub listen_addrs: Vec<String>,
/// The version string reported by the node.
pub version: String,
/// The RPC address rendered as a string, if one was reported.
pub rpc_addr: Option<String>,
}
impl From<iroh_node_util::rpc::client::net::NodeStatus> for NodeStatus {
    /// Converts the RPC status into the binding type, rendering addresses as strings.
    fn from(n: iroh_node_util::rpc::client::net::NodeStatus) -> Self {
        let listen_addrs: Vec<String> = n
            .listen_addrs
            .iter()
            .map(|listen_addr| listen_addr.to_string())
            .collect();
        let rpc_addr = n.rpc_addr.map(|rpc| rpc.to_string());

        Self {
            addr: n.addr.into(),
            listen_addrs,
            version: n.version,
            rpc_addr,
        }
    }
}
/// Adapter that forwards blob provider events to a JavaScript callback.
#[derive(Clone)]
struct BlobProvideEvents {
// Shared handle to the JavaScript callback; cloned into each send future.
callback: Arc<ThreadsafeFunction<BlobProvideEvent, ()>>,
}
impl std::fmt::Debug for BlobProvideEvents {
    /// Prints a fixed placeholder; the wrapped callback is not rendered.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("BlobProvideEvents()")
    }
}
impl BlobProvideEvents {
    /// Wraps `callback` in an `Arc` so it can be shared between event deliveries.
    fn new(callback: ThreadsafeFunction<BlobProvideEvent, ()>) -> Self {
        let callback = Arc::new(callback);
        Self { callback }
    }
}
impl iroh_blobs::provider::CustomEventSender for BlobProvideEvents {
fn send(&self, event: iroh_blobs::provider::Event) -> futures::future::BoxFuture<'static, ()> {
let cb = self.callback.clone();
Box::pin(async move {
let msg = BlobProvideEvent::convert(event);
if let Err(err) = cb.call_async(msg).await {
eprintln!("failed call: {:?}", err);
}
})
}
fn try_send(&self, event: iroh_blobs::provider::Event) {
let cb = self.callback.clone();
let msg = BlobProvideEvent::convert(event);
cb.call(msg, ThreadsafeFunctionCallMode::NonBlocking);
}
}