use std::{
path::PathBuf,
str::FromStr,
sync::{Arc, RwLock},
time::Duration,
};
use futures::{StreamExt, TryStreamExt};
use iroh_blobs::store::BaoBlobSize;
use serde::{Deserialize, Serialize};
use crate::{node::Iroh, BlobsClient, CallbackError, NetClient};
use crate::{ticket::AddrInfoOptions, BlobTicket};
use crate::{IrohError, NodeAddr};
/// Client for the blobs subsystem of an Iroh node.
///
/// Obtained via `Iroh::blobs`. Holds an RPC client for the blob store and
/// one for the networking layer (the latter is used by `share` to resolve
/// this node's address when building tickets).
#[derive(uniffi::Object)]
pub struct Blobs {
    // RPC client for blob storage operations.
    client: BlobsClient,
    // RPC client for network info; used by `share` to embed node addresses.
    net_client: NetClient,
}
#[uniffi::export]
impl Iroh {
    /// Access the blobs API of this node.
    ///
    /// Returns a new `Blobs` handle that clones the node's underlying
    /// RPC clients; cheap to call repeatedly.
    pub fn blobs(&self) -> Blobs {
        Blobs {
            client: self.blobs_client.clone(),
            net_client: self.net_client.clone(),
        }
    }
}
#[uniffi::export]
impl Blobs {
    /// List the hashes of all complete blobs stored on this node.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn list(&self) -> Result<Vec<Arc<Hash>>, IrohError> {
        let response = self.client.list().await?;
        let hashes: Vec<Arc<Hash>> = response
            .map_ok(|i| Arc::new(Hash(i.hash)))
            .try_collect()
            .await?;
        Ok(hashes)
    }

    /// Get the size (in bytes) of the blob with the given hash.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn size(&self, hash: &Hash) -> Result<u64, IrohError> {
        let r = self.client.read(hash.0).await?;
        Ok(r.size())
    }

    /// Check whether this node stores the blob with the given hash.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn has(&self, hash: &Hash) -> Result<bool, IrohError> {
        let has_blob = self.client.has(hash.0).await?;
        Ok(has_blob)
    }

    /// Get the status (not found / partial / complete) of a blob.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn status(&self, hash: &Hash) -> Result<BlobStatus, IrohError> {
        let status = self.client.status(hash.0).await?;
        Ok(status.into())
    }

    /// Read the entire blob into memory and return its bytes.
    ///
    /// Buffers the whole blob; prefer `write_to_path` for very large blobs.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn read_to_bytes(&self, hash: Arc<Hash>) -> Result<Vec<u8>, IrohError> {
        let res = self
            .client
            .read_to_bytes(hash.0)
            .await
            .map(|b| b.to_vec())?;
        Ok(res)
    }

    /// Read part of a blob starting at `offset`, bounded by `len`.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn read_at_to_bytes(
        &self,
        hash: Arc<Hash>,
        offset: u64,
        len: &ReadAtLen,
    ) -> Result<Vec<u8>, IrohError> {
        let res = self
            .client
            .read_at_to_bytes(hash.0, offset, (*len).into())
            .await
            .map(|b| b.to_vec())?;
        Ok(res)
    }

    /// Import content from a filesystem `path`, reporting progress via `cb`.
    ///
    /// `in_place` imports by reference instead of copying into the store;
    /// `wrap` controls whether the content is wrapped in a collection.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn add_from_path(
        &self,
        path: String,
        in_place: bool,
        tag: Arc<SetTagOption>,
        wrap: Arc<WrapOption>,
        cb: Arc<dyn AddCallback>,
    ) -> Result<(), IrohError> {
        let mut stream = self
            .client
            .add_from_path(
                path.into(),
                in_place,
                (*tag).clone().into(),
                (*wrap).clone().into(),
            )
            .await?;
        // Forward every progress event to the foreign-language callback;
        // an error from the callback aborts the import.
        while let Some(progress) = stream.next().await {
            let progress = progress?;
            cb.progress(Arc::new(progress.into())).await?;
        }
        Ok(())
    }

    /// Write a blob's contents to a file at `path`, creating any missing
    /// parent directories first.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn write_to_path(&self, hash: Arc<Hash>, path: String) -> Result<(), IrohError> {
        let mut reader = self.client.read(hash.0).await?;
        let path: PathBuf = path.into();
        if let Some(dir) = path.parent() {
            tokio::fs::create_dir_all(dir)
                .await
                .map_err(anyhow::Error::from)?;
        }
        let mut file = tokio::fs::File::create(path)
            .await
            .map_err(anyhow::Error::from)?;
        tokio::io::copy(&mut reader, &mut file)
            .await
            .map_err(anyhow::Error::from)?;
        Ok(())
    }

    /// Store the given bytes as a blob; returns hash, format, size and tag.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn add_bytes(&self, bytes: Vec<u8>) -> Result<BlobAddOutcome, IrohError> {
        let res = self.client.add_bytes(bytes).await?;
        Ok(res.into())
    }

    /// Store the given bytes as a blob under an explicitly named tag.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn add_bytes_named(
        &self,
        bytes: Vec<u8>,
        name: String,
    ) -> Result<BlobAddOutcome, IrohError> {
        let res = self
            .client
            .add_bytes_named(bytes, iroh_blobs::Tag(name.into()))
            .await?;
        Ok(res.into())
    }

    /// Download a blob from remote nodes per `opts`, reporting progress via `cb`.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn download(
        &self,
        hash: Arc<Hash>,
        opts: Arc<BlobDownloadOptions>,
        cb: Arc<dyn DownloadCallback>,
    ) -> Result<(), IrohError> {
        let mut stream = self
            .client
            .download_with_opts(hash.0, opts.0.clone())
            .await?;
        // An error from the callback aborts the download.
        while let Some(progress) = stream.next().await {
            let progress = progress?;
            cb.progress(Arc::new(progress.into())).await?;
        }
        Ok(())
    }

    /// Export a blob from the store to `destination` on the local filesystem,
    /// creating any missing parent directories first.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn export(
        &self,
        hash: Arc<Hash>,
        destination: String,
        format: BlobExportFormat,
        mode: BlobExportMode,
    ) -> Result<(), IrohError> {
        let destination: PathBuf = destination.into();
        if let Some(dir) = destination.parent() {
            tokio::fs::create_dir_all(dir)
                .await
                .map_err(anyhow::Error::from)?;
        }
        let stream = self
            .client
            .export(hash.0, destination, format.into(), mode.into())
            .await?;
        // Drive the export to completion.
        stream.finish().await?;
        Ok(())
    }

    /// Create a shareable ticket for the blob, embedding this node's address
    /// information as selected by `ticket_options`.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn share(
        &self,
        hash: Arc<Hash>,
        blob_format: BlobFormat,
        ticket_options: AddrInfoOptions,
    ) -> Result<Arc<BlobTicket>, IrohError> {
        let addr = self.net_client.node_addr().await?;
        let opts: iroh_docs::rpc::AddrInfoOptions = ticket_options.into();
        let addr = opts.apply(&addr);
        let ticket = iroh_blobs::ticket::BlobTicket::new(addr, hash.0, blob_format.into())?;
        Ok(Arc::new(ticket.into()))
    }

    /// List all blobs that are only partially present on this node.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn list_incomplete(&self) -> Result<Vec<IncompleteBlobInfo>, IrohError> {
        let blobs = self
            .client
            .list_incomplete()
            .await?
            .map_ok(|res| res.into())
            .try_collect::<Vec<_>>()
            .await?;
        Ok(blobs)
    }

    /// List all collections stored on this node.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn list_collections(&self) -> Result<Vec<CollectionInfo>, IrohError> {
        let blobs = self
            .client
            .list_collections()?
            .map_ok(|res| res.into())
            .try_collect::<Vec<_>>()
            .await?;
        Ok(blobs)
    }

    /// Fetch the collection identified by `hash` from the store.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn get_collection(&self, hash: Arc<Hash>) -> Result<Arc<Collection>, IrohError> {
        let collection = self.client.get_collection(hash.0).await?;
        Ok(Arc::new(collection.into()))
    }

    /// Store a collection, returning its root hash and the tag it was stored
    /// under. `tags_to_delete` are removed in the same operation.
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn create_collection(
        &self,
        collection: Arc<Collection>,
        tag: Arc<SetTagOption>,
        tags_to_delete: Vec<String>,
    ) -> Result<HashAndTag, IrohError> {
        // Same poison message as `From<Collection>` below, for consistency.
        let collection = collection
            .0
            .read()
            .expect("Collection lock poisoned")
            .clone();
        let (hash, tag) = self
            .client
            .create_collection(
                collection,
                (*tag).clone().into(),
                tags_to_delete
                    .into_iter()
                    .map(iroh_blobs::Tag::from)
                    .collect(),
            )
            .await?;
        Ok(HashAndTag {
            hash: Arc::new(hash.into()),
            tag: tag.0.to_vec(),
        })
    }

    /// Delete the blob with the given hash, first removing every tag that
    /// references it.
    ///
    /// If no tag references the hash, nothing is deleted (as before).
    #[uniffi::method(async_runtime = "tokio")]
    pub async fn delete_blob(&self, hash: Arc<Hash>) -> Result<(), IrohError> {
        let mut tags = self.client.tags().list().await?;
        // Collect *all* matching tag names. The previous implementation kept
        // only the last match, so a blob referenced by several tags had one
        // tag removed while the others were left dangling even though the
        // blob data itself was deleted.
        let mut names = Vec::new();
        while let Some(tag) = tags.next().await {
            let tag = tag?;
            if tag.hash == hash.0 {
                names.push(tag.name);
            }
        }
        if !names.is_empty() {
            for name in names {
                self.client.tags().delete(name).await?;
            }
            self.client.delete_blob(hash.0).await?;
        }
        Ok(())
    }
}
/// Result of `Blobs::create_collection`: the collection's root hash and tag.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct HashAndTag {
    /// Root hash of the stored collection.
    pub hash: Arc<Hash>,
    /// Raw bytes of the tag the collection was stored under.
    pub tag: Vec<u8>,
}
/// Outcome of adding bytes or a file to the blob store.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct BlobAddOutcome {
    /// Hash of the added content.
    pub hash: Arc<Hash>,
    /// Format (raw or hash sequence) the content was stored in.
    pub format: BlobFormat,
    /// Size of the added content in bytes.
    pub size: u64,
    /// Raw bytes of the tag assigned to the content.
    pub tag: Vec<u8>,
}
impl From<iroh_blobs::rpc::client::blobs::AddOutcome> for BlobAddOutcome {
    /// Translate the iroh RPC outcome into the FFI record.
    fn from(value: iroh_blobs::rpc::client::blobs::AddOutcome) -> Self {
        Self {
            size: value.size,
            format: value.format.into(),
            hash: Arc::new(value.hash.into()),
            tag: value.tag.0.to_vec(),
        }
    }
}
/// Status of a blob in the local store.
#[derive(Debug, uniffi::Object, Clone, Copy)]
pub enum BlobStatus {
    /// The blob is not stored on this node.
    NotFound,
    /// Only part of the blob is stored.
    Partial {
        /// Known size so far; see `size_is_verified` for its reliability.
        size: u64,
        /// True when the size came from verified data (`BaoBlobSize::Verified`).
        size_is_verified: bool,
    },
    /// The whole blob is stored.
    Complete {
        /// Total size of the blob in bytes.
        size: u64,
    },
}
impl From<iroh_blobs::rpc::client::blobs::BlobStatus> for BlobStatus {
    /// Flatten the nested `BaoBlobSize` into a `(size, verified)` pair.
    fn from(value: iroh_blobs::rpc::client::blobs::BlobStatus) -> Self {
        use iroh_blobs::rpc::client::blobs::BlobStatus as S;
        match value {
            S::NotFound => Self::NotFound,
            S::Partial { size } => {
                let (size, size_is_verified) = match size {
                    BaoBlobSize::Unverified(s) => (s, false),
                    BaoBlobSize::Verified(s) => (s, true),
                };
                Self::Partial {
                    size,
                    size_is_verified,
                }
            }
            S::Complete { size } => Self::Complete { size },
        }
    }
}
/// How much data `Blobs::read_at_to_bytes` should read from its offset.
#[derive(Debug, uniffi::Object, Default, Clone, Copy)]
pub enum ReadAtLen {
    /// Read everything from the offset to the end.
    #[default]
    All,
    /// Read an exact number of bytes.
    Exact(u64),
    /// Read up to this many bytes.
    AtMost(u64),
}
#[uniffi::export]
impl ReadAtLen {
    /// Read to the end of the blob.
    #[uniffi::constructor]
    pub fn all() -> Self {
        Self::All
    }
    /// Read exactly `size` bytes.
    #[uniffi::constructor]
    pub fn exact(size: u64) -> Self {
        Self::Exact(size)
    }
    /// Read no more than `size` bytes.
    #[uniffi::constructor]
    pub fn at_most(size: u64) -> Self {
        Self::AtMost(size)
    }
}
impl From<ReadAtLen> for iroh_blobs::rpc::client::blobs::ReadAtLen {
    fn from(value: ReadAtLen) -> Self {
        use iroh_blobs::rpc::client::blobs::ReadAtLen as Target;
        match value {
            ReadAtLen::All => Target::All,
            ReadAtLen::Exact(n) => Target::Exact(n),
            ReadAtLen::AtMost(n) => Target::AtMost(n),
        }
    }
}
/// How to tag newly added content: automatically or with an explicit name.
#[derive(Debug, Clone, PartialEq, Eq, uniffi::Object)]
pub enum SetTagOption {
    /// Let the node choose a tag.
    Auto,
    /// Use the given bytes as the tag name.
    Named(Vec<u8>),
}
#[uniffi::export]
impl SetTagOption {
    /// Request an automatically generated tag.
    #[uniffi::constructor]
    pub fn auto() -> Self {
        Self::Auto
    }
    /// Request a tag with an explicit name.
    #[uniffi::constructor]
    pub fn named(tag: Vec<u8>) -> Self {
        Self::Named(tag)
    }
}
impl From<SetTagOption> for iroh_blobs::util::SetTagOption {
    fn from(value: SetTagOption) -> Self {
        match value {
            SetTagOption::Auto => Self::Auto,
            SetTagOption::Named(tag) => Self::Named(iroh_blobs::Tag(bytes::Bytes::from(tag))),
        }
    }
}
#[derive(Debug, Clone, PartialEq, Eq, uniffi::Object)]
pub enum WrapOption {
NoWrap,
Wrap {
name: Option<String>,
},
}
#[uniffi::export]
impl WrapOption {
#[uniffi::constructor]
pub fn no_wrap() -> Self {
WrapOption::NoWrap
}
#[uniffi::constructor]
pub fn wrap(name: Option<String>) -> Self {
WrapOption::Wrap { name }
}
}
impl From<WrapOption> for iroh_blobs::rpc::client::blobs::WrapOption {
fn from(value: WrapOption) -> Self {
match value {
WrapOption::NoWrap => iroh_blobs::rpc::client::blobs::WrapOption::NoWrap,
WrapOption::Wrap { name } => iroh_blobs::rpc::client::blobs::WrapOption::Wrap { name },
}
}
}
/// Content hash identifying a blob; thin wrapper around `iroh_blobs::Hash`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Object)]
#[uniffi::export(Display)]
pub struct Hash(pub(crate) iroh_blobs::Hash);
impl From<iroh_blobs::Hash> for Hash {
    fn from(h: iroh_blobs::Hash) -> Self {
        Hash(h)
    }
}
#[uniffi::export]
impl Hash {
#[uniffi::constructor]
pub fn new(buf: Vec<u8>) -> Self {
Hash(iroh_blobs::Hash::new(buf))
}
pub fn to_bytes(&self) -> Vec<u8> {
self.0.as_bytes().to_vec()
}
#[uniffi::constructor]
pub fn from_bytes(bytes: Vec<u8>) -> Result<Self, IrohError> {
let bytes: [u8; 32] = bytes.try_into().map_err(|b: Vec<u8>| {
anyhow::anyhow!("expected byte array of length 32, got {}", b.len())
})?;
Ok(Hash(iroh_blobs::Hash::from_bytes(bytes)))
}
#[uniffi::constructor]
pub fn from_string(s: String) -> Result<Self, IrohError> {
let key = iroh_blobs::Hash::from_str(&s).map_err(anyhow::Error::from)?;
Ok(key.into())
}
pub fn to_hex(&self) -> String {
self.0.to_hex()
}
pub fn equal(&self, other: &Hash) -> bool {
*self == *other
}
}
impl std::fmt::Display for Hash {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl From<Hash> for iroh_blobs::Hash {
fn from(value: Hash) -> Self {
value.0
}
}
/// Callback for provider events, implemented on the foreign-language side.
#[uniffi::export(with_foreign)]
#[async_trait::async_trait]
pub trait BlobProvideEventCallback: Send + Sync + 'static {
    /// Called once per provider event.
    async fn blob_event(&self, event: Arc<BlobProvideEvent>) -> Result<(), CallbackError>;
}
/// Discriminant of a [`BlobProvideEvent`], exposed as a plain enum over FFI.
#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, uniffi::Enum)]
pub enum BlobProvideEventType {
    TaggedBlobAdded,
    ClientConnected,
    GetRequestReceived,
    TransferHashSeqStarted,
    TransferProgress,
    TransferBlobCompleted,
    TransferCompleted,
    TransferAborted,
}
/// Payload: a blob was added and stored under a tag.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct TaggedBlobAdded {
    /// Hash of the added blob.
    pub hash: Arc<Hash>,
    /// Format of the added blob.
    pub format: BlobFormat,
    /// Raw bytes of the tag.
    pub tag: Vec<u8>,
}
/// Payload: a client connected to this provider.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct ClientConnected {
    /// Identifier of the connection.
    pub connection_id: u64,
}
/// Payload: a get request was received for a blob.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct GetRequestReceived {
    /// Identifier of the connection the request arrived on.
    pub connection_id: u64,
    /// Identifier of the request within the connection.
    pub request_id: u64,
    /// Hash the request asked for.
    pub hash: Arc<Hash>,
}
/// Payload: a transfer of a hash sequence began.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct TransferHashSeqStarted {
    /// Identifier of the connection.
    pub connection_id: u64,
    /// Identifier of the request.
    pub request_id: u64,
    /// Number of blobs in the sequence.
    pub num_blobs: u64,
}
/// Payload: progress on an outgoing transfer.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct TransferProgress {
    /// Identifier of the connection.
    pub connection_id: u64,
    /// Identifier of the request.
    pub request_id: u64,
    /// Hash of the blob being transferred.
    pub hash: Arc<Hash>,
    /// Offset up to which data has been sent.
    pub end_offset: u64,
}
/// Payload: a single blob within a transfer finished.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct TransferBlobCompleted {
    /// Identifier of the connection.
    pub connection_id: u64,
    /// Identifier of the request.
    pub request_id: u64,
    /// Hash of the completed blob.
    pub hash: Arc<Hash>,
    /// Index of the blob within the transfer.
    pub index: u64,
    /// Size of the completed blob in bytes.
    pub size: u64,
}
/// Payload: an entire transfer finished.
#[derive(Debug, Clone, PartialEq, uniffi::Record)]
pub struct TransferCompleted {
    /// Identifier of the connection.
    pub connection_id: u64,
    /// Identifier of the request.
    pub request_id: u64,
    /// Statistics gathered over the transfer.
    pub stats: TransferStats,
}
/// Payload: a transfer was aborted.
#[derive(Debug, Clone, PartialEq, uniffi::Record)]
pub struct TransferAborted {
    /// Identifier of the connection.
    pub connection_id: u64,
    /// Identifier of the request.
    pub request_id: u64,
    /// Statistics, when available at abort time.
    pub stats: Option<TransferStats>,
}
/// Statistics about a single transfer.
#[derive(Debug, Clone, Copy, Default, PartialEq, uniffi::Record)]
pub struct TransferStats {
    /// Duration of the transfer in milliseconds.
    pub duration: u64,
}
impl From<&iroh_blobs::provider::TransferStats> for TransferStats {
fn from(value: &iroh_blobs::provider::TransferStats) -> Self {
Self {
duration: value
.duration
.as_millis()
.try_into()
.expect("duration too large"),
}
}
}
/// Events emitted while this node serves (provides) blobs to peers.
///
/// FFI mirror of `iroh_blobs::provider::Event`; inspect via `r#type` and the
/// `as_*` accessors below.
#[derive(Debug, Clone, PartialEq, uniffi::Object)]
pub enum BlobProvideEvent {
    TaggedBlobAdded(TaggedBlobAdded),
    ClientConnected(ClientConnected),
    GetRequestReceived(GetRequestReceived),
    TransferHashSeqStarted(TransferHashSeqStarted),
    TransferProgress(TransferProgress),
    TransferBlobCompleted(TransferBlobCompleted),
    TransferCompleted(TransferCompleted),
    TransferAborted(TransferAborted),
}
impl From<iroh_blobs::provider::Event> for BlobProvideEvent {
    /// Map a provider event from `iroh_blobs` onto the FFI mirror enum.
    fn from(value: iroh_blobs::provider::Event) -> Self {
        // Local alias keeps the long source path out of every arm.
        use iroh_blobs::provider::Event as E;
        match value {
            E::TaggedBlobAdded { hash, format, tag } => Self::TaggedBlobAdded(TaggedBlobAdded {
                hash: Arc::new(hash.into()),
                format: format.into(),
                tag: tag.0.as_ref().to_vec(),
            }),
            E::ClientConnected { connection_id } => {
                Self::ClientConnected(ClientConnected { connection_id })
            }
            E::GetRequestReceived {
                connection_id,
                request_id,
                hash,
            } => Self::GetRequestReceived(GetRequestReceived {
                connection_id,
                request_id,
                hash: Arc::new(hash.into()),
            }),
            E::TransferHashSeqStarted {
                connection_id,
                request_id,
                num_blobs,
            } => Self::TransferHashSeqStarted(TransferHashSeqStarted {
                connection_id,
                request_id,
                num_blobs,
            }),
            E::TransferProgress {
                connection_id,
                request_id,
                hash,
                end_offset,
            } => Self::TransferProgress(TransferProgress {
                connection_id,
                request_id,
                hash: Arc::new(hash.into()),
                end_offset,
            }),
            E::TransferBlobCompleted {
                connection_id,
                request_id,
                hash,
                index,
                size,
            } => Self::TransferBlobCompleted(TransferBlobCompleted {
                connection_id,
                request_id,
                hash: Arc::new(hash.into()),
                index,
                size,
            }),
            E::TransferCompleted {
                connection_id,
                request_id,
                stats,
            } => Self::TransferCompleted(TransferCompleted {
                connection_id,
                request_id,
                stats: stats.as_ref().into(),
            }),
            E::TransferAborted {
                connection_id,
                request_id,
                stats,
            } => Self::TransferAborted(TransferAborted {
                connection_id,
                request_id,
                stats: stats.map(|s| s.as_ref().into()),
            }),
        }
    }
}
#[uniffi::export]
impl BlobProvideEvent {
pub fn r#type(&self) -> BlobProvideEventType {
match self {
BlobProvideEvent::TaggedBlobAdded(_) => BlobProvideEventType::TaggedBlobAdded,
BlobProvideEvent::ClientConnected(_) => BlobProvideEventType::ClientConnected,
BlobProvideEvent::GetRequestReceived(_) => BlobProvideEventType::GetRequestReceived,
BlobProvideEvent::TransferHashSeqStarted(_) => {
BlobProvideEventType::TransferHashSeqStarted
}
BlobProvideEvent::TransferProgress(_) => BlobProvideEventType::TransferProgress,
BlobProvideEvent::TransferBlobCompleted(_) => {
BlobProvideEventType::TransferBlobCompleted
}
BlobProvideEvent::TransferCompleted(_) => BlobProvideEventType::TransferCompleted,
BlobProvideEvent::TransferAborted(_) => BlobProvideEventType::TransferAborted,
}
}
pub fn as_tagged_blob_added(&self) -> TaggedBlobAdded {
match self {
BlobProvideEvent::TaggedBlobAdded(t) => t.clone(),
_ => panic!("BlobProvideEvent type is not 'TaggedBlobAdded'"),
}
}
pub fn as_client_connected(&self) -> ClientConnected {
match self {
BlobProvideEvent::ClientConnected(c) => c.clone(),
_ => panic!("BlobProvideEvent type is not 'ClientConnected'"),
}
}
pub fn as_get_request_received(&self) -> GetRequestReceived {
match self {
BlobProvideEvent::GetRequestReceived(g) => g.clone(),
_ => panic!("BlobProvideEvent type is not 'GetRequestReceived'"),
}
}
pub fn as_transfer_hash_seq_started(&self) -> TransferHashSeqStarted {
match self {
BlobProvideEvent::TransferHashSeqStarted(t) => t.clone(),
_ => panic!("BlobProvideEvent type is not 'TransferHashSeqStarted'"),
}
}
pub fn as_transfer_progress(&self) -> TransferProgress {
match self {
BlobProvideEvent::TransferProgress(t) => t.clone(),
_ => panic!("BlobProvideEvent type is not 'TransferProgress'"),
}
}
pub fn as_transfer_blob_completed(&self) -> TransferBlobCompleted {
match self {
BlobProvideEvent::TransferBlobCompleted(t) => t.clone(),
_ => panic!("BlobProvideEvent type is not 'TransferBlobCompleted'"),
}
}
pub fn as_transfer_completed(&self) -> TransferCompleted {
match self {
BlobProvideEvent::TransferCompleted(t) => t.clone(),
_ => panic!("BlobProvideEvent type is not 'TransferCompleted'"),
}
}
pub fn as_transfer_aborted(&self) -> TransferAborted {
match self {
BlobProvideEvent::TransferAborted(t) => t.clone(),
_ => panic!("BlobProvideEvent type is not 'TransferAborted'"),
}
}
}
/// Callback invoked with each progress event while adding content,
/// implemented on the foreign-language side.
#[uniffi::export(with_foreign)]
#[async_trait::async_trait]
pub trait AddCallback: Send + Sync + 'static {
    /// Called once per progress event; returning an error aborts the add.
    async fn progress(&self, progress: Arc<AddProgress>) -> Result<(), CallbackError>;
}
/// Discriminant of an [`AddProgress`] event.
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize, Clone, uniffi::Enum)]
pub enum AddProgressType {
    Found,
    Progress,
    Done,
    AllDone,
    Abort,
}
/// Payload: an item to add was discovered.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct AddProgressFound {
    /// Identifier of the item within this add operation.
    pub id: u64,
    /// Name of the item.
    pub name: String,
    /// Size of the item in bytes.
    pub size: u64,
}
/// Payload: progress ingesting an item.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct AddProgressProgress {
    /// Identifier of the item being ingested.
    pub id: u64,
    /// Offset up to which the item has been processed.
    pub offset: u64,
}
/// Payload: a single item finished ingesting.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct AddProgressDone {
    /// Identifier of the finished item.
    pub id: u64,
    /// Hash of the ingested item.
    pub hash: Arc<Hash>,
}
/// Payload: the whole add operation finished.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct AddProgressAllDone {
    /// Hash of the added content.
    pub hash: Arc<Hash>,
    /// Format of the added content.
    pub format: BlobFormat,
    /// Raw bytes of the tag the content was stored under.
    pub tag: Vec<u8>,
}
/// Payload: the add operation was aborted with an error.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct AddProgressAbort {
    /// Stringified error that caused the abort.
    pub error: String,
}
/// Progress events while adding content; FFI mirror of
/// `iroh_blobs::provider::AddProgress`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Object)]
pub enum AddProgress {
    Found(AddProgressFound),
    Progress(AddProgressProgress),
    Done(AddProgressDone),
    AllDone(AddProgressAllDone),
    Abort(AddProgressAbort),
}
impl From<iroh_blobs::provider::AddProgress> for AddProgress {
    /// Map an `iroh_blobs` add-progress event onto the FFI mirror enum.
    fn from(value: iroh_blobs::provider::AddProgress) -> Self {
        use iroh_blobs::provider::AddProgress as P;
        match value {
            P::Found { id, name, size } => Self::Found(AddProgressFound { id, name, size }),
            P::Progress { id, offset } => Self::Progress(AddProgressProgress { id, offset }),
            P::Done { id, hash } => Self::Done(AddProgressDone {
                id,
                hash: Arc::new(hash.into()),
            }),
            P::AllDone { hash, format, tag } => Self::AllDone(AddProgressAllDone {
                hash: Arc::new(hash.into()),
                format: format.into(),
                tag: tag.0.to_vec(),
            }),
            P::Abort(err) => Self::Abort(AddProgressAbort {
                error: err.to_string(),
            }),
        }
    }
}
#[uniffi::export]
impl AddProgress {
    /// The variant tag of this progress event.
    pub fn r#type(&self) -> AddProgressType {
        type T = AddProgressType;
        match self {
            Self::Found(_) => T::Found,
            Self::Progress(_) => T::Progress,
            Self::Done(_) => T::Done,
            Self::AllDone(_) => T::AllDone,
            Self::Abort(_) => T::Abort,
        }
    }
    /// Payload accessor; panics if the variant is not `Found`.
    pub fn as_found(&self) -> AddProgressFound {
        if let Self::Found(inner) = self {
            inner.clone()
        } else {
            panic!("AddProgress type is not 'Found'")
        }
    }
    /// Payload accessor; panics if the variant is not `Progress`.
    pub fn as_progress(&self) -> AddProgressProgress {
        if let Self::Progress(inner) = self {
            inner.clone()
        } else {
            panic!("AddProgress type is not 'Progress'")
        }
    }
    /// Payload accessor; panics if the variant is not `Done`.
    pub fn as_done(&self) -> AddProgressDone {
        if let Self::Done(inner) = self {
            inner.clone()
        } else {
            panic!("AddProgress type is not 'Done'")
        }
    }
    /// Payload accessor; panics if the variant is not `AllDone`.
    pub fn as_all_done(&self) -> AddProgressAllDone {
        if let Self::AllDone(inner) = self {
            inner.clone()
        } else {
            panic!("AddProgress type is not 'AllDone'")
        }
    }
    /// Payload accessor; panics if the variant is not `Abort`.
    pub fn as_abort(&self) -> AddProgressAbort {
        if let Self::Abort(inner) = self {
            inner.clone()
        } else {
            panic!("AddProgress type is not 'Abort'")
        }
    }
}
/// Format of stored content: a single raw blob or a sequence of hashes.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, uniffi::Enum)]
pub enum BlobFormat {
    /// A raw blob of bytes.
    Raw,
    /// A sequence of hashes.
    HashSeq,
}
impl From<iroh_blobs::BlobFormat> for BlobFormat {
    fn from(value: iroh_blobs::BlobFormat) -> Self {
        match value {
            iroh_blobs::BlobFormat::Raw => Self::Raw,
            iroh_blobs::BlobFormat::HashSeq => Self::HashSeq,
        }
    }
}
impl From<BlobFormat> for iroh_blobs::BlobFormat {
    fn from(value: BlobFormat) -> Self {
        match value {
            BlobFormat::Raw => Self::Raw,
            BlobFormat::HashSeq => Self::HashSeq,
        }
    }
}
/// Options for `Blobs::download`; wraps the iroh RPC `DownloadOptions`.
#[derive(Debug, uniffi::Object)]
pub struct BlobDownloadOptions(iroh_blobs::rpc::client::blobs::DownloadOptions);
#[uniffi::export]
impl BlobDownloadOptions {
    /// Build download options from a format, source nodes and a tag choice.
    ///
    /// Errors if any provided node address cannot be converted.
    #[uniffi::constructor]
    pub fn new(
        format: BlobFormat,
        nodes: Vec<Arc<NodeAddr>>,
        tag: Arc<SetTagOption>,
    ) -> Result<Self, IrohError> {
        // Convert each FFI node address, failing on the first bad one.
        let nodes = nodes
            .into_iter()
            .map(|node| (*node).clone().try_into())
            .collect::<Result<_, _>>()?;
        let opts = iroh_blobs::rpc::client::blobs::DownloadOptions {
            format: format.into(),
            nodes,
            tag: (*tag).clone().into(),
            mode: iroh_blobs::rpc::client::blobs::DownloadMode::Direct,
        };
        Ok(BlobDownloadOptions(opts))
    }
}
impl From<iroh_blobs::rpc::client::blobs::DownloadOptions> for BlobDownloadOptions {
    fn from(value: iroh_blobs::rpc::client::blobs::DownloadOptions) -> Self {
        Self(value)
    }
}
/// What to export: a single blob or a whole collection.
#[derive(Debug, uniffi::Enum)]
pub enum BlobExportFormat {
    /// Export one blob.
    Blob,
    /// Export a collection.
    Collection,
}
impl From<BlobExportFormat> for iroh_blobs::store::ExportFormat {
    fn from(value: BlobExportFormat) -> Self {
        match value {
            BlobExportFormat::Blob => Self::Blob,
            BlobExportFormat::Collection => Self::Collection,
        }
    }
}
/// How exported data reaches the destination: copied, or referenced in place.
#[derive(Debug, uniffi::Enum)]
pub enum BlobExportMode {
    /// Copy the data to the destination.
    Copy,
    /// Try to reference the stored data instead of copying.
    TryReference,
}
impl From<BlobExportMode> for iroh_blobs::store::ExportMode {
    fn from(value: BlobExportMode) -> Self {
        match value {
            BlobExportMode::Copy => Self::Copy,
            BlobExportMode::TryReference => Self::TryReference,
        }
    }
}
/// Callback invoked with each progress event while downloading,
/// implemented on the foreign-language side.
#[uniffi::export(with_foreign)]
#[async_trait::async_trait]
pub trait DownloadCallback: Send + Sync + 'static {
    /// Called once per progress event; returning an error aborts the download.
    async fn progress(&self, progress: Arc<DownloadProgress>) -> Result<(), CallbackError>;
}
/// Discriminant of a [`DownloadProgress`] event.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Enum)]
pub enum DownloadProgressType {
    InitialState,
    FoundLocal,
    Connected,
    Found,
    FoundHashSeq,
    Progress,
    Done,
    AllDone,
    Abort,
}
/// Payload: a blob to download was identified.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct DownloadProgressFound {
    /// Identifier for this blob within the download.
    pub id: u64,
    /// Child index of the blob.
    pub child: u64,
    /// Hash of the blob.
    pub hash: Arc<Hash>,
    /// Size of the blob in bytes.
    pub size: u64,
}
/// Payload: part of the requested data was already present locally.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct DownloadProgressFoundLocal {
    /// Child index of the blob.
    pub child: u64,
    /// Hash of the blob.
    pub hash: Arc<Hash>,
    /// Size of the blob in bytes.
    pub size: u64,
    /// Byte ranges already valid locally.
    pub valid_ranges: Arc<RangeSpec>,
}
/// Payload: a hash sequence with the given number of children was found.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct DownloadProgressFoundHashSeq {
    /// Number of children in the sequence.
    pub children: u64,
    /// Hash of the sequence.
    pub hash: Arc<Hash>,
}
/// Payload: progress downloading a blob.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct DownloadProgressProgress {
    /// Identifier of the blob being downloaded.
    pub id: u64,
    /// Offset up to which data has been received.
    pub offset: u64,
}
/// Payload: a single blob finished downloading.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct DownloadProgressDone {
    /// Identifier of the finished blob.
    pub id: u64,
}
/// Payload: the whole download finished, with summary statistics.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct DownloadProgressAllDone {
    /// Total bytes written to the store.
    pub bytes_written: u64,
    /// Total bytes read from the network.
    pub bytes_read: u64,
    /// Wall-clock time the download took.
    pub elapsed: Duration,
}
/// Payload: the download was aborted with an error.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct DownloadProgressAbort {
    /// Stringified error that caused the abort.
    pub error: String,
}
/// Payload: initial transfer state (only the `connected` flag is exposed
/// over FFI; the full iroh transfer state is reduced to this).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Record)]
pub struct DownloadProgressInitialState {
    /// Whether a connection was already established.
    pub connected: bool,
}
/// Progress events while downloading; FFI mirror of
/// `iroh_blobs::get::db::DownloadProgress`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Object)]
pub enum DownloadProgress {
    InitialState(DownloadProgressInitialState),
    Connected,
    Found(DownloadProgressFound),
    FoundLocal(DownloadProgressFoundLocal),
    FoundHashSeq(DownloadProgressFoundHashSeq),
    Progress(DownloadProgressProgress),
    Done(DownloadProgressDone),
    AllDone(DownloadProgressAllDone),
    Abort(DownloadProgressAbort),
}
impl From<iroh_blobs::get::db::DownloadProgress> for DownloadProgress {
fn from(value: iroh_blobs::get::db::DownloadProgress) -> Self {
match value {
iroh_blobs::get::db::DownloadProgress::InitialState(transfer_state) => {
DownloadProgress::InitialState(DownloadProgressInitialState {
connected: transfer_state.connected,
})
}
iroh_blobs::get::db::DownloadProgress::FoundLocal {
child,
hash,
size,
valid_ranges,
} => DownloadProgress::FoundLocal(DownloadProgressFoundLocal {
child: child.into(),
hash: Arc::new(hash.into()),
size: size.value(),
valid_ranges: Arc::new(valid_ranges.into()),
}),
iroh_blobs::get::db::DownloadProgress::Connected => DownloadProgress::Connected,
iroh_blobs::get::db::DownloadProgress::Found {
id,
hash,
child,
size,
} => DownloadProgress::Found(DownloadProgressFound {
id,
hash: Arc::new(hash.into()),
child: child.into(),
size,
}),
iroh_blobs::get::db::DownloadProgress::FoundHashSeq { hash, children } => {
DownloadProgress::FoundHashSeq(DownloadProgressFoundHashSeq {
hash: Arc::new(hash.into()),
children,
})
}
iroh_blobs::get::db::DownloadProgress::Progress { id, offset } => {
DownloadProgress::Progress(DownloadProgressProgress { id, offset })
}
iroh_blobs::get::db::DownloadProgress::Done { id } => {
DownloadProgress::Done(DownloadProgressDone { id })
}
iroh_blobs::get::db::DownloadProgress::AllDone(stats) => {
DownloadProgress::AllDone(DownloadProgressAllDone {
bytes_written: stats.bytes_written,
bytes_read: stats.bytes_read,
elapsed: stats.elapsed,
})
}
iroh_blobs::get::db::DownloadProgress::Abort(err) => {
DownloadProgress::Abort(DownloadProgressAbort {
error: err.to_string(),
})
}
}
}
}
#[uniffi::export]
impl DownloadProgress {
pub fn r#type(&self) -> DownloadProgressType {
match self {
DownloadProgress::InitialState(_) => DownloadProgressType::InitialState,
DownloadProgress::Connected => DownloadProgressType::Connected,
DownloadProgress::Found(_) => DownloadProgressType::Found,
DownloadProgress::FoundLocal(_) => DownloadProgressType::FoundLocal,
DownloadProgress::FoundHashSeq(_) => DownloadProgressType::FoundHashSeq,
DownloadProgress::Progress(_) => DownloadProgressType::Progress,
DownloadProgress::Done(_) => DownloadProgressType::Done,
DownloadProgress::AllDone(_) => DownloadProgressType::AllDone,
DownloadProgress::Abort(_) => DownloadProgressType::Abort,
}
}
pub fn as_found(&self) -> DownloadProgressFound {
match self {
DownloadProgress::Found(f) => f.clone(),
_ => panic!("DownloadProgress type is not 'Found'"),
}
}
pub fn as_found_local(&self) -> DownloadProgressFoundLocal {
match self {
DownloadProgress::FoundLocal(f) => f.clone(),
_ => panic!("DownloadProgress type is not 'FoundLocal'"),
}
}
pub fn as_found_hash_seq(&self) -> DownloadProgressFoundHashSeq {
match self {
DownloadProgress::FoundHashSeq(f) => f.clone(),
_ => panic!("DownloadProgress type is not 'FoundHashSeq'"),
}
}
pub fn as_progress(&self) -> DownloadProgressProgress {
match self {
DownloadProgress::Progress(p) => p.clone(),
_ => panic!("DownloadProgress type is not 'Progress'"),
}
}
pub fn as_done(&self) -> DownloadProgressDone {
match self {
DownloadProgress::Done(d) => d.clone(),
_ => panic!("DownloadProgress type is not 'Done'"),
}
}
pub fn as_all_done(&self) -> DownloadProgressAllDone {
match self {
DownloadProgress::AllDone(e) => e.clone(),
_ => panic!("DownloadProgress type is not 'AllDone'"),
}
}
pub fn as_abort(&self) -> DownloadProgressAbort {
match self {
DownloadProgress::Abort(a) => a.clone(),
_ => panic!("DownloadProgress type is not 'Abort'"),
}
}
}
/// A set of byte ranges within a blob; thin wrapper around
/// `iroh_blobs::protocol::RangeSpec`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, uniffi::Object)]
pub struct RangeSpec(pub(crate) iroh_blobs::protocol::RangeSpec);
#[uniffi::export]
impl RangeSpec {
    /// Whether this spec selects no data at all.
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }
    /// Whether this spec selects all data.
    pub fn is_all(&self) -> bool {
        self.0.is_all()
    }
}
impl From<iroh_blobs::protocol::RangeSpec> for RangeSpec {
    fn from(h: iroh_blobs::protocol::RangeSpec) -> Self {
        RangeSpec(h)
    }
}
/// Metadata about a complete blob stored on the node.
#[derive(Debug, Clone, uniffi::Record)]
pub struct BlobInfo {
    /// Path associated with the blob.
    pub path: String,
    /// Hash of the blob.
    pub hash: Arc<Hash>,
    /// Size of the blob in bytes.
    pub size: u64,
}
impl From<iroh_blobs::rpc::client::blobs::BlobInfo> for BlobInfo {
    fn from(value: iroh_blobs::rpc::client::blobs::BlobInfo) -> Self {
        Self {
            size: value.size,
            path: value.path,
            hash: Arc::new(value.hash.into()),
        }
    }
}
/// Metadata about a blob that is only partially present on the node.
#[derive(Debug, Clone, Serialize, Deserialize, uniffi::Record)]
pub struct IncompleteBlobInfo {
    /// Bytes currently present.
    pub size: u64,
    /// Expected total size in bytes.
    pub expected_size: u64,
    /// Hash of the blob.
    pub hash: Arc<Hash>,
}
impl From<iroh_blobs::rpc::client::blobs::IncompleteBlobInfo> for IncompleteBlobInfo {
    fn from(value: iroh_blobs::rpc::client::blobs::IncompleteBlobInfo) -> Self {
        Self {
            expected_size: value.expected_size,
            size: value.size,
            hash: Arc::new(value.hash.into()),
        }
    }
}
/// Metadata about a collection stored on the node.
#[derive(Debug, Clone, Serialize, Deserialize, uniffi::Record)]
pub struct CollectionInfo {
    /// Raw bytes of the tag the collection is stored under.
    pub tag: Vec<u8>,
    /// Root hash of the collection.
    pub hash: Arc<Hash>,
    /// Number of blobs in the collection, when known.
    pub total_blobs_count: Option<u64>,
    /// Combined size of the blobs in bytes, when known.
    pub total_blobs_size: Option<u64>,
}
impl From<iroh_blobs::rpc::client::blobs::CollectionInfo> for CollectionInfo {
    fn from(value: iroh_blobs::rpc::client::blobs::CollectionInfo) -> Self {
        Self {
            total_blobs_count: value.total_blobs_count,
            total_blobs_size: value.total_blobs_size,
            tag: value.tag.0.to_vec(),
            hash: Arc::new(value.hash.into()),
        }
    }
}
/// A mutable, named sequence of blob hashes; the lock allows mutation
/// through the shared FFI handle.
#[derive(Debug, uniffi::Object)]
pub struct Collection(pub(crate) RwLock<iroh_blobs::format::collection::Collection>);
impl From<iroh_blobs::format::collection::Collection> for Collection {
    fn from(value: iroh_blobs::format::collection::Collection) -> Self {
        Self(RwLock::new(value))
    }
}
impl From<Collection> for iroh_blobs::format::collection::Collection {
    fn from(value: Collection) -> Self {
        value
            .0
            .read()
            .expect("Collection lock poisoned")
            .clone()
    }
}
#[uniffi::export]
impl Collection {
    /// Create a new, empty collection.
    #[allow(clippy::new_without_default)]
    #[uniffi::constructor]
    pub fn new() -> Self {
        Collection(RwLock::new(
            iroh_blobs::format::collection::Collection::default(),
        ))
    }
    /// Append a (name, hash) entry to the collection.
    ///
    /// Uses `expect` with the same message as `From<Collection>` so lock
    /// poisoning is reported consistently (was a bare `unwrap`).
    pub fn push(&self, name: String, hash: &Hash) -> Result<(), IrohError> {
        self.0
            .write()
            .expect("Collection lock poisoned")
            .push(name, hash.0);
        Ok(())
    }
    /// Whether the collection contains no entries.
    pub fn is_empty(&self) -> Result<bool, IrohError> {
        Ok(self.0.read().expect("Collection lock poisoned").is_empty())
    }
    /// The names of all entries.
    pub fn names(&self) -> Result<Vec<String>, IrohError> {
        Ok(self
            .0
            .read()
            .expect("Collection lock poisoned")
            .iter()
            .map(|(name, _)| name.clone())
            .collect())
    }
    /// The hashes of all entries.
    pub fn links(&self) -> Result<Vec<Arc<Hash>>, IrohError> {
        Ok(self
            .0
            .read()
            .expect("Collection lock poisoned")
            .iter()
            .map(|(_, hash)| Arc::new(Hash(*hash)))
            .collect())
    }
    /// All (name, hash) entries as records.
    pub fn blobs(&self) -> Result<Vec<LinkAndName>, IrohError> {
        Ok(self
            .0
            .read()
            .expect("Collection lock poisoned")
            .iter()
            .map(|(name, hash)| LinkAndName {
                name: name.clone(),
                link: Arc::new(Hash(*hash)),
            })
            .collect())
    }
    /// Number of entries in the collection.
    pub fn len(&self) -> Result<u64, IrohError> {
        Ok(self.0.read().expect("Collection lock poisoned").len() as _)
    }
}
/// One entry of a collection: a name paired with the hash it links to.
#[derive(Clone, Debug, uniffi::Record)]
pub struct LinkAndName {
    /// Name of the entry.
    pub name: String,
    /// Hash the entry links to.
    pub link: Arc<Hash>,
}
#[cfg(test)]
mod tests {
    use std::io::Write;
    use std::sync::{Arc, Mutex};
    use super::*;
    use crate::node::Iroh;
    use crate::{setup_logging, CallbackError, NodeOptions};
    use rand::RngCore;

