use std::{
collections::HashMap,
pin::Pin,
task::{ready, Context, Poll},
};
use anyhow::{anyhow, Result};
use bytes::Bytes;
use futures_lite::{Stream, StreamExt};
use iroh::{EndpointAddr, EndpointId};
use iroh_blobs::Hash;
use serde::{Deserialize, Serialize};
use tokio_stream::{StreamMap, StreamNotifyClose};
use crate::{
form::{AuthForm, SubspaceForm, TimestampForm},
interest::{
AreaOfInterestSelector, CapSelector, CapabilityPack, DelegateTo, Interests, RestrictArea,
},
proto::{
data_model::{AuthorisedEntry, Path, SubspaceId},
grouping::{Area, Range3d},
keys::{NamespaceId, NamespaceKind, UserId},
meadowcap::{AccessMode, SecretKey},
},
rpc::proto::*,
session::{
intents::{serde_encoding::Event, Completion, IntentUpdate},
SessionInit, SessionMode,
},
store::traits::{StoreEvent, SubscribeParams},
};
/// Alias for [`Client`].
///
/// NOTE(review): presumably kept so callers using the `MemClient` name keep compiling — confirm.
pub type MemClient = Client;
/// RPC-backed client used to create namespaces and users, manage capabilities
/// and secrets, and start sync sessions with peers.
///
/// Cloning a `Client` clones the wrapped RPC handle.
#[derive(Debug, Clone)]
pub struct Client {
/// RPC handle that every request of this client is sent through.
pub(super) rpc: irpc::Client<Request>,
}
impl Client {
/// Creates a client that sends its requests through the given RPC handle.
pub fn new(rpc: irpc::Client<Request>) -> Self {
Self { rpc }
}
/// Creates a new namespace of the given `kind` with `owner` as its owner.
///
/// Returns a [`Space`] bound to the namespace id contained in the response.
///
/// # Errors
///
/// Fails if the RPC call itself fails or if the response carries an error.
pub async fn create(&self, kind: NamespaceKind, owner: UserId) -> Result<Space> {
    let response = self
        .rpc
        .rpc(CreateNamespaceRequest { kind, owner })
        .await
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    let created = response?;
    Ok(Space::new(self.rpc.clone(), created.0))
}
/// Requests the creation of a new user and returns its id.
///
/// # Errors
///
/// Fails if the RPC call itself fails or if the response carries an error.
pub async fn create_user(&self) -> Result<UserId> {
    let response: RpcResult<CreateUserResponse> = self
        .rpc
        .rpc(CreateUserRequest)
        .await
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    Ok(response?.0)
}
/// Delegates the capabilities selected by `from` to the receiver described by `to`,
/// using the given `access_mode`.
///
/// Returns the capability packs contained in the response.
///
/// # Errors
///
/// Fails if the RPC call itself fails or if the response carries an error.
pub async fn delegate_caps(
    &self,
    from: CapSelector,
    access_mode: AccessMode,
    to: DelegateTo,
) -> Result<Vec<CapabilityPack>> {
    let response = self
        .rpc
        .rpc(DelegateCapsRequest {
            from,
            access_mode,
            to,
        })
        .await
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    Ok(response?.0)
}
/// Sends the given capability packs to be imported.
///
/// # Errors
///
/// Fails if the RPC call itself fails or if the response carries an error.
pub async fn import_caps(&self, caps: Vec<CapabilityPack>) -> Result<()> {
    let outcome = self
        .rpc
        .rpc(ImportCapsRequest { caps })
        .await
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    outcome?;
    Ok(())
}
pub async fn import_and_sync(
&self,
ticket: SpaceTicket,
mode: SessionMode,
) -> Result<(Space, SyncHandleSet)> {
if ticket.caps.is_empty() {
anyhow::bail!("Invalid ticket: Does not include any capabilities");
}
let mut namespaces = ticket.caps.iter().map(|pack| pack.namespace());
let namespace = namespaces.next().expect("just checked");
if !namespaces.all(|n| n == namespace) {
anyhow::bail!("Invalid ticket: Capabilities do not all refer to the same namespace");
}
self.import_caps(ticket.caps).await?;
let interests = Interests::builder().add_full_cap(CapSelector::any(namespace));
let init = SessionInit::new(interests, mode);
let mut intents = SyncHandleSet::default();
for addr in ticket.nodes {
let node_id = addr.id;
self.add_addr(addr.clone()).await?;
let intent = self.sync_with_peer(node_id, init.clone()).await?;
intents.insert(node_id, intent)?;
}
let space = Space::new(self.rpc.clone(), namespace);
Ok((space, intents))
}
pub async fn sync_with_peer(&self, peer: EndpointId, init: SessionInit) -> Result<SyncHandle> {
let req = SyncWithPeerRequest { peer, init };
let (update_tx, mut event_rx) = self.rpc.bidi_streaming(req, 32, 32).await
.map_err(|e| anyhow::anyhow!("{e}"))?;
let (mapped_tx, mapped_rx) = tokio::sync::mpsc::channel(32);
tokio::task::spawn(async move {
while let Ok(Some(res)) = event_rx.recv().await {
let res: RpcResult<SyncWithPeerResponse> = res;
let event = match res {
Ok(SyncWithPeerResponse::Event(event)) => event,
Ok(SyncWithPeerResponse::Started) => Event::ReconciledAll,
Err(e) => Event::Abort {
error: e.to_string(),
},
};
if mapped_tx.send(event).await.is_err() {
break;
}
}
});
let event_rx: EventReceiver = Box::pin(tokio_stream::wrappers::ReceiverStream::new(mapped_rx));
let (mapped_update_tx, mut mapped_update_rx) = tokio::sync::mpsc::channel::<IntentUpdate>(32);
tokio::task::spawn(async move {
while let Some(update) = mapped_update_rx.recv().await {
if update_tx.send(SyncWithPeerUpdate(update)).await.is_err() {
break;
}
}
});
let update_tx: UpdateSender = Box::pin(futures_util::sink::unfold(mapped_update_tx, |tx, item| async move {
tx.send(item).await.map_err(|e| anyhow::anyhow!("{e}"))?;
Ok(tx)
}));
Ok(SyncHandle::new(update_tx, event_rx, Default::default()))
}
/// Converts `secret` into a [`SecretKey`] and sends it to be stored.
///
/// # Errors
///
/// Fails if the RPC call itself fails or if the response carries an error.
pub async fn import_secret(&self, secret: impl Into<SecretKey>) -> Result<()> {
    let secret: SecretKey = secret.into();
    let outcome = self
        .rpc
        .rpc(InsertSecretRequest { secret })
        .await
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    outcome?;
    Ok(())
}
/// Fetches the endpoint address returned for an [`AddrRequest`].
///
/// # Errors
///
/// Fails if the RPC call itself fails or if the response carries an error.
pub async fn addr(&self) -> Result<EndpointAddr> {
    let response = self
        .rpc
        .rpc(AddrRequest)
        .await
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    Ok(response?)
}
/// Sends the given endpoint address in an [`AddAddrRequest`].
///
/// # Errors
///
/// Fails if the RPC call itself fails or if the response carries an error.
pub async fn add_addr(&self, addr: EndpointAddr) -> Result<()> {
    let outcome = self
        .rpc
        .rpc(AddAddrRequest { addr })
        .await
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    outcome?;
    Ok(())
}
}
/// Handle for a single namespace: an RPC client paired with the namespace id that
/// the requests of this handle refer to.
#[derive(Debug, Clone)]
pub struct Space {
/// RPC handle that every request of this space is sent through.
rpc: irpc::Client<Request>,
/// Id of the namespace this handle operates on.
namespace_id: NamespaceId,
}
impl Space {
/// Creates a handle for `namespace_id` that sends its requests through `rpc`.
fn new(rpc: irpc::Client<Request>, namespace_id: NamespaceId) -> Self {
Self { rpc, namespace_id }
}
/// Returns a [`Client`] that shares this space's RPC handle.
fn spaces(&self) -> Client {
    let rpc = self.rpc.clone();
    Client { rpc }
}
/// Returns the id of the namespace this handle operates on.
pub fn namespace_id(&self) -> NamespaceId {
self.namespace_id
}
async fn insert(&self, entry: EntryForm, payload: PayloadForm) -> Result<InsertEntrySuccess> {
let form = FullEntryForm {
namespace_id: self.namespace_id,
subspace_id: entry.subspace_id,
path: entry.path,
timestamp: entry.timestamp,
payload,
};
let auth = entry.auth;
let req = InsertEntryRequest { entry: form, auth };
let res = self.rpc.rpc(req).await.map_err(|e| anyhow::anyhow!("{e}"))??;
Ok(res)
}
/// Inserts an entry whose payload is referenced by `payload`, passed on as
/// [`PayloadForm::Checked`].
///
/// # Errors
///
/// Fails under the same conditions as the underlying insert request.
pub async fn insert_hash(&self, entry: EntryForm, payload: Hash) -> Result<InsertEntrySuccess> {
    self.insert(entry, PayloadForm::Checked(payload)).await
}
/// Adds `payload` to the given blob store and inserts an entry referencing the
/// resulting hash.
///
/// # Errors
///
/// Fails if adding the bytes to the blob store fails or if the insert fails.
pub async fn insert_bytes(
    &self,
    blobs: &iroh_blobs::api::Store,
    entry: EntryForm,
    payload: impl Into<Bytes>,
) -> Result<InsertEntrySuccess> {
    let data: Bytes = payload.into();
    // `tag` stays in scope until the insert below has finished.
    let tag = blobs.blobs().add_bytes(data).temp_tag().await?;
    let hash = tag.hash();
    self.insert_hash(entry, hash).await
}
/// Sends an already authorised entry in an [`IngestEntryRequest`].
///
/// # Errors
///
/// Fails if the RPC call itself fails or if the response carries an error.
pub async fn ingest(&self, authorised_entry: AuthorisedEntry) -> Result<()> {
    let outcome = self
        .rpc
        .rpc(IngestEntryRequest { authorised_entry })
        .await
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    outcome?;
    Ok(())
}
/// Looks up the entry at `path` in `subspace` of this namespace.
///
/// Returns `None` if the response contains no entry.
///
/// # Errors
///
/// Fails if the RPC call itself fails or if the response carries an error.
pub async fn get_one(
    &self,
    subspace: SubspaceId,
    path: Path,
) -> Result<Option<AuthorisedEntry>> {
    let response = self
        .rpc
        .rpc(GetEntryRequest {
            namespace: self.namespace_id,
            subspace,
            path,
        })
        .await
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    let found = response?.0;
    Ok(found.map(Into::into))
}
/// Streams the entries of this namespace that fall into `range`.
///
/// Each stream item is either an entry or an error; both receive errors and error
/// responses are reported as `Err` items.
///
/// # Errors
///
/// Fails if the server stream cannot be opened.
pub async fn get_many(
    &self,
    range: Range3d,
) -> Result<impl Stream<Item = Result<AuthorisedEntry>>> {
    let req = GetEntriesRequest {
        namespace: self.namespace_id,
        range,
    };
    let rx = self
        .rpc
        .server_streaming(req, 64)
        .await
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    let entries = rx.into_stream().map(|item| -> Result<AuthorisedEntry> {
        // Outer layer: receive result; inner layer: the response itself.
        let response = item.map_err(|e| anyhow::anyhow!("{e}"))?;
        let entry = response.map_err(|e| anyhow::anyhow!("{e}"))?;
        Ok(entry.0)
    });
    Ok(entries)
}
/// Starts a sync session with `node` for the given `areas` of this namespace,
/// using a session init created by [`SessionInit::reconcile_once`].
///
/// # Errors
///
/// Fails if the sync session cannot be started.
pub async fn sync_once(
    &self,
    node: EndpointId,
    areas: AreaOfInterestSelector,
) -> Result<SyncHandle> {
    let interests = Interests::builder().add(CapSelector::any(self.namespace_id), areas);
    self.spaces()
        .sync_with_peer(node, SessionInit::reconcile_once(interests))
        .await
}
/// Starts a sync session with `node` for the given `areas` of this namespace,
/// using a session init created by [`SessionInit::continuous`].
///
/// # Errors
///
/// Fails if the sync session cannot be started.
pub async fn sync_continuously(
    &self,
    node: EndpointId,
    areas: AreaOfInterestSelector,
) -> Result<SyncHandle> {
    let interests = Interests::builder().add(CapSelector::any(self.namespace_id), areas);
    self.spaces()
        .sync_with_peer(node, SessionInit::continuous(interests))
        .await
}
/// Creates a [`SpaceTicket`] that shares this namespace with `receiver`.
///
/// Capabilities for this namespace are delegated to `receiver` with the given
/// `access_mode` and `restrict_area`; the ticket additionally lists the address
/// returned by [`Client::addr`] as its only node.
///
/// # Errors
///
/// Fails if delegating the capabilities or fetching the address fails.
pub async fn share(
    &self,
    receiver: UserId,
    access_mode: AccessMode,
    restrict_area: RestrictArea,
) -> Result<SpaceTicket> {
    let client = self.spaces();
    let selector = CapSelector::any(self.namespace_id);
    let delegation = DelegateTo::new(receiver, restrict_area);
    let caps = client
        .delegate_caps(selector, access_mode, delegation)
        .await?;
    let nodes = vec![client.addr().await?];
    Ok(SpaceTicket { caps, nodes })
}
/// Subscribes to store events for `area` of this namespace.
///
/// The request is sent without an initial progress id. Each stream item is either
/// an event or an error; both receive errors and error responses are reported as
/// `Err` items.
///
/// # Errors
///
/// Fails if the server stream cannot be opened.
pub async fn subscribe_area(
    &self,
    area: Area,
    params: SubscribeParams,
) -> Result<impl Stream<Item = Result<StoreEvent>>> {
    let req = SubscribeRequest {
        namespace: self.namespace_id,
        area,
        params,
        initial_progress_id: None,
    };
    let rx = self
        .rpc
        .server_streaming(req, 1024)
        .await
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    Ok(rx.into_stream().map(|item| -> Result<StoreEvent> {
        // Outer layer: receive result; inner layer: the response itself.
        let response = item.map_err(|e| anyhow::anyhow!("{e}"))?;
        let event = response.map_err(|e| anyhow::anyhow!("{e}"))?;
        Ok(event)
    }))
}
/// Subscribes to store events for `area` of this namespace, passing `progress_id`
/// as the initial progress id of the request.
///
/// Each stream item is either an event or an error; both receive errors and error
/// responses are reported as `Err` items.
///
/// # Errors
///
/// Fails if the server stream cannot be opened.
pub async fn resume_subscription(
    &self,
    progress_id: u64,
    area: Area,
    params: SubscribeParams,
) -> Result<impl Stream<Item = Result<StoreEvent>>> {
    let req = SubscribeRequest {
        namespace: self.namespace_id,
        area,
        params,
        initial_progress_id: Some(progress_id),
    };
    let rx = self
        .rpc
        .server_streaming(req, 1024)
        .await
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    Ok(rx.into_stream().map(|item| -> Result<StoreEvent> {
        // Outer layer: receive result; inner layer: the response itself.
        let response = item.map_err(|e| anyhow::anyhow!("{e}"))?;
        let event = response.map_err(|e| anyhow::anyhow!("{e}"))?;
        Ok(event)
    }))
}
}
/// Serializable ticket for sharing a space: a set of capabilities plus the
/// addresses of nodes to sync with.
#[derive(Debug, Serialize, Deserialize)]
pub struct SpaceTicket {
/// Capability packs that are imported when the ticket is used.
pub caps: Vec<CapabilityPack>,
/// Addresses of the nodes a sync session is started with when the ticket is used.
pub nodes: Vec<EndpointAddr>,
}
/// Handle to a running sync session with a single peer.
///
/// It combines a sink for sending [`IntentUpdate`]s with a stream of session
/// [`Event`]s, and keeps a [`SyncProgress`] that is updated when events are
/// consumed through the handle's `Stream` implementation.
#[derive(derive_more::Debug)]
pub struct SyncHandle {
/// Sink for updates sent to the session.
#[debug("UpdateSender")]
update_tx: UpdateSender,
/// Stream of events emitted by the session.
#[debug("EventReceiver")]
event_rx: EventReceiver,
/// Progress derived from the events observed so far.
state: SyncProgress,
}
/// Boxed, pinned sink that accepts [`IntentUpdate`]s and fails with [`anyhow::Error`].
pub type UpdateSender = Pin<Box<dyn futures_util::Sink<IntentUpdate, Error = anyhow::Error> + Send + 'static>>;
/// Boxed, pinned stream of session [`Event`]s.
pub type EventReceiver = Pin<Box<dyn Stream<Item = Event> + Send + 'static>>;
impl SyncHandle {
/// Assembles a handle from its update sink, its event stream and the progress
/// recorded so far.
fn new(update_tx: UpdateSender, event_rx: EventReceiver, state: SyncProgress) -> Self {
Self {
update_tx,
event_rx,
state,
}
}
pub fn split(self) -> (UpdateSender, EventReceiver) {
(self.update_tx, self.event_rx)
}
/// Waits until the session has reconciled everything, has aborted, or its event
/// stream has ended, and returns the resulting [`Completion`].
///
/// Progress already recorded on this handle (for example from events consumed
/// through its `Stream` implementation) is taken into account, so an abort or a
/// full reconciliation observed earlier is not lost. The recorded progress is
/// reset once the result has been produced.
///
/// # Errors
///
/// Returns an error carrying the abort message if the session was aborted.
pub async fn complete(&mut self) -> Result<Completion> {
    // Continue from what was already observed instead of starting from scratch.
    while !self.state.is_ready() {
        let Some(event) = self.event_rx.next().await else {
            // The stream ended before the session became ready.
            break;
        };
        self.state.handle_event(&event);
    }
    std::mem::take(&mut self.state).into_completion()
}
/// Sends an [`IntentUpdate::AddInterests`] update with the given interests to the
/// session.
///
/// # Errors
///
/// Fails if the update cannot be sent.
pub async fn add_interests(&mut self, interests: impl Into<Interests>) -> Result<()> {
    use futures_util::SinkExt;
    let update = IntentUpdate::AddInterests(interests.into());
    self.update_tx.send(update).await?;
    Ok(())
}
}
/// Yields the session's events while recording each of them in the handle's
/// progress state.
impl Stream for SyncHandle {
    type Item = Event;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let next = ready!(Pin::new(&mut self.event_rx).poll_next(cx));
        if let Some(event) = &next {
            // Track progress before handing the event to the caller.
            self.state.handle_event(event);
        }
        Poll::Ready(next)
    }
}
/// Progress of a sync session as derived from the events observed so far.
#[derive(Debug, Default)]
pub struct SyncProgress {
/// Set once an `Event::Reconciled` was observed.
partial: bool,
/// Set once an `Event::ReconciledAll` was observed.
complete: bool,
/// Error message of the most recently observed `Event::Abort`, if any.
failed: Option<String>,
}
impl SyncProgress {
fn handle_event(&mut self, event: &Event) {
match event {
Event::ReconciledAll => self.complete = true,
Event::Reconciled { .. } => self.partial = true,
Event::Abort { error } => self.failed = Some(error.clone()),
_ => {}
}
}
fn is_ready(&self) -> bool {
self.complete || self.failed.is_some()
}
fn into_completion(self) -> Result<Completion> {
if let Some(error) = self.failed {
Err(anyhow!(error))
} else if self.complete {
Ok(Completion::Complete)
} else if self.partial {
Ok(Completion::Partial)
} else {
Ok(Completion::Nothing)
}
}
}
/// A set of sync handles, at most one per peer, whose events are merged into a
/// single stream of `(peer, event)` pairs.
#[derive(Default, derive_more::Debug)]
#[debug("MergedSyncHandle({:?})", self.event_rx.keys().collect::<Vec<_>>())]
pub struct SyncHandleSet {
/// Event streams of all handles, keyed by peer. Each stream is wrapped in
/// `StreamNotifyClose` so that its end is reported as an item.
event_rx: StreamMap<EndpointId, StreamNotifyClose<EventReceiver>>,
/// Update sink and recorded progress of each handle, keyed by peer.
intents: HashMap<EndpointId, HandleState>,
}
/// The parts of a [`SyncHandle`] that a [`SyncHandleSet`] keeps next to the
/// handle's event stream.
#[derive(derive_more::Debug)]
struct HandleState {
/// Sink for updates sent to the session.
#[debug("UpdateSender")]
update_tx: UpdateSender,
/// Progress recorded for the session.
state: SyncProgress,
}
impl SyncHandleSet {
/// Adds the handle for `peer` to the set.
///
/// # Errors
///
/// Returns [`IntentExistsError`] if the set already holds a handle for `peer`; the
/// passed handle is dropped in that case.
pub fn insert(&mut self, peer: EndpointId, handle: SyncHandle) -> Result<(), IntentExistsError> {
    use std::collections::hash_map::Entry;

    match self.intents.entry(peer) {
        Entry::Occupied(_) => Err(IntentExistsError(peer)),
        Entry::Vacant(slot) => {
            let SyncHandle {
                update_tx,
                event_rx,
                state,
            } = handle;
            self.event_rx.insert(peer, StreamNotifyClose::new(event_rx));
            slot.insert(HandleState { update_tx, state });
            Ok(())
        }
    }
}
pub fn remove(&mut self, peer: &EndpointId) -> Option<SyncHandle> {
self.event_rx.remove(peer).and_then(|event_rx| {
self.intents.remove(peer).map(|state| {
SyncHandle::new(
state.update_tx,
event_rx.into_inner().expect("unreachable"),
state.state,
)
})
})
}
/// Sends an [`IntentUpdate::AddInterests`] update with the given interests to every
/// handle in the set.
///
/// # Errors
///
/// Fails if any of the updates cannot be sent.
pub async fn add_interests(&mut self, interests: impl Into<Interests>) -> Result<()> {
    use futures_util::SinkExt;

    let interests: Interests = interests.into();
    let mut sends = Vec::with_capacity(self.intents.len());
    for intent in self.intents.values_mut() {
        // Every session gets its own copy of the interests.
        let update = IntentUpdate::AddInterests(interests.clone());
        sends.push(intent.update_tx.send(update));
    }
    futures_buffered::try_join_all(sends).await?;
    Ok(())
}
pub async fn complete_all(mut self) -> HashMap<EndpointId, Result<Completion>> {
let futs = self.intents.drain().map(|(node_id, state)| {
let event_rx = self
.event_rx
.remove(&node_id)
.expect("unreachable")
.into_inner()
.expect("unreachable");
async move {
let res = SyncHandle::new(state.update_tx, event_rx, state.state)
.complete()
.await;
(node_id, res)
}
});
let res = futures_buffered::join_all(futs).await;
res.into_iter().collect()
}
}
/// Yields the events of all handles in the set as `(peer, event)` pairs.
///
/// Each event is also recorded in the progress state kept for its peer, matching
/// what the `Stream` implementation of [`SyncHandle`] does, so that a handle later
/// taken out of the set reflects the events already consumed here. When the event
/// stream of a peer ends, that peer is removed from the set. The merged stream ends
/// when the underlying stream map yields `None`.
impl Stream for SyncHandleSet {
    type Item = (EndpointId, Event);

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match ready!(Pin::new(&mut self.event_rx).poll_next(cx)) {
                None => break Poll::Ready(None),
                Some((peer, Some(event))) => {
                    if let Some(intent) = self.intents.get_mut(&peer) {
                        intent.state.handle_event(&event);
                    }
                    break Poll::Ready(Some((peer, event)));
                }
                Some((peer, None)) => {
                    // The peer's event stream closed: drop its bookkeeping and keep
                    // polling the remaining streams.
                    self.intents.remove(&peer);
                    self.event_rx.remove(&peer);
                }
            }
        }
    }
}
/// Error returned when a handle is inserted into a [`SyncHandleSet`] that already
/// holds one for the same peer. The field is the id of that peer.
#[derive(Debug, thiserror::Error)]
#[error("The set already contains a sync intent for this peer.")]
pub struct IntentExistsError(pub EndpointId);
/// Description of an entry to insert into a [`Space`]; the namespace and the
/// payload are supplied separately when the entry is inserted.
#[derive(Debug)]
pub struct EntryForm {
/// Auth form sent along with the insert request.
pub auth: AuthForm,
/// Subspace form of the entry.
pub subspace_id: SubspaceForm,
/// Path of the entry.
pub path: Path,
/// Timestamp form of the entry.
pub timestamp: TimestampForm,
}
impl EntryForm {
    /// Creates a form for an entry at `path` that is authorised via
    /// `AuthForm::Any(user)` and uses the default subspace and timestamp forms.
    pub fn new(user: UserId, path: Path) -> Self {
        let auth = AuthForm::Any(user);
        Self {
            auth,
            subspace_id: Default::default(),
            path,
            timestamp: Default::default(),
        }
    }
}