use std::time::Duration;
use anyhow::Result;
use bytes::Bytes;
use futures_concurrency::future::TryJoin;
use futures_lite::StreamExt;
use iroh_willow::{
form::EntryForm,
interest::{CapSelector, DelegateTo, Interests, IntoAreaOfInterest, RestrictArea},
proto::{
data_model::{Path, PathExt},
grouping::{Area, AreaExt, Range3d},
keys::NamespaceKind,
},
session::{
intents::{Completion, EventKind},
SessionInit, SessionMode,
},
};
use meadowcap::AccessMode;
use self::util::{create_rng, insert, setup_and_delegate, spawn_two, Peer};
#[tokio::test(flavor = "multi_thread")]
async fn peer_manager_two_intents() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let mut rng = create_rng("peer_manager_two_intents");
let [alfie, betty] = spawn_two(&mut rng).await?;
let (namespace, _alfie_user, betty_user) = setup_and_delegate(&alfie, &betty).await?;
let betty_node_id = betty.node_id();
insert(&betty, namespace, betty_user, &[b"foo", b"1"], "foo 1").await?;
insert(&betty, namespace, betty_user, &[b"bar", b"2"], "bar 2").await?;
insert(&betty, namespace, betty_user, &[b"bar", b"3"], "bar 3").await?;
let task_foo_path = tokio::task::spawn({
let alfie = alfie.clone();
async move {
let path = Path::from_bytes(&[b"foo"]).unwrap();
let init = SessionInit::new(
Interests::builder().add_area(namespace, [Area::new_path(path.clone())]),
SessionMode::ReconcileOnce,
);
let mut intent = alfie.sync_with_peer(betty_node_id, init).await.unwrap();
assert_eq!(
intent.next().await.unwrap(),
EventKind::CapabilityIntersection {
namespace,
area: Area::new_full(),
}
);
assert_eq!(
intent.next().await.unwrap(),
EventKind::InterestIntersection {
namespace,
area: Area::new_path(path.clone()).into_area_of_interest()
}
);
assert_eq!(
intent.next().await.unwrap(),
EventKind::Reconciled {
namespace,
area: Area::new_path(path.clone()).into_area_of_interest()
}
);
assert_eq!(intent.next().await.unwrap(), EventKind::ReconciledAll);
assert!(intent.next().await.is_none());
}
});
let task_bar_path = tokio::task::spawn({
let alfie = alfie.clone();
async move {
let path = Path::from_bytes(&[b"bar"]).unwrap();
let interests =
Interests::builder().add_area(namespace, [Area::new_path(path.clone())]);
let init = SessionInit::new(interests, SessionMode::ReconcileOnce);
let mut intent = alfie.sync_with_peer(betty_node_id, init).await.unwrap();
assert_eq!(
intent.next().await.unwrap(),
EventKind::CapabilityIntersection {
namespace,
area: Area::new_full(),
}
);
assert_eq!(
intent.next().await.unwrap(),
EventKind::InterestIntersection {
namespace,
area: Area::new_path(path.clone()).into_area_of_interest()
}
);
assert_eq!(
intent.next().await.unwrap(),
EventKind::Reconciled {
namespace,
area: Area::new_path(path.clone()).into_area_of_interest()
}
);
assert_eq!(intent.next().await.unwrap(), EventKind::ReconciledAll);
assert!(intent.next().await.is_none());
}
});
task_foo_path.await.unwrap();
task_bar_path.await.unwrap();
[alfie, betty].map(Peer::shutdown).try_join().await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn peer_manager_update_intent() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let mut rng = create_rng("peer_manager_update_intent");
let [alfie, betty] = spawn_two(&mut rng).await?;
let (namespace, _alfie_user, betty_user) = setup_and_delegate(&alfie, &betty).await?;
let betty_node_id = betty.node_id();
insert(&betty, namespace, betty_user, &[b"foo"], "foo 1").await?;
insert(&betty, namespace, betty_user, &[b"bar"], "bar 1").await?;
let path = Path::from_bytes(&[b"foo"]).unwrap();
let interests = Interests::builder().add_area(namespace, [Area::new_path(path.clone())]);
let init = SessionInit::new(interests, SessionMode::Continuous);
let mut intent = alfie.sync_with_peer(betty_node_id, init).await.unwrap();
assert_eq!(
intent.next().await.unwrap(),
EventKind::CapabilityIntersection {
namespace,
area: Area::new_full(),
}
);
assert_eq!(
intent.next().await.unwrap(),
EventKind::InterestIntersection {
namespace,
area: Area::new_path(path.clone()).into_area_of_interest()
}
);
assert_eq!(
intent.next().await.unwrap(),
EventKind::Reconciled {
namespace,
area: Area::new_path(path.clone()).into_area_of_interest()
}
);
assert_eq!(intent.next().await.unwrap(), EventKind::ReconciledAll);
let path = Path::from_bytes(&[b"bar"]).unwrap();
let interests = Interests::builder().add_area(namespace, [Area::new_path(path.clone())]);
intent.add_interests(interests).await?;
assert_eq!(
intent.next().await.unwrap(),
EventKind::InterestIntersection {
namespace,
area: Area::new_path(path.clone()).into_area_of_interest()
}
);
assert_eq!(
intent.next().await.unwrap(),
EventKind::Reconciled {
namespace,
area: Area::new_path(path.clone()).into_area_of_interest()
}
);
assert_eq!(intent.next().await.unwrap(), EventKind::ReconciledAll);
intent.close().await;
assert!(intent.next().await.is_none());
[alfie, betty].map(Peer::shutdown).try_join().await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn peer_manager_shutdown_immediate() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let mut rng = create_rng("peer_manager_shutdown_immediate");
let [alfie, betty] = spawn_two(&mut rng).await?;
let (_namespace, _alfie_user, _betty_user) = setup_and_delegate(&alfie, &betty).await?;
let betty_node_id = betty.node_id();
let mut intent = alfie
.sync_with_peer(betty_node_id, SessionInit::reconcile_once(Interests::all()))
.await?;
let completion = intent.complete().await?;
assert_eq!(completion, Completion::Complete);
[alfie, betty].map(Peer::shutdown).try_join().await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn peer_manager_shutdown_timeout() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let mut rng = create_rng("peer_manager_shutdown_timeout");
let [alfie, betty] = spawn_two(&mut rng).await?;
let (_namespace, _alfie_user, _betty_user) = setup_and_delegate(&alfie, &betty).await?;
let betty_node_id = betty.node_id();
let mut intent = alfie
.sync_with_peer(betty_node_id, SessionInit::reconcile_once(Interests::all()))
.await?;
let completion = intent.complete().await?;
assert_eq!(completion, Completion::Complete);
tokio::time::sleep(Duration::from_secs(1)).await;
[alfie, betty].map(Peer::shutdown).try_join().await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn peer_manager_twoway_loop() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let mut rng = create_rng("peer_manager_twoway_loop");
let [alfie, betty] = spawn_two(&mut rng).await?;
let (namespace, alfie_user, betty_user) = setup_and_delegate(&alfie, &betty).await?;
insert(&alfie, namespace, alfie_user, &[b"foo"], "foo 1").await?;
insert(&betty, namespace, betty_user, &[b"bar"], "bar 1").await?;
let alfie_node_id = alfie.node_id();
let betty_node_id = betty.node_id();
let rounds = 20;
for i in 0..rounds {
println!("\n\nROUND {i} of {rounds}\n\n");
let alfie = alfie.clone();
let betty = betty.clone();
let task_alfie = tokio::task::spawn(async move {
let mut intent = alfie
.sync_with_peer(betty_node_id, SessionInit::reconcile_once(Interests::all()))
.await
.unwrap();
let completion = intent.complete().await.expect("failed to complete intent");
assert_eq!(completion, Completion::Complete);
});
let task_betty = tokio::task::spawn(async move {
let mut intent = betty
.sync_with_peer(alfie_node_id, SessionInit::reconcile_once(Interests::all()))
.await
.unwrap();
let completion = intent.complete().await.expect("failed to complete intent");
assert_eq!(completion, Completion::Complete);
});
task_alfie.await.unwrap();
task_betty.await.unwrap();
}
[alfie, betty].map(Peer::shutdown).try_join().await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn read_back_write() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let mut rng = create_rng("read_back_write");
let disco = iroh::address_lookup::memory::MemoryLookup::new();
let alfie = Peer::spawn(util::generate_secret_key(&mut rng), Default::default(), disco).await?;
let user_alfie = alfie.create_user().await?;
let namespace_id = alfie
.create_namespace(NamespaceKind::Owned, user_alfie)
.await?;
for i in 0u64..2 {
let path = Path::from_bytes(&[b"foo", &i.to_le_bytes()])?;
let entry = EntryForm::new_bytes(namespace_id, path, "foo");
alfie.insert_entry(entry, user_alfie).await?;
}
let entries: Vec<_> = alfie
.get_entries(namespace_id, Range3d::new_full())
.await?
.try_collect()
.await?;
println!("{entries:#?}");
assert_eq!(entries.len(), 2);
Ok(())
}
#[ignore = "flaky"]
#[tokio::test(flavor = "multi_thread")]
async fn owned_namespace_subspace_write_sync() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let mut rng = create_rng("owned_namespace_subspace_write_sync");
let [alfie, betty] = spawn_two(&mut rng).await?;
let user_alfie = alfie.create_user().await?;
let user_betty = betty.create_user().await?;
let namespace_id = alfie
.create_namespace(NamespaceKind::Owned, user_alfie)
.await?;
let restriction = RestrictArea::Restrict(Area::new_subspace(user_betty));
let cap_for_betty = alfie
.delegate_caps(
CapSelector::any(namespace_id),
AccessMode::Write,
DelegateTo::new(user_betty, restriction),
)
.await?;
betty.import_caps(cap_for_betty).await?;
let path = Path::from_bytes(&[b"foo"])?;
let entry = EntryForm::new_bytes(namespace_id, path, "foo");
betty.insert_entry(entry, user_betty).await?;
let path = Path::from_bytes(&[b"foo"])?;
let entry = EntryForm::new_bytes(namespace_id, path, "foo").subspace(user_alfie);
assert!(betty.insert_entry(entry, user_betty).await.is_err());
let init = SessionInit::new(
Interests::builder().add_full_cap(namespace_id),
SessionMode::ReconcileOnce,
);
let mut intent = alfie.sync_with_peer(betty.node_id(), init).await.unwrap();
let completion = intent.complete().await.expect("failed to complete intent");
assert_eq!(completion, Completion::Partial);
let entries: Vec<_> = alfie
.get_entries(namespace_id, Range3d::new_full())
.await?
.try_collect()
.await?;
assert_eq!(entries.len(), 1);
Ok(())
}
mod util {
use std::sync::{Arc, Mutex};
use anyhow::Result;
use bytes::Bytes;
use futures_concurrency::future::TryJoin;
use iroh::{Endpoint, EndpointId};
use iroh_willow::{
engine::{AcceptOpts, Engine},
form::EntryForm,
interest::{CapSelector, DelegateTo, RestrictArea},
proto::{
data_model::{Path, PathExt},
keys::{NamespaceId, NamespaceKind, UserId},
meadowcap::AccessMode,
},
ALPN,
};
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha12Rng;
use rand_core::CryptoRngCore;
use tokio::task::JoinHandle;
pub fn create_rng(seed: &str) -> ChaCha12Rng {
let seed = iroh_blobs::Hash::new(seed);
let seed_bytes: [u8; 32] = seed.as_bytes()[..32].try_into().unwrap();
ChaCha12Rng::from_seed(seed_bytes)
}
pub fn generate_secret_key(rng: &mut impl CryptoRngCore) -> iroh::SecretKey {
let bytes: [u8; 32] = rng.gen();
iroh::SecretKey::from_bytes(&bytes)
}
#[derive(Debug, Clone)]
pub struct Peer {
pub blobs: iroh_blobs::store::mem::MemStore,
endpoint: Endpoint,
engine: Engine,
accept_task: Arc<Mutex<Option<JoinHandle<Result<()>>>>>,
}
impl Peer {
pub async fn spawn(
secret_key: iroh::SecretKey,
accept_opts: AcceptOpts,
address_lookup: iroh::address_lookup::memory::MemoryLookup,
) -> Result<Self> {
let endpoint = Endpoint::builder()
.secret_key(secret_key)
.relay_mode(iroh::RelayMode::Disabled)
.address_lookup(address_lookup)
.alpns(vec![ALPN.to_vec()])
.bind()
.await?;
let blobs = iroh_blobs::store::mem::MemStore::default();
let payloads: iroh_blobs::api::Store = blobs.clone().into();
let create_store = move || iroh_willow::store::memory::Store::new(payloads);
let engine = Engine::spawn(endpoint.clone(), create_store, accept_opts);
let accept_task = tokio::task::spawn({
let engine = engine.clone();
let endpoint = endpoint.clone();
async move {
while let Some(incoming) = endpoint.accept().await {
let Ok(mut connecting) = incoming.accept() else {
continue;
};
let Ok(alpn) = connecting.alpn().await else {
continue;
};
if alpn != ALPN {
continue;
}
let Ok(conn) = connecting.await else {
continue;
};
engine.handle_connection(conn).await?;
}
Result::Ok(())
}
});
Ok(Self {
blobs,
endpoint,
engine,
accept_task: Arc::new(Mutex::new(Some(accept_task))),
})
}
pub async fn shutdown(self) -> Result<()> {
let accept_task = self.accept_task.lock().unwrap().take();
if let Some(accept_task) = accept_task {
accept_task.abort();
match accept_task.await {
Err(err) if err.is_cancelled() => {}
Ok(Ok(())) => {}
Err(err) => Err(err)?,
Ok(Err(err)) => Err(err)?,
}
}
self.engine.shutdown().await?;
self.endpoint.close().await;
Ok(())
}
pub fn node_id(&self) -> EndpointId {
self.endpoint.id()
}
}
impl std::ops::Deref for Peer {
type Target = Engine;
fn deref(&self) -> &Self::Target {
&self.engine
}
}
pub async fn spawn_two(mut rng: &mut impl CryptoRngCore) -> Result<[Peer; 2]> {
let disco = iroh::address_lookup::memory::MemoryLookup::new();
let peers = [
generate_secret_key(&mut rng),
generate_secret_key(&mut rng),
]
.map(|secret_key| Peer::spawn(secret_key, Default::default(), disco.clone()))
.try_join()
.await?;
disco.add_endpoint_info(peers[0].endpoint.addr());
disco.add_endpoint_info(peers[1].endpoint.addr());
Ok(peers)
}
pub async fn setup_and_delegate(
alfie: &Engine,
betty: &Engine,
) -> Result<(NamespaceId, UserId, UserId)> {
let user_alfie = alfie.create_user().await?;
let user_betty = betty.create_user().await?;
let namespace_id = alfie
.create_namespace(NamespaceKind::Owned, user_alfie)
.await?;
let cap_for_betty = alfie
.delegate_caps(
CapSelector::any(namespace_id),
AccessMode::Write,
DelegateTo::new(user_betty, RestrictArea::None),
)
.await?;
betty.import_caps(cap_for_betty).await?;
Ok((namespace_id, user_alfie, user_betty))
}
pub async fn insert(
handle: &Engine,
namespace_id: NamespaceId,
user: UserId,
path: &[&[u8]],
bytes: impl Into<Bytes>,
) -> Result<()> {
let path = Path::from_bytes(path)?;
let entry = EntryForm::new_bytes(namespace_id, path, bytes);
handle.insert_entry(entry, user).await?;
Ok(())
}
}
#[tokio::test(flavor = "multi_thread")]
async fn peer_manager_empty_payload() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let mut rng = create_rng("peer_manager_empty_payload");
let [alfie, betty] = spawn_two(&mut rng).await?;
let (namespace, _alfie_user, betty_user) = setup_and_delegate(&alfie, &betty).await?;
let betty_node_id = betty.node_id();
insert(&betty, namespace, betty_user, &[b"foo"], "").await?;
let init = SessionInit::new(Interests::all(), SessionMode::ReconcileOnce);
let mut intent = alfie.sync_with_peer(betty_node_id, init).await.unwrap();
assert_eq!(
intent.next().await.unwrap(),
EventKind::CapabilityIntersection {
namespace,
area: Area::new_full(),
}
);
assert_eq!(
intent.next().await.unwrap(),
EventKind::InterestIntersection {
namespace,
area: Area::new_full().into_area_of_interest()
}
);
assert_eq!(
intent.next().await.unwrap(),
EventKind::Reconciled {
namespace,
area: Area::new_full().into_area_of_interest()
}
);
assert_eq!(intent.next().await.unwrap(), EventKind::ReconciledAll);
assert!(intent.next().await.is_none());
[alfie, betty].map(Peer::shutdown).try_join().await?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn peer_manager_big_payload() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let mut rng = create_rng("peer_manager_empty_payload");
let [alfie, betty] = spawn_two(&mut rng).await?;
let (namespace, _alfie_user, betty_user) = setup_and_delegate(&alfie, &betty).await?;
let betty_node_id = betty.node_id();
let payload = Bytes::from(vec![2u8; 1024 * 128]);
insert(&betty, namespace, betty_user, &[b"foo"], payload.clone()).await?;
let init = SessionInit::new(Interests::all(), SessionMode::ReconcileOnce);
let mut intent = alfie.sync_with_peer(betty_node_id, init).await.unwrap();
intent.complete().await?;
let entries = alfie.get_entries(namespace, Range3d::new_full()).await?;
let entries: Vec<_> = entries.try_collect().await?;
assert_eq!(entries.len(), 1);
let entry = &entries[0];
let hash: iroh_blobs::Hash = (*entry.entry().payload_digest()).into();
let mut reader = alfie.blobs.reader(hash);
let mut actual = Vec::new();
tokio::io::AsyncReadExt::read_to_end(&mut reader, &mut actual).await?;
assert_eq!(actual.len(), payload.len());
assert!(actual == payload.as_ref());
[alfie, betty].map(Peer::shutdown).try_join().await?;
Ok(())
}