    /// Round-trips a hash through its string, byte, and hex representations.
    #[test]
    fn test_hash() {
        let hash_str = "6vp273v6cqbbq7xesa2xfrdt3oajykgeifprn3pj4p6y76654amq";
        let hex_str = "f55fafeebe1402187ee4903572c473db809c28c4415f16ede9e3fd8ffbdde019";
        let bytes = b"\xf5\x5f\xaf\xee\xbe\x14\x02\x18\x7e\xe4\x90\x35\x72\xc4\x73\xdb\x80\x9c\x28\xc4\x41\x5f\x16\xed\xe9\xe3\xfd\x8f\xfb\xdd\xe0\x19".to_vec();
        // from_string must reproduce every other representation exactly.
        let hash = Hash::from_string(hash_str.into()).unwrap();
        assert_eq!(hash_str.to_string(), hash.to_string());
        assert_eq!(bytes.to_vec(), hash.to_bytes());
        assert_eq!(hex_str.to_string(), hash.to_hex());
        // from_bytes must agree with from_string.
        let hash_0 = Hash::from_bytes(bytes.clone()).unwrap();
        assert_eq!(hash_str.to_string(), hash_0.to_string());
        assert_eq!(bytes, hash_0.to_bytes());
        assert_eq!(hex_str.to_string(), hash_0.to_hex());
        assert!(hash.equal(&hash_0));
        assert!(hash_0.equal(&hash));
    }

