use std::{path::PathBuf, str::FromStr, sync::Arc, time::SystemTime};
use bytes::Bytes;
use futures::{StreamExt, TryStreamExt};
use quic_rpc::transport::flume::FlumeConnector;
use serde::{Deserialize, Serialize};
use tracing::warn;
use crate::DocsClient;
use crate::{
ticket::AddrInfoOptions, AuthorId, CallbackError, DocTicket, Hash, Iroh, IrohError, PublicKey,
};
/// Capability a node holds for a document: write access or read-only.
///
/// FFI mirror of `iroh_docs::CapabilityKind`; discriminants are fixed
/// explicitly so the foreign-language values stay stable.
#[derive(Debug, uniffi::Enum)]
pub enum CapabilityKind {
    /// Write access to the document.
    Write = 1,
    /// Read-only access to the document.
    Read = 2,
}
impl From<iroh_docs::CapabilityKind> for CapabilityKind {
fn from(value: iroh_docs::CapabilityKind) -> Self {
match value {
iroh_docs::CapabilityKind::Write => Self::Write,
iroh_docs::CapabilityKind::Read => Self::Read,
}
}
}
/// FFI handle to the docs subsystem of a running [`Iroh`] node.
#[derive(uniffi::Object)]
pub struct Docs {
    // RPC client used for all document operations.
    client: DocsClient,
}
/// In-process RPC connector for the docs protocol (flume channel transport).
type MemConnector = FlumeConnector<iroh_docs::rpc::proto::Response, iroh_docs::rpc::proto::Request>;
#[uniffi::export]
impl Iroh {
    /// Access the docs API of this node.
    ///
    /// # Panics
    /// Panics if the node was started without the docs subsystem enabled
    /// (`docs_client` is `None`).
    pub fn docs(&self) -> Docs {
        Docs {
            client: self.docs_client.clone().expect("missing docs"),
        }
    }
}
#[uniffi::export]
impl Docs {
    /// Create a new, empty document and return a handle to it.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn create(&self) -> Result<Arc<Doc>, IrohError> {
        let doc = self.client.create().await?;
        Ok(Arc::new(Doc { inner: doc }))
    }
    /// Import (join) an existing document from a share ticket.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn join(&self, ticket: &DocTicket) -> Result<Arc<Doc>, IrohError> {
        let doc = self.client.import(ticket.clone().into()).await?;
        Ok(Arc::new(Doc { inner: doc }))
    }
    /// Import a document from a ticket and subscribe to its live events in one call.
    ///
    /// Events are forwarded to `cb` from a detached background task. Callback
    /// and RPC errors are only logged; the task ends when the event stream ends.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn join_and_subscribe(
        &self,
        ticket: &DocTicket,
        cb: Arc<dyn SubscribeCallback>,
    ) -> Result<Arc<Doc>, IrohError> {
        let (doc, mut stream) = self
            .client
            .import_and_subscribe(ticket.clone().into())
            .await?;
        // Detached forwarder task; runs until the stream is exhausted.
        tokio::spawn(async move {
            while let Some(event) = stream.next().await {
                match event {
                    Ok(event) => {
                        // A failing foreign callback does not stop delivery.
                        if let Err(err) = cb.event(Arc::new(event.into())).await {
                            warn!("cb error: {:?}", err);
                        }
                    }
                    Err(err) => {
                        warn!("rpc error: {:?}", err);
                    }
                }
            }
        });
        Ok(Arc::new(Doc { inner: doc }))
    }
    /// List all known documents with their capability (read or write).
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn list(&self) -> Result<Vec<NamespaceAndCapability>, IrohError> {
        let docs = self
            .client
            .list()
            .await?
            .map_ok(|(namespace, capability)| NamespaceAndCapability {
                namespace: namespace.to_string(),
                capability: capability.into(),
            })
            .try_collect::<Vec<_>>()
            .await?;
        Ok(docs)
    }
    /// Open a document by its namespace id string.
    ///
    /// Returns `Ok(None)` when the client reports no such document.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn open(&self, id: String) -> Result<Option<Arc<Doc>>, IrohError> {
        let namespace_id = iroh_docs::NamespaceId::from_str(&id)?;
        let doc = self.client.open(namespace_id).await?;
        Ok(doc.map(|d| Arc::new(Doc { inner: d })))
    }
    /// Remove a document, identified by its namespace id string, from the node.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn drop_doc(&self, doc_id: String) -> Result<(), IrohError> {
        let doc_id = iroh_docs::NamespaceId::from_str(&doc_id)?;
        self.client.drop_doc(doc_id).await.map_err(IrohError::from)
    }
}
/// A document id paired with the capability this node holds for it.
#[derive(Debug, uniffi::Record)]
pub struct NamespaceAndCapability {
    /// The namespace (document) id, stringified.
    pub namespace: String,
    /// Read or write capability for that namespace.
    pub capability: CapabilityKind,
}
/// FFI handle to a single open document.
#[derive(Clone, uniffi::Object)]
pub struct Doc {
    // Upstream client-side document handle; all methods delegate to it.
    pub(crate) inner: iroh_docs::rpc::client::docs::Doc<MemConnector>,
}
#[uniffi::export]
impl Doc {
    /// The id of this document (its namespace id, stringified).
    #[uniffi::method]
    pub fn id(&self) -> String {
        self.inner.id().to_string()
    }
    /// Close this document handle.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn close_me(&self) -> Result<(), IrohError> {
        self.inner.close().await.map_err(IrohError::from)
    }
    /// Set `key` to `value`, authored by `author_id`; returns the content hash.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn set_bytes(
        &self,
        author_id: &AuthorId,
        key: Vec<u8>,
        value: Vec<u8>,
    ) -> Result<Arc<Hash>, IrohError> {
        let hash = self.inner.set_bytes(author_id.0, key, value).await?;
        Ok(Arc::new(Hash(hash)))
    }
    /// Set an entry for `key` that points at already-stored content identified
    /// by (`hash`, `size`), without transferring any bytes.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn set_hash(
        &self,
        author_id: Arc<AuthorId>,
        key: Vec<u8>,
        hash: Arc<Hash>,
        size: u64,
    ) -> Result<(), IrohError> {
        self.inner.set_hash(author_id.0, key, hash.0, size).await?;
        Ok(())
    }
    /// Import the file at `path` into the document under `key`.
    ///
    /// `cb`, when given, receives each progress event; a callback error aborts
    /// the import with that error.
    /// NOTE(review): `in_place` is forwarded to the upstream client —
    /// presumably it avoids copying the file into the blob store; confirm
    /// against the iroh_docs documentation.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn import_file(
        &self,
        author: Arc<AuthorId>,
        key: Vec<u8>,
        path: String,
        in_place: bool,
        cb: Option<Arc<dyn DocImportFileCallback>>,
    ) -> Result<(), IrohError> {
        let mut stream = self
            .inner
            .import_file(author.0, Bytes::from(key), PathBuf::from(path), in_place)
            .await?;
        while let Some(progress) = stream.next().await {
            let progress = progress?;
            if let Some(ref cb) = cb {
                cb.progress(Arc::new(progress.into())).await?;
            }
        }
        Ok(())
    }
    /// Export the content of `entry` to the file at `path`.
    ///
    /// Always exports with `ExportMode::Copy`. `cb`, when given, receives each
    /// progress event; a callback error aborts the export with that error.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn export_file(
        &self,
        entry: Arc<Entry>,
        path: String,
        cb: Option<Arc<dyn DocExportFileCallback>>,
    ) -> Result<(), IrohError> {
        let mut stream = self
            .inner
            .export_file(
                entry.0.clone(),
                std::path::PathBuf::from(path),
                iroh_blobs::store::ExportMode::Copy,
            )
            .await?;
        while let Some(progress) = stream.next().await {
            let progress = progress?;
            if let Some(ref cb) = cb {
                cb.progress(Arc::new(progress.into())).await?;
            }
        }
        Ok(())
    }
    /// Delete entries from `author_id` whose key starts with `prefix`;
    /// returns the number of deleted entries.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn delete(
        &self,
        author_id: Arc<AuthorId>,
        prefix: Vec<u8>,
    ) -> Result<u64, IrohError> {
        let num_del = self.inner.del(author_id.0, prefix).await?;
        // Checked usize -> u64 conversion instead of a lossy `as` cast.
        u64::try_from(num_del).map_err(|e| anyhow::Error::from(e).into())
    }
    /// Fetch the entry for exactly (`author`, `key`), if any.
    /// `include_empty` is forwarded to the client unchanged.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn get_exact(
        &self,
        author: Arc<AuthorId>,
        key: Vec<u8>,
        include_empty: bool,
    ) -> Result<Option<Arc<Entry>>, IrohError> {
        self.inner
            .get_exact(author.0, key, include_empty)
            .await
            .map(|e| e.map(|e| Arc::new(e.into())))
            .map_err(IrohError::from)
    }
    /// Fetch all entries matching `query`.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn get_many(&self, query: Arc<Query>) -> Result<Vec<Arc<Entry>>, IrohError> {
        let entries = self
            .inner
            .get_many(query.0.clone())
            .await?
            .map_ok(|e| Arc::new(Entry(e)))
            .try_collect::<Vec<_>>()
            .await?;
        Ok(entries)
    }
    /// Fetch a single entry matching `query`, if any.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn get_one(&self, query: Arc<Query>) -> Result<Option<Arc<Entry>>, IrohError> {
        let res = self
            .inner
            .get_one((*query).clone().0)
            .await
            .map(|e| e.map(|e| Arc::new(e.into())))?;
        Ok(res)
    }
    /// Create a share ticket for this document with the given capability
    /// (`mode`) and address details (`addr_options`).
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn share(
        &self,
        mode: ShareMode,
        addr_options: AddrInfoOptions,
    ) -> Result<Arc<DocTicket>, IrohError> {
        let res = self
            .inner
            .share(mode.into(), addr_options.into())
            .await
            .map(|ticket| Arc::new(ticket.into()))?;
        Ok(res)
    }
    /// Start syncing this document with the given peers.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn start_sync(&self, peers: Vec<Arc<NodeAddr>>) -> Result<(), IrohError> {
        self.inner
            .start_sync(
                // Convert each FFI NodeAddr; fails on the first unparseable address.
                peers
                    .into_iter()
                    .map(|p| (*p).clone().try_into())
                    .collect::<Result<Vec<_>, IrohError>>()?,
            )
            .await?;
        Ok(())
    }
    /// Stop syncing this document.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn leave(&self) -> Result<(), IrohError> {
        self.inner.leave().await?;
        Ok(())
    }
    /// Subscribe to live events on this document.
    ///
    /// Events are forwarded to `cb` from a detached background task; callback
    /// and RPC errors are only logged.
    /// NOTE(review): the task unwraps the initial subscribe RPC, so a failure
    /// there panics the background task rather than surfacing to the caller.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn subscribe(&self, cb: Arc<dyn SubscribeCallback>) -> Result<(), IrohError> {
        let client = self.inner.clone();
        tokio::task::spawn(async move {
            let mut sub = client.subscribe().await.unwrap();
            while let Some(event) = sub.next().await {
                match event {
                    Ok(event) => {
                        if let Err(err) = cb.event(Arc::new(event.into())).await {
                            warn!("cb error: {:?}", err);
                        }
                    }
                    Err(err) => {
                        warn!("rpc error: {:?}", err);
                    }
                }
            }
        });
        Ok(())
    }
    /// Current open state of the document (sync flag, subscriber and handle counts).
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn status(&self) -> Result<OpenState, IrohError> {
        let res = self.inner.status().await.map(|o| o.into())?;
        Ok(res)
    }
    /// Set the content download policy for this document.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn set_download_policy(&self, policy: Arc<DownloadPolicy>) -> Result<(), IrohError> {
        self.inner
            .set_download_policy((*policy).clone().into())
            .await?;
        Ok(())
    }
    /// Get the current content download policy for this document.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn get_download_policy(&self) -> Result<Arc<DownloadPolicy>, IrohError> {
        let res = self
            .inner
            .get_download_policy()
            .await
            .map(|policy| Arc::new(policy.into()))?;
        Ok(res)
    }
    /// Peers this document syncs with, as raw peer-id byte vectors.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn get_sync_peers(&self) -> Result<Option<Vec<Vec<u8>>>, IrohError> {
        let list = self.inner.get_sync_peers().await?;
        let list = list.map(|l| l.into_iter().map(|p| p.to_vec()).collect());
        Ok(list)
    }
}
/// Content download policy for a document: either an allow-list
/// (`NothingExcept`) or a deny-list (`EverythingExcept`) of key filters.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Object)]
pub enum DownloadPolicy {
    /// Download nothing, except content whose key matches one of the filters.
    NothingExcept(Vec<Arc<FilterKind>>),
    /// Download everything, except content whose key matches one of the filters.
    EverythingExcept(Vec<Arc<FilterKind>>),
}
#[uniffi::export]
impl DownloadPolicy {
    /// Policy that downloads everything (deny-list with no filters).
    #[uniffi::constructor]
    pub fn everything() -> Self {
        Self::EverythingExcept(vec![])
    }
    /// Policy that downloads nothing (allow-list with no filters).
    #[uniffi::constructor]
    pub fn nothing() -> Self {
        Self::NothingExcept(vec![])
    }
    /// Download only content whose key matches one of `filters`.
    #[uniffi::constructor]
    pub fn nothing_except(filters: Vec<Arc<FilterKind>>) -> Self {
        Self::NothingExcept(filters)
    }
    /// Download everything except content whose key matches one of `filters`.
    #[uniffi::constructor]
    pub fn everything_except(filters: Vec<Arc<FilterKind>>) -> Self {
        Self::EverythingExcept(filters)
    }
}
impl From<iroh_docs::store::DownloadPolicy> for DownloadPolicy {
fn from(value: iroh_docs::store::DownloadPolicy) -> Self {
match value {
iroh_docs::store::DownloadPolicy::NothingExcept(filters) => {
DownloadPolicy::NothingExcept(
filters.into_iter().map(|f| Arc::new(f.into())).collect(),
)
}
iroh_docs::store::DownloadPolicy::EverythingExcept(filters) => {
DownloadPolicy::EverythingExcept(
filters.into_iter().map(|f| Arc::new(f.into())).collect(),
)
}
}
}
}
impl From<DownloadPolicy> for iroh_docs::store::DownloadPolicy {
fn from(value: DownloadPolicy) -> Self {
match value {
DownloadPolicy::NothingExcept(filters) => {
iroh_docs::store::DownloadPolicy::NothingExcept(
filters.into_iter().map(|f| f.0.clone()).collect(),
)
}
DownloadPolicy::EverythingExcept(filters) => {
iroh_docs::store::DownloadPolicy::EverythingExcept(
filters.into_iter().map(|f| f.0.clone()).collect(),
)
}
}
}
}
/// A key filter used by [`DownloadPolicy`]: prefix match or exact match.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Object)]
pub struct FilterKind(pub(crate) iroh_docs::store::FilterKind);
#[uniffi::export]
impl FilterKind {
pub fn matches(&self, key: Vec<u8>) -> bool {
self.0.matches(key)
}
#[uniffi::constructor]
pub fn prefix(prefix: Vec<u8>) -> FilterKind {
FilterKind(iroh_docs::store::FilterKind::Prefix(Bytes::from(prefix)))
}
#[uniffi::constructor]
pub fn exact(key: Vec<u8>) -> FilterKind {
FilterKind(iroh_docs::store::FilterKind::Exact(Bytes::from(key)))
}
}
impl From<iroh_docs::store::FilterKind> for FilterKind {
fn from(value: iroh_docs::store::FilterKind) -> Self {
FilterKind(value)
}
}
/// Snapshot of a document's open state.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, uniffi::Record)]
pub struct OpenState {
    /// Whether sync is currently active for the document.
    pub sync: bool,
    /// Number of event subscribers.
    pub subscribers: u64,
    /// Number of open handles.
    pub handles: u64,
}
impl From<iroh_docs::actor::OpenState> for OpenState {
    /// Widen the upstream `usize` counters to `u64` for the FFI record.
    fn from(state: iroh_docs::actor::OpenState) -> Self {
        Self {
            handles: state.handles as u64,
            subscribers: state.subscribers as u64,
            sync: state.sync,
        }
    }
}
/// FFI-friendly network address of a node: id plus optional relay URL and
/// direct socket addresses kept as strings.
#[derive(Debug, Clone, PartialEq, Eq, uniffi::Object)]
pub struct NodeAddr {
    node_id: Arc<PublicKey>,
    relay_url: Option<String>,
    // Direct socket addresses as strings; parsed only on conversion to
    // `iroh::NodeAddr`.
    addresses: Vec<String>,
}
#[uniffi::export]
impl NodeAddr {
#[uniffi::constructor]
pub fn new(node_id: &PublicKey, derp_url: Option<String>, addresses: Vec<String>) -> Self {
Self {
node_id: Arc::new(node_id.clone()),
relay_url: derp_url,
addresses,
}
}
pub fn direct_addresses(&self) -> Vec<String> {
self.addresses.clone()
}
pub fn relay_url(&self) -> Option<String> {
self.relay_url.clone()
}
pub fn equal(&self, other: &NodeAddr) -> bool {
self == other
}
}
impl TryFrom<NodeAddr> for iroh::NodeAddr {
type Error = IrohError;
fn try_from(value: NodeAddr) -> Result<Self, Self::Error> {
let mut node_addr = iroh::NodeAddr::new((&*value.node_id).into());
let addresses = value
.direct_addresses()
.into_iter()
.map(|addr| {
std::net::SocketAddr::from_str(&addr).map_err(|e| anyhow::Error::from(e).into())
})
.collect::<Result<Vec<_>, IrohError>>()?;
if let Some(derp_url) = value.relay_url() {
let url = url::Url::parse(&derp_url).map_err(anyhow::Error::from)?;
node_addr = node_addr.with_relay_url(url.into());
}
node_addr = node_addr.with_direct_addresses(addresses);
Ok(node_addr)
}
}
impl From<iroh::NodeAddr> for NodeAddr {
    /// Stringify an `iroh::NodeAddr` into the FFI representation.
    fn from(value: iroh::NodeAddr) -> Self {
        let addresses = value
            .direct_addresses
            .into_iter()
            .map(|addr| addr.to_string())
            .collect();
        let relay_url = value.relay_url.map(|url| url.to_string());
        NodeAddr {
            node_id: Arc::new(value.node_id.into()),
            relay_url,
            addresses,
        }
    }
}
/// Capability granted by a share ticket: read-only or read-write.
#[derive(Debug, uniffi::Enum)]
pub enum ShareMode {
    /// Share read-only access.
    Read,
    /// Share read and write access.
    Write,
}
impl From<ShareMode> for iroh_docs::rpc::client::docs::ShareMode {
    /// Map the FFI share mode onto the upstream enum.
    fn from(mode: ShareMode) -> Self {
        match mode {
            ShareMode::Write => Self::Write,
            ShareMode::Read => Self::Read,
        }
    }
}
/// FFI wrapper around a single document entry (author, key, content metadata).
#[derive(Debug, Clone, Serialize, Deserialize, uniffi::Object)]
pub struct Entry(pub(crate) iroh_docs::rpc::client::docs::Entry);
impl From<iroh_docs::rpc::client::docs::Entry> for Entry {
fn from(e: iroh_docs::rpc::client::docs::Entry) -> Self {
Entry(e)
}
}
#[uniffi::export]
impl Entry {
    /// The author that wrote this entry.
    #[uniffi::method]
    pub fn author(&self) -> Arc<AuthorId> {
        let id = self.0.id();
        Arc::new(AuthorId(id.author()))
    }
    /// Hash of this entry's content.
    #[uniffi::method]
    pub fn content_hash(&self) -> Arc<Hash> {
        let hash = self.0.content_hash();
        Arc::new(Hash(hash))
    }
    /// Length of this entry's content, in bytes.
    #[uniffi::method]
    pub fn content_len(&self) -> u64 {
        self.0.content_len()
    }
    /// This entry's key, as raw bytes.
    #[uniffi::method]
    pub fn key(&self) -> Vec<u8> {
        self.0.id().key().to_owned()
    }
    /// The namespace (document id) this entry belongs to, stringified.
    #[uniffi::method]
    pub fn namespace(&self) -> String {
        let id = self.0.id();
        id.namespace().to_string()
    }
    /// Timestamp recorded on this entry.
    #[uniffi::method]
    pub fn timestamp(&self) -> u64 {
        self.0.timestamp()
    }
}
/// Field ordering for query results: key-then-author or author-then-key.
#[derive(Clone, Debug, Default, Serialize, Deserialize, uniffi::Enum)]
pub enum SortBy {
    /// Sort by key first, then author.
    KeyAuthor,
    /// Sort by author first, then key (the default).
    #[default]
    AuthorKey,
}
impl From<iroh_docs::store::SortBy> for SortBy {
    /// Map the upstream sort field onto the FFI enum.
    fn from(value: iroh_docs::store::SortBy) -> Self {
        use iroh_docs::store::SortBy as Upstream;
        match value {
            Upstream::KeyAuthor => Self::KeyAuthor,
            Upstream::AuthorKey => Self::AuthorKey,
        }
    }
}
impl From<SortBy> for iroh_docs::store::SortBy {
    /// Map the FFI sort field onto the upstream enum.
    fn from(value: SortBy) -> Self {
        match value {
            SortBy::KeyAuthor => Self::KeyAuthor,
            SortBy::AuthorKey => Self::AuthorKey,
        }
    }
}
/// Sort direction for query results.
#[derive(Clone, Debug, Default, Serialize, Deserialize, uniffi::Enum)]
pub enum SortDirection {
    /// Ascending order (the default).
    #[default]
    Asc,
    /// Descending order.
    Desc,
}
impl From<iroh_docs::store::SortDirection> for SortDirection {
    /// Map the upstream sort direction onto the FFI enum.
    fn from(value: iroh_docs::store::SortDirection) -> Self {
        use iroh_docs::store::SortDirection as Upstream;
        match value {
            Upstream::Desc => Self::Desc,
            Upstream::Asc => Self::Asc,
        }
    }
}
impl From<SortDirection> for iroh_docs::store::SortDirection {
    /// Map the FFI sort direction onto the upstream enum.
    fn from(value: SortDirection) -> Self {
        match value {
            SortDirection::Desc => Self::Desc,
            SortDirection::Asc => Self::Asc,
        }
    }
}
/// FFI wrapper around a built `iroh_docs` query; construct via the
/// `Query::*` constructors below.
#[derive(Clone, Debug, uniffi::Object)]
pub struct Query(pub(crate) iroh_docs::store::Query);
/// Optional paging and sorting for a [`Query`].
///
/// An `offset` or `limit` of 0 means "unset" (no offset / no limit) —
/// see the `Query` constructors, which skip zero values.
#[derive(Clone, Debug, Default, uniffi::Record)]
pub struct QueryOptions {
    /// Field ordering (ignored by constructors that fix their own ordering).
    pub sort_by: SortBy,
    /// Sort direction.
    pub direction: SortDirection,
    /// Number of entries to skip; 0 = none.
    pub offset: u64,
    /// Maximum number of entries to return; 0 = unlimited.
    pub limit: u64,
}
#[uniffi::export]
impl Query {
    /// Query all entries.
    ///
    /// In every constructor taking `QueryOptions`, an `offset` or `limit`
    /// of 0 is treated as unset and not applied to the builder.
    #[uniffi::constructor]
    pub fn all(opts: Option<QueryOptions>) -> Self {
        let mut builder = iroh_docs::store::Query::all();
        if let Some(opts) = opts {
            if opts.offset != 0 {
                builder = builder.offset(opts.offset);
            }
            if opts.limit != 0 {
                builder = builder.limit(opts.limit);
            }
            builder = builder.sort_by(opts.sort_by.into(), opts.direction.into());
        }
        Query(builder.build())
    }
    /// Query the latest entry per key.
    ///
    /// Only `direction`, `offset` and `limit` from `opts` apply here; the
    /// field ordering is fixed by the single-latest-per-key query itself.
    #[uniffi::constructor]
    pub fn single_latest_per_key(opts: Option<QueryOptions>) -> Self {
        let mut builder = iroh_docs::store::Query::single_latest_per_key();
        if let Some(opts) = opts {
            if opts.offset != 0 {
                builder = builder.offset(opts.offset);
            }
            if opts.limit != 0 {
                builder = builder.limit(opts.limit);
            }
            builder = builder.sort_direction(opts.direction.into());
        }
        Query(builder.build())
    }
    /// Latest entry for exactly `key`, across all authors.
    #[uniffi::constructor]
    pub fn single_latest_per_key_exact(key: Vec<u8>) -> Self {
        let builder = iroh_docs::store::Query::single_latest_per_key()
            .key_exact(key)
            .build();
        Query(builder)
    }
    /// Latest entry per key, restricted to keys starting with `prefix`.
    ///
    /// Only `offset` and `limit` from `opts` apply; sort options are ignored.
    #[uniffi::constructor]
    pub fn single_latest_per_key_prefix(prefix: Vec<u8>, opts: Option<QueryOptions>) -> Self {
        let mut builder = iroh_docs::store::Query::single_latest_per_key().key_prefix(prefix);
        if let Some(opts) = opts {
            if opts.offset != 0 {
                builder = builder.offset(opts.offset);
            }
            if opts.limit != 0 {
                builder = builder.limit(opts.limit);
            }
        }
        Query(builder.build())
    }
    /// All entries written by `author`.
    #[uniffi::constructor]
    pub fn author(author: &AuthorId, opts: Option<QueryOptions>) -> Self {
        let mut builder = iroh_docs::store::Query::author(author.0);
        if let Some(opts) = opts {
            if opts.offset != 0 {
                builder = builder.offset(opts.offset);
            }
            if opts.limit != 0 {
                builder = builder.limit(opts.limit);
            }
            builder = builder.sort_by(opts.sort_by.into(), opts.direction.into());
        }
        Query(builder.build())
    }
    /// All entries (any author) for exactly `key`.
    #[uniffi::constructor]
    pub fn key_exact(key: Vec<u8>, opts: Option<QueryOptions>) -> Self {
        let mut builder = iroh_docs::store::Query::key_exact(key);
        if let Some(opts) = opts {
            if opts.offset != 0 {
                builder = builder.offset(opts.offset);
            }
            if opts.limit != 0 {
                builder = builder.limit(opts.limit);
            }
            builder = builder.sort_by(opts.sort_by.into(), opts.direction.into());
        }
        Query(builder.build())
    }
    /// The entry by `author` for exactly `key`.
    #[uniffi::constructor]
    pub fn author_key_exact(author: &AuthorId, key: Vec<u8>) -> Self {
        let builder = iroh_docs::store::Query::author(author.0).key_exact(key);
        Query(builder.build())
    }
    /// All entries (any author) whose key starts with `prefix`.
    #[uniffi::constructor]
    pub fn key_prefix(prefix: Vec<u8>, opts: Option<QueryOptions>) -> Self {
        let mut builder = iroh_docs::store::Query::key_prefix(prefix);
        if let Some(opts) = opts {
            if opts.offset != 0 {
                builder = builder.offset(opts.offset);
            }
            if opts.limit != 0 {
                builder = builder.limit(opts.limit);
            }
            builder = builder.sort_by(opts.sort_by.into(), opts.direction.into());
        }
        Query(builder.build())
    }
    /// Entries by `author` whose key starts with `prefix`.
    #[uniffi::constructor]
    pub fn author_key_prefix(
        author: &AuthorId,
        prefix: Vec<u8>,
        opts: Option<QueryOptions>,
    ) -> Self {
        let mut builder = iroh_docs::store::Query::author(author.0).key_prefix(prefix);
        if let Some(opts) = opts {
            if opts.offset != 0 {
                builder = builder.offset(opts.offset);
            }
            if opts.limit != 0 {
                builder = builder.limit(opts.limit);
            }
            builder = builder.sort_by(opts.sort_by.into(), opts.direction.into());
        }
        Query(builder.build())
    }
    /// The limit set on this query, if any.
    pub fn limit(&self) -> Option<u64> {
        self.0.limit()
    }
    /// The offset set on this query (0 when unset).
    pub fn offset(&self) -> u64 {
        self.0.offset()
    }
}
/// Foreign-implementable callback for document live events.
///
/// Returning an error from `event` is logged by the subscribing task but does
/// not stop event delivery.
#[uniffi::export(with_foreign)]
#[async_trait::async_trait]
pub trait SubscribeCallback: Send + Sync + 'static {
    async fn event(&self, event: Arc<LiveEvent>) -> Result<(), CallbackError>;
}
/// A live event on a document, mirroring the upstream `LiveEvent`.
///
/// Inspect the variant with `type()` and extract the payload with the
/// matching `as_*` accessor.
#[derive(Debug, Serialize, Deserialize, uniffi::Object)]
#[allow(clippy::large_enum_variant)]
pub enum LiveEvent {
    /// An entry was inserted locally.
    InsertLocal {
        entry: Entry,
    },
    /// An entry was received from a remote peer.
    InsertRemote {
        from: PublicKey,
        entry: Entry,
        content_status: ContentStatus,
    },
    /// The content for a previously-announced hash is now available locally.
    ContentReady {
        hash: Hash,
    },
    /// A neighbor joined the swarm.
    NeighborUp(PublicKey),
    /// A neighbor left the swarm.
    NeighborDown(PublicKey),
    /// A sync run with a peer finished.
    SyncFinished(SyncEvent),
    /// All pending content became ready.
    PendingContentReady,
}
/// Discriminant-only view of [`LiveEvent`], returned by `LiveEvent::type()`
/// so foreign code can dispatch before calling an `as_*` accessor.
#[derive(Debug, uniffi::Enum)]
pub enum LiveEventType {
    InsertLocal,
    InsertRemote,
    ContentReady,
    NeighborUp,
    NeighborDown,
    SyncFinished,
    PendingContentReady,
}
#[uniffi::export]
impl LiveEvent {
    /// The variant of this event; check it before calling an `as_*` accessor.
    pub fn r#type(&self) -> LiveEventType {
        match self {
            Self::InsertLocal { .. } => LiveEventType::InsertLocal,
            Self::InsertRemote { .. } => LiveEventType::InsertRemote,
            Self::ContentReady { .. } => LiveEventType::ContentReady,
            Self::NeighborUp(_) => LiveEventType::NeighborUp,
            Self::NeighborDown(_) => LiveEventType::NeighborDown,
            Self::SyncFinished(_) => LiveEventType::SyncFinished,
            Self::PendingContentReady => LiveEventType::PendingContentReady,
        }
    }
    /// The entry of an `InsertLocal` event.
    ///
    /// # Panics
    /// Panics if the event is a different variant.
    pub fn as_insert_local(&self) -> Arc<Entry> {
        if let Self::InsertLocal { entry } = self {
            Arc::new(entry.clone())
        } else {
            panic!("not an insert local event");
        }
    }
    /// The payload of an `InsertRemote` event.
    ///
    /// # Panics
    /// Panics if the event is a different variant.
    pub fn as_insert_remote(&self) -> InsertRemoteEvent {
        if let Self::InsertRemote {
            from,
            entry,
            content_status,
        } = self
        {
            InsertRemoteEvent {
                from: Arc::new(from.clone()),
                entry: Arc::new(entry.clone()),
                content_status: content_status.clone(),
            }
        } else {
            panic!("not an insert remote event");
        }
    }
    /// The hash of a `ContentReady` event.
    ///
    /// # Panics
    /// Panics if the event is a different variant.
    pub fn as_content_ready(&self) -> Arc<Hash> {
        if let Self::ContentReady { hash } = self {
            Arc::new(hash.clone())
        } else {
            // Fixed panic message grammar ("not an content ready event").
            panic!("not a content ready event");
        }
    }
    /// The peer key of a `NeighborUp` event.
    ///
    /// # Panics
    /// Panics if the event is a different variant.
    pub fn as_neighbor_up(&self) -> Arc<PublicKey> {
        if let Self::NeighborUp(key) = self {
            Arc::new(key.clone())
        } else {
            // Fixed panic message grammar ("not an neighbor up event").
            panic!("not a neighbor up event");
        }
    }
    /// The peer key of a `NeighborDown` event.
    ///
    /// # Panics
    /// Panics if the event is a different variant.
    pub fn as_neighbor_down(&self) -> Arc<PublicKey> {
        if let Self::NeighborDown(key) = self {
            Arc::new(key.clone())
        } else {
            // Fixed panic message grammar ("not an neighbor down event").
            panic!("not a neighbor down event");
        }
    }
    /// The payload of a `SyncFinished` event.
    ///
    /// # Panics
    /// Panics if the event is a different variant.
    pub fn as_sync_finished(&self) -> SyncEvent {
        if let Self::SyncFinished(event) = self {
            event.clone()
        } else {
            // Fixed garbled panic message ("not an sync event event").
            panic!("not a sync finished event");
        }
    }
}
impl From<iroh_docs::rpc::client::docs::LiveEvent> for LiveEvent {
fn from(value: iroh_docs::rpc::client::docs::LiveEvent) -> Self {
match value {
iroh_docs::rpc::client::docs::LiveEvent::InsertLocal { entry } => {
LiveEvent::InsertLocal {
entry: entry.into(),
}
}
iroh_docs::rpc::client::docs::LiveEvent::InsertRemote {
from,
entry,
content_status,
} => LiveEvent::InsertRemote {
from: from.into(),
entry: entry.into(),
content_status: content_status.into(),
},
iroh_docs::rpc::client::docs::LiveEvent::ContentReady { hash } => {
LiveEvent::ContentReady { hash: hash.into() }
}
iroh_docs::rpc::client::docs::LiveEvent::NeighborUp(key) => {
LiveEvent::NeighborUp(key.into())
}
iroh_docs::rpc::client::docs::LiveEvent::NeighborDown(key) => {
LiveEvent::NeighborDown(key.into())
}
iroh_docs::rpc::client::docs::LiveEvent::SyncFinished(e) => {
LiveEvent::SyncFinished(e.into())
}
iroh_docs::rpc::client::docs::LiveEvent::PendingContentReady => {
LiveEvent::PendingContentReady
}
}
}
}
/// Outcome of a finished sync run with one peer.
#[derive(Debug, Clone, Serialize, Deserialize, uniffi::Record)]
pub struct SyncEvent {
    /// Peer we synced with.
    pub peer: Arc<PublicKey>,
    /// Whether we initiated the sync or accepted it.
    pub origin: Origin,
    /// When the sync finished.
    pub finished: SystemTime,
    /// When the sync started.
    pub started: SystemTime,
    /// Error message when the sync failed; `None` on success.
    pub result: Option<String>,
}
impl From<iroh_docs::rpc::client::docs::SyncEvent> for SyncEvent {
    /// Convert the upstream sync event, flattening the result into an
    /// optional error message (`None` = success).
    fn from(value: iroh_docs::rpc::client::docs::SyncEvent) -> Self {
        SyncEvent {
            peer: Arc::new(value.peer.into()),
            origin: value.origin.into(),
            finished: value.finished,
            started: value.started,
            result: value.result.err(),
        }
    }
}
/// Why an outgoing sync connection was initiated.
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Copy, uniffi::Enum)]
pub enum SyncReason {
    /// A peer was joined directly (e.g. via ticket).
    DirectJoin,
    /// A new neighbor appeared in the swarm.
    NewNeighbor,
    /// Triggered by a sync report.
    SyncReport,
    /// A re-sync was requested.
    Resync,
}
impl From<iroh_docs::rpc::client::docs::SyncReason> for SyncReason {
    /// Map the upstream sync reason onto the FFI enum.
    fn from(value: iroh_docs::rpc::client::docs::SyncReason) -> Self {
        use iroh_docs::rpc::client::docs::SyncReason as Upstream;
        match value {
            Upstream::DirectJoin => Self::DirectJoin,
            Upstream::NewNeighbor => Self::NewNeighbor,
            Upstream::SyncReport => Self::SyncReport,
            Upstream::Resync => Self::Resync,
        }
    }
}
/// Direction of a sync session: we connected out (with a reason) or a peer
/// connected to us.
#[derive(Debug, Clone, Serialize, Deserialize, uniffi::Enum)]
pub enum Origin {
    /// We initiated the connection.
    Connect { reason: SyncReason },
    /// We accepted an incoming connection.
    Accept,
}
impl From<iroh_docs::rpc::client::docs::Origin> for Origin {
    /// Map the upstream origin onto the FFI enum.
    fn from(value: iroh_docs::rpc::client::docs::Origin) -> Self {
        use iroh_docs::rpc::client::docs::Origin as Upstream;
        match value {
            Upstream::Accept => Self::Accept,
            Upstream::Connect(reason) => Self::Connect {
                reason: reason.into(),
            },
        }
    }
}
/// Payload of a [`LiveEvent::InsertRemote`] event, returned by
/// `LiveEvent::as_insert_remote`.
#[derive(Debug, Serialize, Deserialize, uniffi::Record)]
pub struct InsertRemoteEvent {
    /// Peer the entry was received from.
    pub from: Arc<PublicKey>,
    /// The inserted entry.
    pub entry: Arc<Entry>,
    /// Availability of the entry's content at insertion time.
    pub content_status: ContentStatus,
}
/// Local availability of an entry's content.
#[derive(Debug, Clone, Serialize, Deserialize, uniffi::Enum)]
pub enum ContentStatus {
    /// All content bytes are available locally.
    Complete,
    /// Only part of the content is available locally.
    Incomplete,
    /// No content bytes are available locally.
    Missing,
}
impl From<iroh_docs::ContentStatus> for ContentStatus {
fn from(value: iroh_docs::ContentStatus) -> Self {
match value {
iroh_docs::ContentStatus::Complete => Self::Complete,
iroh_docs::ContentStatus::Incomplete => Self::Incomplete,
iroh_docs::ContentStatus::Missing => Self::Missing,
}
}
}
/// Foreign-implementable progress callback for `Doc::import_file`.
///
/// Returning an error aborts the import.
#[uniffi::export(with_foreign)]
#[async_trait::async_trait]
pub trait DocImportFileCallback: Send + Sync + 'static {
    async fn progress(&self, progress: Arc<DocImportProgress>) -> Result<(), CallbackError>;
}
/// Discriminant-only view of [`DocImportProgress`], returned by
/// `DocImportProgress::type()` for foreign-side dispatch.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Enum)]
pub enum DocImportProgressType {
    Found,
    Progress,
    IngestDone,
    AllDone,
    Abort,
}
/// An item to import was found (progress `id`, display `name`, byte `size`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct DocImportProgressFound {
    pub id: u64,
    pub name: String,
    pub size: u64,
}
/// Import progressed to byte `offset` for the item with progress `id`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct DocImportProgressProgress {
    pub id: u64,
    pub offset: u64,
}
/// The item with progress `id` was ingested, producing content `hash`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct DocImportProgressIngestDone {
    pub id: u64,
    pub hash: Arc<Hash>,
}
/// The whole import finished; the entry was stored under `key`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct DocImportProgressAllDone {
    pub key: Vec<u8>,
}
/// The import was aborted with `error`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct DocImportProgressAbort {
    pub error: String,
}
/// Progress event for `Doc::import_file`; inspect with `type()` and extract
/// the payload with the matching `as_*` accessor.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Object)]
pub enum DocImportProgress {
    Found(DocImportProgressFound),
    Progress(DocImportProgressProgress),
    IngestDone(DocImportProgressIngestDone),
    AllDone(DocImportProgressAllDone),
    Abort(DocImportProgressAbort),
}
impl From<iroh_docs::rpc::client::docs::ImportProgress> for DocImportProgress {
    /// Convert each upstream import-progress variant into its FFI record.
    fn from(value: iroh_docs::rpc::client::docs::ImportProgress) -> Self {
        use iroh_docs::rpc::client::docs::ImportProgress as Upstream;
        match value {
            Upstream::Found { id, name, size } => {
                Self::Found(DocImportProgressFound { id, name, size })
            }
            Upstream::Progress { id, offset } => {
                Self::Progress(DocImportProgressProgress { id, offset })
            }
            Upstream::IngestDone { id, hash } => Self::IngestDone(DocImportProgressIngestDone {
                id,
                hash: Arc::new(hash.into()),
            }),
            Upstream::AllDone { key } => {
                Self::AllDone(DocImportProgressAllDone { key: key.into() })
            }
            Upstream::Abort(err) => Self::Abort(DocImportProgressAbort {
                error: err.to_string(),
            }),
        }
    }
}
#[uniffi::export]
impl DocImportProgress {
    /// The variant of this progress event; check before calling `as_*`.
    pub fn r#type(&self) -> DocImportProgressType {
        match self {
            Self::Found(_) => DocImportProgressType::Found,
            Self::Progress(_) => DocImportProgressType::Progress,
            Self::IngestDone(_) => DocImportProgressType::IngestDone,
            Self::AllDone(_) => DocImportProgressType::AllDone,
            Self::Abort(_) => DocImportProgressType::Abort,
        }
    }
    /// Payload of a `Found` event. Panics on any other variant.
    pub fn as_found(&self) -> DocImportProgressFound {
        if let Self::Found(found) = self {
            found.clone()
        } else {
            panic!("DocImportProgress type is not 'Found'")
        }
    }
    /// Payload of a `Progress` event. Panics on any other variant.
    pub fn as_progress(&self) -> DocImportProgressProgress {
        if let Self::Progress(progress) = self {
            progress.clone()
        } else {
            panic!("DocImportProgress type is not 'Progress'")
        }
    }
    /// Payload of an `IngestDone` event. Panics on any other variant.
    pub fn as_ingest_done(&self) -> DocImportProgressIngestDone {
        if let Self::IngestDone(done) = self {
            done.clone()
        } else {
            panic!("DocImportProgress type is not 'IngestDone'")
        }
    }
    /// Payload of an `AllDone` event. Panics on any other variant.
    pub fn as_all_done(&self) -> DocImportProgressAllDone {
        if let Self::AllDone(done) = self {
            done.clone()
        } else {
            panic!("DocImportProgress type is not 'AllDone'")
        }
    }
    /// Payload of an `Abort` event. Panics on any other variant.
    pub fn as_abort(&self) -> DocImportProgressAbort {
        if let Self::Abort(abort) = self {
            abort.clone()
        } else {
            panic!("DocImportProgress type is not 'Abort'")
        }
    }
}
/// Foreign-implementable progress callback for `Doc::export_file`.
///
/// Returning an error aborts the export.
#[uniffi::export(with_foreign)]
#[async_trait::async_trait]
pub trait DocExportFileCallback: Send + Sync + 'static {
    async fn progress(&self, progress: Arc<DocExportProgress>) -> Result<(), CallbackError>;
}
/// Discriminant-only view of [`DocExportProgress`], returned by
/// `DocExportProgress::type()` for foreign-side dispatch.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Enum)]
pub enum DocExportProgressType {
    Found,
    Progress,
    Done,
    AllDone,
    Abort,
}
/// An item to export was found (progress `id`, content `hash`, byte `size`,
/// destination path `outpath`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct DocExportProgressFound {
    pub id: u64,
    pub hash: Arc<Hash>,
    pub size: u64,
    pub outpath: String,
}
/// Export progressed to byte `offset` for the item with progress `id`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct DocExportProgressProgress {
    pub id: u64,
    pub offset: u64,
}
/// The item with progress `id` finished exporting.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct DocExportProgressDone {
    pub id: u64,
}
/// The export was aborted with `error`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct DocExportProgressAbort {
    pub error: String,
}
/// Progress event for `Doc::export_file`; inspect with `type()` and extract
/// the payload with the matching `as_*` accessor.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Object)]
pub enum DocExportProgress {
    Found(DocExportProgressFound),
    Progress(DocExportProgressProgress),
    Done(DocExportProgressDone),
    AllDone,
    Abort(DocExportProgressAbort),
}
impl From<iroh_blobs::export::ExportProgress> for DocExportProgress {
    /// Convert each upstream export-progress variant into its FFI record.
    fn from(value: iroh_blobs::export::ExportProgress) -> Self {
        use iroh_blobs::export::ExportProgress as Upstream;
        match value {
            Upstream::Found {
                id,
                hash,
                size,
                outpath,
                ..
            } => Self::Found(DocExportProgressFound {
                id,
                hash: Arc::new(hash.into()),
                size: size.value(),
                outpath: outpath.to_string_lossy().to_string(),
            }),
            Upstream::Progress { id, offset } => {
                Self::Progress(DocExportProgressProgress { id, offset })
            }
            Upstream::Done { id } => Self::Done(DocExportProgressDone { id }),
            Upstream::AllDone => Self::AllDone,
            Upstream::Abort(err) => Self::Abort(DocExportProgressAbort {
                error: err.to_string(),
            }),
        }
    }
}
#[uniffi::export]
impl DocExportProgress {
    /// The variant of this progress event; check before calling `as_*`.
    pub fn r#type(&self) -> DocExportProgressType {
        match self {
            Self::Found(_) => DocExportProgressType::Found,
            Self::Progress(_) => DocExportProgressType::Progress,
            Self::Done(_) => DocExportProgressType::Done,
            Self::AllDone => DocExportProgressType::AllDone,
            Self::Abort(_) => DocExportProgressType::Abort,
        }
    }
    /// Payload of a `Found` event. Panics on any other variant.
    pub fn as_found(&self) -> DocExportProgressFound {
        if let Self::Found(found) = self {
            found.clone()
        } else {
            panic!("DocExportProgress type is not 'Found'")
        }
    }
    /// Payload of a `Progress` event. Panics on any other variant.
    pub fn as_progress(&self) -> DocExportProgressProgress {
        if let Self::Progress(progress) = self {
            progress.clone()
        } else {
            panic!("DocExportProgress type is not 'Progress'")
        }
    }
    /// Payload of an `Abort` event. Panics on any other variant.
    pub fn as_abort(&self) -> DocExportProgressAbort {
        if let Self::Abort(abort) = self {
            abort.clone()
        } else {
            panic!("DocExportProgress type is not 'Abort'")
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{setup_logging, PublicKey};
use rand::RngCore;
use tokio::{io::AsyncWriteExt, sync::mpsc};
// Smoke test: start a persistent node with docs enabled, create a doc,
// share it, and join it again via the resulting ticket.
#[tokio::test]
async fn test_doc_create() {
    let path = tempfile::tempdir().unwrap();
    let options = crate::NodeOptions {
        enable_docs: true,
        ..Default::default()
    };
    let node = Iroh::persistent_with_options(
        path.path()
            .join("doc-create")
            .to_string_lossy()
            .into_owned(),
        options,
    )
    .await
    .unwrap();
    let node_id = node.net().node_id().await.unwrap();
    println!("id: {}", node_id);
    let doc = node.docs().create().await.unwrap();
    let doc_id = doc.id();
    println!("doc_id: {}", doc_id);
    let doc_ticket = doc
        .share(crate::doc::ShareMode::Write, AddrInfoOptions::Id)
        .await
        .unwrap();
    println!("doc_ticket: {}", doc_ticket);
    // Joining our own ticket should succeed (the doc already exists locally).
    node.docs().join(&doc_ticket).await.unwrap();
}
// End-to-end sync test: two nodes, one doc; node 1 joins via ticket, writes
// an entry, and node 0's subscription observes the content arriving.
#[tokio::test]
async fn test_basic_sync() {
    setup_logging();
    let iroh_dir = tempfile::tempdir().unwrap();
    let options = crate::NodeOptions {
        enable_docs: true,
        ..Default::default()
    };
    let node_0 = Iroh::persistent_with_options(
        iroh_dir
            .path()
            .join("basic-sync-0")
            .to_string_lossy()
            .into_owned(),
        options,
    )
    .await
    .unwrap();
    tracing::warn!("first node started");
    let options = crate::NodeOptions {
        enable_docs: true,
        ..Default::default()
    };
    let node_1 = Iroh::persistent_with_options(
        iroh_dir
            .path()
            .join("basic-sync-1")
            .to_string_lossy()
            .into_owned(),
        options,
    )
    .await
    .unwrap();
    // Fixed typo in log message ("second ndoe started").
    tracing::warn!("second node started");
    let doc_0 = node_0.docs().create().await.unwrap();
    tracing::warn!("doc created");
    let ticket = doc_0
        .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses)
        .await
        .unwrap();
    tracing::warn!("ticket created");
    let (found_s, mut found_r) = mpsc::channel(8);
    struct Callback {
        found_s: mpsc::Sender<Arc<LiveEvent>>,
    }
    #[async_trait::async_trait]
    impl SubscribeCallback for Callback {
        async fn event(&self, event: Arc<LiveEvent>) -> Result<(), CallbackError> {
            println!("event {:?}", event);
            self.found_s.send(event).await.unwrap();
            Ok(())
        }
    }
    let cb_0 = Callback { found_s };
    doc_0.subscribe(Arc::new(cb_0)).await.unwrap();
    let (found_s_1, mut found_r_1) = mpsc::channel(8);
    let cb_1 = Callback { found_s: found_s_1 };
    let doc_1 = node_1
        .docs()
        .join_and_subscribe(&ticket, Arc::new(cb_1))
        .await
        .unwrap();
    tracing::warn!("joined");
    // Wait until node 1 has completed an initial sync with node 0.
    while let Some(event) = found_r_1.recv().await {
        if let LiveEvent::SyncFinished(_) = *event {
            break;
        }
    }
    let author = node_1.authors().create().await.unwrap();
    doc_1
        .set_bytes(&author, b"hello".to_vec(), b"world".to_vec())
        .await
        .unwrap();
    // Wait for node 0's subscription to report the content as ready.
    // NOTE(review): the blob is read back from node_1 (the writer), so this
    // asserts the content exists but not that node_0 downloaded it — reading
    // from node_0 would be the stronger check; confirm intent.
    while let Some(event) = found_r.recv().await {
        if let LiveEvent::ContentReady { ref hash } = *event {
            let val = node_1
                .blobs()
                .read_to_bytes(hash.clone().into())
                .await
                .unwrap();
            assert_eq!(b"world".to_vec(), val);
            break;
        }
    }
}
#[test]
fn test_node_addr() {
    // Round-trip a NodeAddr: the direct addresses and relay url passed to
    // the constructor must come back out unchanged.
    let key_str = "7db06b57aac9b3640961d281239c8f23487ac7f7265da21607c5612d3527a254";
    let node_id = PublicKey::from_string(key_str.into()).unwrap();
    let port = 3000;
    let ipv4 = format!("127.0.0.1:{port}");
    // NOTE(review): a socket address for ::1 is conventionally written
    // "[::1]:3000"; this relies on NodeAddr accepting the bare form — confirm.
    let ipv6 = format!("::1:{port}");
    let derp_url = String::from("https://derp.url");
    let addrs = vec![ipv4, ipv6];
    let expect_addrs = addrs.clone();
    let node_addr = NodeAddr::new(&node_id, Some(derp_url.clone()), addrs);
    let got_addrs = node_addr.direct_addresses();
    // BUGFIX: assert the lengths match first — zip alone passes vacuously
    // if direct_addresses() returns an empty (or shorter) list.
    assert_eq!(expect_addrs.len(), got_addrs.len());
    for (expect, got) in expect_addrs.iter().zip(got_addrs.iter()) {
        assert_eq!(got, expect);
    }
    let got_derp_url = node_addr.relay_url().unwrap();
    assert_eq!(derp_url, got_derp_url);
}
#[test]
fn test_author_id() {
    // Parsing an author id from hex and printing it back must round-trip,
    // and two ids parsed from the same string must compare equal both ways.
    let hex = "7db06b57aac9b3640961d281239c8f23487ac7f7265da21607c5612d3527a254";
    let first = AuthorId::from_string(hex.into()).unwrap();
    assert_eq!(hex, first.to_string());
    let second = AuthorId::from_string(hex.into()).unwrap();
    assert!(first.equal(&second));
    assert!(second.equal(&first));
}
#[test]
fn test_query() {
    // Each Query constructor must honor the offset/limit/sort fields of the
    // QueryOptions it is given, and fall back to defaults otherwise.

    // Explicit offset and limit.
    let all = Query::all(Some(QueryOptions {
        offset: 10,
        limit: 10,
        ..QueryOptions::default()
    }));
    assert_eq!(10, all.offset());
    assert_eq!(Some(10), all.limit());

    // Direction alone leaves offset/limit at their defaults.
    let single_latest_per_key = Query::single_latest_per_key(Some(QueryOptions {
        direction: SortDirection::Desc,
        ..QueryOptions::default()
    }));
    assert_eq!(0, single_latest_per_key.offset());
    assert_eq!(None, single_latest_per_key.limit());

    // Author query with only an offset set.
    let author_id = AuthorId::from_string(
        "7db06b57aac9b3640961d281239c8f23487ac7f7265da21607c5612d3527a254".to_string(),
    )
    .unwrap();
    let author = Query::author(
        &author_id,
        Some(QueryOptions {
            offset: 100,
            ..QueryOptions::default()
        }),
    );
    assert_eq!(100, author.offset());
    assert_eq!(None, author.limit());

    // Exact-key query with only a limit set.
    let key_exact = Query::key_exact(
        b"key".to_vec(),
        Some(QueryOptions {
            limit: 100,
            ..QueryOptions::default()
        }),
    );
    assert_eq!(0, key_exact.offset());
    assert_eq!(Some(100), key_exact.limit());

    // Fully-specified options on a prefix query.
    let key_prefix = Query::key_prefix(
        b"prefix".to_vec(),
        Some(QueryOptions {
            sort_by: SortBy::KeyAuthor,
            direction: SortDirection::Desc,
            offset: 0,
            limit: 100,
        }),
    );
    assert_eq!(0, key_prefix.offset());
    assert_eq!(Some(100), key_prefix.limit());
}
#[tokio::test]
async fn test_doc_entry_basics() {
    // Write a single entry, fetch it back by (author, key), and check that
    // the entry's hash, content, and length all match what was written.
    let dir = tempfile::tempdir().unwrap();
    let opts = crate::NodeOptions {
        enable_docs: true,
        ..Default::default()
    };
    let data_root = dir
        .path()
        .join("doc-entry-basics")
        .to_string_lossy()
        .into_owned();
    let node = crate::Iroh::persistent_with_options(data_root, opts)
        .await
        .unwrap();

    let doc = node.docs().create().await.unwrap();
    let author = node.authors().create().await.unwrap();

    let key = b"foo".to_vec();
    let val = b"hello world!".to_vec();
    let hash = doc
        .set_bytes(&author, key.clone(), val.clone())
        .await
        .unwrap();

    // The stored entry must carry the same content hash that set_bytes returned.
    let entry = doc
        .get_one(Query::author_key_exact(&author, key.clone()).into())
        .await
        .unwrap()
        .unwrap();
    assert!(hash.equal(&entry.content_hash()));

    let got_val = node
        .blobs()
        .read_to_bytes(entry.content_hash())
        .await
        .unwrap();
    assert_eq!(val, got_val);
    assert_eq!(val.len() as u64, entry.content_len());
}
#[tokio::test]
async fn test_doc_import_export() {
    // Import a file into a doc, export it back out under a different root,
    // and verify the bytes survive the round trip.
    let temp_dir = tempfile::tempdir().unwrap();
    let in_root = temp_dir.path().join("import-export-in");
    // Borrow instead of cloning: create_dir_all takes impl AsRef<Path>.
    tokio::fs::create_dir_all(&in_root).await.unwrap();
    let out_root = temp_dir.path().join("import-export-out");

    // Write 100 random bytes to the source file.
    let path = in_root.join("test");
    let size = 100;
    let mut buf = vec![0u8; size];
    rand::thread_rng().fill_bytes(&mut buf);
    let mut file = tokio::fs::File::create(&path).await.unwrap();
    // write_all takes &[u8]; the previous `&buf.clone()` copied for nothing.
    file.write_all(&buf).await.unwrap();
    file.flush().await.unwrap();

    let iroh_dir = tempfile::tempdir().unwrap();
    let options = crate::NodeOptions {
        enable_docs: true,
        ..Default::default()
    };
    let node = crate::Iroh::persistent_with_options(
        iroh_dir
            .path()
            .join("import-export-node")
            .to_string_lossy()
            .into_owned(),
        options,
    )
    .await
    .unwrap();
    let doc = node.docs().create().await.unwrap();
    let author = node.authors().create().await.unwrap();

    // Import the file under a key derived from its path relative to in_root.
    let path_str = path.to_string_lossy().into_owned();
    let in_root_str = in_root.to_string_lossy().into_owned();
    let key = crate::path_to_key(path_str.clone(), None, Some(in_root_str)).unwrap();
    doc.import_file(author.clone(), key.clone(), path_str, true, None)
        .await
        .unwrap();

    // Look up the imported entry and export it to a path under out_root.
    let entry = doc
        .get_one(Query::author_key_exact(&author, key).into())
        .await
        .unwrap()
        .unwrap();
    let key = entry.key().to_vec();
    let out_root_str = out_root.to_string_lossy().into_owned();
    // Renamed from `path` to avoid shadowing the import path above.
    let out_path = crate::key_to_path(key, None, Some(out_root_str)).unwrap();
    doc.export_file(entry, out_path.clone(), None).await.unwrap();

    let got_bytes = tokio::fs::read(out_path).await.unwrap();
    assert_eq!(buf, got_bytes);
}
}