use std::{collections::BTreeMap, sync::Arc, time::Duration};
use anyhow::ensure;
use futures_lite::StreamExt;
use iroh::{Endpoint, EndpointAddr, SecretKey};
use iroh_willow::{
engine::AcceptOpts,
interest::{AreaOfInterestSelector, CapSelector, DelegateTo, RestrictArea},
proto::{
data_model::{Path, PathExt},
grouping::{Area, Range3d},
keys::{NamespaceKind, UserId},
meadowcap::AccessMode,
},
rpc::client::{Client, EntryForm, Space},
session::{intents::Completion, SessionMode},
store::traits::{EntryOrigin, StoreEvent},
Engine,
};
use proptest::{collection::vec, prelude::Strategy, sample::select};
use test_strategy::proptest;
use testresult::TestResult;
use tracing::{error, info};
async fn spawn_node(
persist_test_mode: bool,
disco: iroh::address_lookup::memory::MemoryLookup,
) -> (
EndpointAddr,
Client,
iroh_blobs::store::mem::MemStore,
iroh::protocol::Router,
) {
let blobs_store = iroh_blobs::store::mem::MemStore::default();
let bytes: [u8; 32] = rand::random();
let secret_key = SecretKey::from_bytes(&bytes);
let endpoint = Endpoint::builder()
.secret_key(secret_key)
.alpns(vec![iroh_willow::ALPN.to_vec()])
.relay_mode(iroh::RelayMode::Disabled)
.address_lookup(disco)
.bind()
.await
.unwrap();
let store: iroh_blobs::api::Store = blobs_store.clone().into();
let engine = if persist_test_mode {
Engine::spawn(
endpoint.clone(),
move || {
iroh_willow::store::persistent::Store::new_memory(store)
.expect("couldn't initialize store")
},
AcceptOpts::default(),
)
} else {
Engine::spawn(
endpoint.clone(),
move || iroh_willow::store::memory::Store::new(store),
AcceptOpts::default(),
)
};
let client = engine.client().clone();
let addr = endpoint.addr();
let router = iroh::protocol::Router::builder(endpoint.clone())
.accept(iroh_willow::ALPN, Arc::new(engine.clone()))
.spawn();
(addr, client, blobs_store, router)
}
#[derive(Debug, Clone)]
enum Operation {
Write(String, String),
}
fn simple_key() -> impl Strategy<Value = String> {
select(&["alpha", "beta", "gamma"]).prop_map(str::to_string)
}
fn simple_value() -> impl Strategy<Value = String> {
select(&["red", "blue", "green"]).prop_map(str::to_string)
}
fn simple_op() -> impl Strategy<Value = Operation> {
(simple_key(), simple_value()).prop_map(|(key, value)| Operation::Write(key, value))
}
fn role() -> impl Strategy<Value = Peer> {
select(&[Peer::X, Peer::Y])
}
#[derive(Debug, Eq, PartialEq, Clone, Copy, Ord, PartialOrd)]
enum Peer {
X,
Y,
}
#[proptest(cases = 32)]
fn prop_sync_simulation_matches_model(
x_is_persist: bool,
y_is_persist: bool,
#[strategy(vec((role(), vec(simple_op(), 0..20)), 0..20))] rounds: Vec<(Peer, Vec<Operation>)>,
) {
let _ = tracing_subscriber::fmt::try_init();
let res = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
let mut simulated_entries: BTreeMap<(Peer, String), String> = BTreeMap::new();
let disco = iroh::address_lookup::memory::MemoryLookup::new();
let (addr_x, iroh_x, blobs_x, _guard_x) = spawn_node(x_is_persist, disco.clone()).await;
let (addr_y, iroh_y, blobs_y, _guard_y) = spawn_node(y_is_persist, disco.clone()).await;
disco.add_endpoint_info(addr_x.clone());
disco.add_endpoint_info(addr_y.clone());
let node_id_x = addr_x.id;
let node_id_y = addr_y.id;
let user_x = iroh_x.create_user().await?;
let user_y = iroh_y.create_user().await?;
info!(
"X is node {} user {}",
node_id_x.fmt_short(),
user_x.fmt_short()
);
info!(
"Y is node {} user {}",
node_id_y.fmt_short(),
user_y.fmt_short()
);
let space_x = iroh_x.create(NamespaceKind::Owned, user_x).await?;
let ticket = space_x
.share(user_y, AccessMode::Write, RestrictArea::None)
.await?;
let (space_y, syncs) = iroh_y
.import_and_sync(ticket, SessionMode::ReconcileOnce)
.await?;
let mut completions = syncs.complete_all().await;
assert_eq!(completions.len(), 1);
let completion = completions.remove(&node_id_x).unwrap();
assert!(completion.is_ok());
assert_eq!(completion.unwrap(), Completion::Complete);
let count = rounds.len();
for (i, (peer, round)) in rounds.into_iter().enumerate() {
let i = i + 1;
let (space, blobs, user) = match peer {
Peer::X => (&space_x, &blobs_x, user_x),
Peer::Y => (&space_y, &blobs_y, user_y),
};
info!(active=?peer, "[{i}/{count}] round start");
for Operation::Write(key, value) in round {
info!(?key, ?value, "[{i}/{count}] write");
space
.insert_bytes(
blobs,
EntryForm::new(user, Path::from_bytes(&[key.as_bytes()])?),
value.clone().into_bytes(),
)
.await?;
simulated_entries.insert((peer, key), value);
}
let fut_x = async {
space_x
.sync_once(node_id_y, AreaOfInterestSelector::Widest)
.await?
.complete()
.await?;
anyhow::Ok(())
};
let fut_y = async {
space_y
.sync_once(node_id_x, AreaOfInterestSelector::Widest)
.await?
.complete()
.await?;
anyhow::Ok(())
};
let fut = async { tokio::try_join!(fut_x, fut_y) };
tokio::time::timeout(Duration::from_secs(40), fut).await??;
info!("[{i}/{count}] sync complete");
let map_x = space_to_map(&space_x, &blobs_x, user_x, user_y).await?;
let map_y = space_to_map(&space_y, &blobs_y, user_x, user_y).await?;
ensure!(
map_x == map_y,
"states out of sync:\n{map_x:#?}\n !=\n{map_y:#?}"
);
ensure!(
map_x == map_y,
"states out of sync:\n{map_x:#?}\n !=\n{map_y:#?}"
);
ensure!(
simulated_entries == map_x,
"alfie in unexpected state:\n{simulated_entries:#?}\n !=\n{map_x:#?}"
);
ensure!(
simulated_entries == map_y,
"betty in unexpected state:\n{simulated_entries:#?}\n !=\n{map_y:#?}"
);
}
info!("completed {count} rounds successfully");
Ok(())
});
if let Err(err) = &res {
error!(?err, "FAILED");
}
res.map_err(AnyhowStdErr)?;
}
async fn space_to_map(
space: &Space,
blobs: &iroh_blobs::store::mem::MemStore,
user_x: UserId,
user_y: UserId,
) -> anyhow::Result<BTreeMap<(Peer, String), String>> {
use tokio::io::AsyncReadExt;
let role_lookup = BTreeMap::from([(user_x, Peer::X), (user_y, Peer::Y)]);
let entries = space
.get_many(Range3d::new_full())
.await?
.try_collect::<_, _, Vec<_>>()
.await?;
let mut map: BTreeMap<(Peer, String), String> = BTreeMap::new();
for auth_entry in entries {
let (entry, auth) = auth_entry.into_parts();
let key_component = entry
.path()
.get_component(0)
.ok_or_else(|| anyhow::anyhow!("path component missing"))?;
let key = String::from_utf8(key_component.to_vec())?;
let hash = entry.payload_digest().0;
let mut reader = blobs.reader(hash);
let mut value = Vec::new();
reader.read_to_end(&mut value).await?;
let user = auth.capability.receiver();
let peer = role_lookup
.get(user)
.ok_or_else(|| anyhow::anyhow!("foreign write?"))?;
map.insert((*peer, key), String::from_utf8_lossy(&value).to_string());
}
Ok(map)
}
#[derive(Debug)]
struct AnyhowStdErr(anyhow::Error);
impl std::fmt::Display for AnyhowStdErr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.fmt(f)
}
}
impl std::error::Error for AnyhowStdErr {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.0.source()
}
fn description(&self) -> &str {
"description() is deprecated; use Display"
}
fn cause(&self) -> Option<&dyn std::error::Error> {
self.source()
}
}
#[tokio::test]
async fn spaces_smoke() -> TestResult {
let _ = tracing_subscriber::fmt::try_init();
let disco = iroh::address_lookup::memory::MemoryLookup::new();
let (alfie_addr, alfie, alfie_blobs, _g1) = spawn_node(false, disco.clone()).await;
let (betty_addr, betty, betty_blobs, _g2) = spawn_node(false, disco.clone()).await;
disco.add_endpoint_info(alfie_addr.clone());
disco.add_endpoint_info(betty_addr.clone());
info!("alfie is {}", alfie_addr.id.fmt_short());
info!("betty is {}", betty_addr.id.fmt_short());
let betty_user = betty.create_user().await?;
let alfie_user = alfie.create_user().await?;
let alfie_space = alfie.create(NamespaceKind::Owned, alfie_user).await?;
let namespace = alfie_space.namespace_id();
alfie_space
.insert_bytes(
&alfie_blobs,
EntryForm::new(alfie_user, Path::from_bytes(&[b"foo", b"bar"])?),
"hello betty",
)
.await?;
alfie_space
.insert_bytes(
&alfie_blobs,
EntryForm::new(alfie_user, Path::from_bytes(&[b"foo", b"boo"])?),
"this is alfie",
)
.await?;
let ticket = alfie_space
.share(betty_user, AccessMode::Read, RestrictArea::None)
.await?;
println!("ticket {ticket:?}");
let (betty_space, betty_sync_intent) = betty
.import_and_sync(ticket, SessionMode::ReconcileOnce)
.await?;
let mut completion = betty_sync_intent.complete_all().await;
assert_eq!(completion.len(), 1);
let alfie_completion = completion.remove(&alfie_addr.id).unwrap();
assert_eq!(alfie_completion?, Completion::Complete);
let betty_entries: Vec<_> = betty_space
.get_many(Range3d::new_full())
.await?
.try_collect()
.await?;
assert_eq!(betty_entries.len(), 2);
let res = betty_space
.insert_bytes(
&betty_blobs,
EntryForm::new(betty_user, Path::from_bytes(&[b"hello"])?),
"this is betty",
)
.await;
println!("insert without cap: {res:?}");
assert!(res.is_err());
let area = Area::new_subspace(betty_user);
let caps = alfie
.delegate_caps(
CapSelector::any(namespace),
AccessMode::Write,
DelegateTo::new(betty_user, RestrictArea::Restrict(area)),
)
.await?;
betty.import_caps(caps).await?;
let res = betty_space
.insert_bytes(
&betty_blobs,
EntryForm::new(betty_user, Path::from_bytes(&[b"hello"])?),
"this is betty",
)
.await;
assert!(res.is_ok());
let mut alfie_sync_intent = alfie_space
.sync_once(betty_addr.id, Default::default())
.await?;
alfie_sync_intent.complete().await?;
let alfie_entries: Vec<_> = alfie_space
.get_many(Range3d::new_full())
.await?
.try_collect()
.await?;
assert_eq!(alfie_entries.len(), 3);
Ok(())
}
#[tokio::test]
async fn spaces_subscription() -> TestResult {
let _ = tracing_subscriber::fmt::try_init();
let disco = iroh::address_lookup::memory::MemoryLookup::new();
let (alfie_addr, alfie, alfie_blobs, _g1) = spawn_node(false, disco.clone()).await;
let (betty_addr, betty, betty_blobs, _g2) = spawn_node(false, disco.clone()).await;
disco.add_endpoint_info(alfie_addr.clone());
disco.add_endpoint_info(betty_addr.clone());
info!("alfie is {}", alfie_addr.id.fmt_short());
info!("betty is {}", betty_addr.id.fmt_short());
let betty_user = betty.create_user().await?;
let alfie_user = alfie.create_user().await?;
let alfie_space = alfie.create(NamespaceKind::Owned, alfie_user).await?;
let _namespace = alfie_space.namespace_id();
let alfie_sub = alfie_space
.subscribe_area(Area::new_full(), Default::default())
.await?;
tokio::pin!(alfie_sub);
let ticket = alfie_space
.share(betty_user, AccessMode::Write, RestrictArea::None)
.await?;
let (betty_space, betty_sync_intent) = betty
.import_and_sync(ticket, SessionMode::Continuous)
.await?;
let _sync_task = tokio::task::spawn(async move {
let _ = betty_sync_intent.complete_all().await;
});
let betty_sub = betty_space
.resume_subscription(0, Area::new_full(), Default::default())
.await?;
tokio::pin!(betty_sub);
betty_space
.insert_bytes(
&betty_blobs,
EntryForm::new(betty_user, Path::from_bytes(&[b"foo"])?),
"hi",
)
.await?;
let ev = betty_sub.next().await.unwrap().unwrap();
println!("BETTY 1 {ev:?}");
assert!(matches!(ev, StoreEvent::Ingested(0, _, EntryOrigin::Local)));
let ev = alfie_sub.next().await.unwrap().unwrap();
println!("ALFIE 1 {ev:?}");
assert!(matches!(
ev,
StoreEvent::Ingested(0, _, EntryOrigin::Remote(_))
));
alfie_space
.insert_bytes(
&alfie_blobs,
EntryForm::new(alfie_user, Path::from_bytes(&[b"bar"])?),
"hi!!",
)
.await?;
let ev = alfie_sub.next().await.unwrap().unwrap();
println!("ALFIE 2 {ev:?}");
assert!(matches!(ev, StoreEvent::Ingested(1, _, EntryOrigin::Local)));
let ev = betty_sub.next().await.unwrap().unwrap();
println!("BETTY 2 {ev:?}");
assert!(matches!(
ev,
StoreEvent::Ingested(1, _, EntryOrigin::Remote(_))
));
Ok(())
}
#[ignore = "flaky"]
#[tokio::test]
async fn test_restricted_area() -> testresult::TestResult {
let _ = tracing_subscriber::fmt::try_init();
const TIMEOUT: Duration = Duration::from_secs(20);
let disco = iroh::address_lookup::memory::MemoryLookup::new();
let (alfie_addr, alfie, _, _g1) = spawn_node(false, disco.clone()).await;
let (betty_addr, betty, _, _g2) = spawn_node(false, disco.clone()).await;
disco.add_endpoint_info(alfie_addr.clone());
disco.add_endpoint_info(betty_addr.clone());
info!("alfie is {}", alfie_addr.id.fmt_short());
info!("betty is {}", betty_addr.id.fmt_short());
let alfie_user = alfie.create_user().await?;
let betty_user = betty.create_user().await?;
let alfie_space = alfie.create(NamespaceKind::Owned, alfie_user).await?;
let space_ticket = alfie_space
.share(
betty_user,
AccessMode::Write,
RestrictArea::Restrict(Area::new_subspace(betty_user)),
)
.await?;
let (betty_space, syncs) = betty
.import_and_sync(space_ticket, SessionMode::ReconcileOnce)
.await?;
let completion = tokio::time::timeout(TIMEOUT, syncs.complete_all()).await?;
println!("Completed syncs: {completion:#?}");
let stream = betty_space.get_many(Range3d::new_full()).await?;
let entries: Vec<_> = stream.try_collect().await?;
println!("{entries:#?}");
Ok(())
}