    /// Adds blobs of several sizes and reads each one back.
    #[tokio::test]
    async fn test_blobs_add_get_bytes() {
        let dir = tempfile::tempdir().unwrap();
        let node = Iroh::persistent(dir.keep().display().to_string())
            .await
            .unwrap();
        let sizes = [1, 10, 100, 1000, 10000, 100000];
        let mut hashes = Vec::new();
        for size in sizes.iter() {
            // BUG FIX: the future returned by this async helper was previously
            // never awaited, so the round-trip assertions inside it never ran.
            let hash = blobs_add_get_bytes_size(&node, *size).await;
            hashes.push(hash)
        }
        assert_eq!(sizes.len(), hashes.len());
    }

    /// Adds `size` random bytes as a blob, then verifies size and content
    /// survive a read-back. Returns the blob's hash.
    async fn blobs_add_get_bytes_size(node: &Iroh, size: usize) -> Arc<Hash> {
        let mut bytes = vec![0; size];
        rand::thread_rng().fill_bytes(&mut bytes);
        let add_outcome = node.blobs().add_bytes(bytes.to_vec()).await.unwrap();
        // A plain byte blob must be stored as a raw (non-collection) blob.
        assert_eq!(add_outcome.format, BlobFormat::Raw);
        assert_eq!(add_outcome.size, size as u64);
        let hash = add_outcome.hash;
        let got_size = node.blobs().size(&hash).await.unwrap();
        assert_eq!(got_size, size as u64);
        let got_bytes = node.blobs().read_to_bytes(hash.clone()).await.unwrap();
        assert_eq!(got_bytes.len(), size);
        assert_eq!(got_bytes, bytes);
        hash
    }

    /// Imports a blob from a file path and exports it back to disk,
    /// verifying the content is unchanged at every step.
    #[tokio::test]
    async fn test_blob_read_write_path() {
        let iroh_dir = tempfile::tempdir().unwrap();
        let node = Iroh::persistent(iroh_dir.keep().display().to_string())
            .await
            .unwrap();
        // Write 100 random bytes to a temp file to import from.
        let blob_size = 100;
        let mut bytes = vec![0; blob_size];
        rand::thread_rng().fill_bytes(&mut bytes);
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("in");
        let mut file = std::fs::File::create(path.clone()).unwrap();
        file.write_all(&bytes).unwrap();
        let tag = SetTagOption::auto();
        let wrap = WrapOption::no_wrap();
        // Captures the final hash/format reported by the progress callback.
        struct Output {
            hash: Option<Arc<Hash>>,
            format: Option<BlobFormat>,
        }
        let output = Arc::new(Mutex::new(Output {
            hash: None,
            format: None,
        }));
        struct Callback {
            output: Arc<Mutex<Output>>,
        }
        #[async_trait::async_trait]
        impl AddCallback for Callback {
            async fn progress(&self, progress: Arc<AddProgress>) -> Result<(), CallbackError> {
                match *progress {
                    AddProgress::AllDone(ref d) => {
                        let mut output = self.output.lock().unwrap();
                        output.hash = Some(d.hash.clone());
                        output.format = Some(d.format.clone());
                    }
                    AddProgress::Abort(ref _a) => {
                        return Err(CallbackError::Error);
                    }
                    _ => {}
                }
                Ok(())
            }
        }
        let cb = Callback {
            output: output.clone(),
        };
        node.blobs()
            .add_from_path(
                path.display().to_string(),
                false,
                Arc::new(tag),
                Arc::new(wrap),
                Arc::new(cb),
            )
            .await
            .unwrap();
        // Take the lock in a narrow scope so it is released before the awaits below.
        let (hash, format) = {
            let output = output.lock().unwrap();
            let hash = output.hash.as_ref().cloned().unwrap();
            let format = output.format.as_ref().cloned().unwrap();
            (hash, format)
        };
        assert_eq!(BlobFormat::Raw, format);
        let got_size = node.blobs().size(&hash).await.unwrap();
        assert_eq!(blob_size as u64, got_size);
        let got_bytes = node.blobs().read_to_bytes(hash.clone()).await.unwrap();
        assert_eq!(blob_size, got_bytes.len());
        assert_eq!(bytes, got_bytes);
        // Export the blob to a new path and compare with the original bytes.
        let out_path = dir.path().join("out");
        node.blobs()
            .write_to_path(hash, out_path.display().to_string())
            .await
            .unwrap();
        let got_bytes = std::fs::read(out_path).unwrap();
        assert_eq!(blob_size, got_bytes.len());
        assert_eq!(bytes, got_bytes);
    }

    /// Imports a directory of blobs (which creates a collection) and checks
    /// that the collection and all member blobs are listed.
    #[tokio::test]
    async fn test_blobs_list_collections() {
        // Populate a directory with `num_blobs` random files.
        let dir = tempfile::tempdir().unwrap();
        let num_blobs = 3;
        let blob_size = 100;
        for i in 0..num_blobs {
            let path = dir.path().join(i.to_string());
            let mut file = std::fs::File::create(path).unwrap();
            let mut bytes = vec![0; blob_size];
            rand::thread_rng().fill_bytes(&mut bytes);
            file.write_all(&bytes).unwrap()
        }
        let iroh_dir = tempfile::tempdir().unwrap();
        let node = Iroh::persistent(iroh_dir.keep().display().to_string())
            .await
            .unwrap();
        // A fresh node must start with no blobs.
        let blobs = node.blobs().list().await.unwrap();
        assert!(blobs.is_empty());
        // Collects the collection hash plus each member blob hash from progress events.
        struct Output {
            collection_hash: Option<Arc<Hash>>,
            format: Option<BlobFormat>,
            blob_hashes: Vec<Arc<Hash>>,
        }
        let output = Arc::new(Mutex::new(Output {
            collection_hash: None,
            format: None,
            blob_hashes: Vec::new(),
        }));
        struct Callback {
            output: Arc<Mutex<Output>>,
        }
        #[async_trait::async_trait]
        impl AddCallback for Callback {
            async fn progress(&self, progress: Arc<AddProgress>) -> Result<(), CallbackError> {
                match *progress {
                    AddProgress::AllDone(ref d) => {
                        let mut output = self.output.lock().unwrap();
                        output.collection_hash = Some(d.hash.clone());
                        output.format = Some(d.format.clone());
                    }
                    AddProgress::Abort(ref _a) => {
                        return Err(CallbackError::Error);
                    }
                    AddProgress::Done(ref d) => {
                        let mut output = self.output.lock().unwrap();
                        output.blob_hashes.push(d.hash.clone())
                    }
                    _ => {}
                }
                Ok(())
            }
        }
        let cb = Callback {
            output: output.clone(),
        };
        node.blobs()
            .add_from_path(
                dir.keep().display().to_string(),
                false,
                Arc::new(SetTagOption::Auto),
                Arc::new(WrapOption::NoWrap),
                Arc::new(cb),
            )
            .await
            .unwrap();
        // Importing one directory must produce exactly one collection.
        let collections = node.blobs().list_collections().await.unwrap();
        assert!(collections.len() == 1);
        let (collection_hash, blob_hashes) = {
            let output = output.lock().unwrap();
            let collection_hash = output.collection_hash.as_ref().cloned().unwrap();
            let mut blob_hashes = output.blob_hashes.clone();
            // The collection blob itself is also stored, so include its hash.
            blob_hashes.push(collection_hash.clone());
            (collection_hash, blob_hashes)
        };
        assert_eq!(*(collections[0].hash), *collection_hash);
        assert_eq!(
            collections[0].total_blobs_count.unwrap(),
            blob_hashes.len() as u64
        );
        let blobs = node.blobs().list().await.unwrap();
        hashes_exist(&blob_hashes, &blobs);
        println!("finished");
    }

    /// Panics unless every hash in `expect` is present in `got`.
    fn hashes_exist(expect: &[Arc<Hash>], got: &[Arc<Hash>]) {
        for hash in expect {
            if !got.contains(hash) {
                panic!("expected to find hash {} in the list of hashes", hash);
            }
        }
    }

    /// Adds blobs, deletes one tag, and verifies garbage collection removes
    /// exactly that blob after the GC interval elapses.
    #[tokio::test]
    async fn test_list_and_delete() {
        setup_logging();
        let iroh_dir = tempfile::tempdir().unwrap();
        // A short GC interval so the deleted blob is collected within the sleep below.
        let opts = NodeOptions {
            gc_interval_millis: Some(50),
            ..Default::default()
        };
        let node = Iroh::persistent_with_options(iroh_dir.keep().display().to_string(), opts)
            .await
            .unwrap();
        let blob_size = 100;
        let mut blobs = vec![];
        let num_blobs = 3;
        for _i in 0..num_blobs {
            let mut bytes = vec![0; blob_size];
            rand::thread_rng().fill_bytes(&mut bytes);
            blobs.push(bytes);
        }
        let mut hashes = vec![];
        let mut tags = vec![];
        for blob in blobs {
            let output = node.blobs().add_bytes(blob).await.unwrap();
            hashes.push(output.hash);
            tags.push(output.tag);
        }
        let got_hashes = node.blobs().list().await.unwrap();
        assert_eq!(num_blobs, got_hashes.len());
        hashes_exist(&hashes, &got_hashes);
        for hash in &got_hashes {
            assert!(node.blobs().has(hash).await.unwrap());
        }
        // Drop the tag of the last blob; GC should then reclaim it.
        let remove_hash = hashes.pop().unwrap();
        let remove_tag = tags.pop().unwrap();
        node.tags().delete(remove_tag).await.unwrap();
        // Wait well past the 50ms GC interval for collection to run.
        tokio::time::sleep(Duration::from_secs(1)).await;
        let got_hashes = node.blobs().list().await.unwrap();
        assert_eq!(num_blobs - 1, got_hashes.len());
        hashes_exist(&hashes, &got_hashes);
        for hash in got_hashes {
            if remove_hash.equal(&hash) {
                panic!("blob {} should have been removed", remove_hash);
            }
        }
        assert!(!node.blobs().has(&remove_hash).await.unwrap());
    }
}