use std::time::Instant;
use anyhow::Result;
use futures_lite::StreamExt;
use iroh_willow::{
interest::Interests,
proto::grouping::Range3d,
session::{intents::Completion, SessionInit, SessionMode},
};
use tracing::info;
use self::util::{create_rng, insert, parse_env_var, setup_and_delegate, spawn_two};
#[tokio::main]
async fn main() -> Result<()> {
let t = Instant::now();
tracing_subscriber::fmt::init();
let n_betty: usize = parse_env_var("N_BETTY", 100);
let n_alfie: usize = parse_env_var("N_ALFIE", 100);
let mut rng = create_rng("peer_manager_two_intents");
let start = Instant::now();
let [alfie, betty] = spawn_two(&mut rng).await?;
let (namespace, alfie_user, betty_user) = setup_and_delegate(&alfie, &betty).await?;
info!(t=?t.elapsed(), d=?start.elapsed(), "setup done");
let start = Instant::now();
for i in 0..n_alfie {
let x = format!("{i}");
insert(
&alfie,
namespace,
alfie_user,
&[b"alfie", x.as_bytes()],
"foo",
)
.await?;
}
for i in 0..n_betty {
let x = format!("{i}");
insert(
&betty,
namespace,
betty_user,
&[b"betty", x.as_bytes()],
"foo",
)
.await?;
}
info!(t=?t.elapsed(), d=?start.elapsed(), "insert done");
let start = Instant::now();
let init = SessionInit::new(Interests::all(), SessionMode::ReconcileOnce);
let mut intent_alfie = alfie
.sync_with_peer(betty.endpoint_id(), init.clone())
.await
.unwrap();
let mut intent_betty = betty
.sync_with_peer(alfie.endpoint_id(), init)
.await
.unwrap();
let completion_alfie = intent_alfie.complete().await?;
let completion_betty = intent_betty.complete().await?;
info!(t=?t.elapsed(), d=?start.elapsed(), "sync done");
let time = start.elapsed();
let total = n_alfie + n_betty;
let per_entry = time.as_micros() / total as u128;
let entries_per_second = (total as f32 / time.as_secs_f32()).round();
info!(time=?time, ms_per_entry=per_entry, entries_per_second, "sync done");
assert_eq!(completion_alfie, Completion::Complete);
assert_eq!(completion_betty, Completion::Complete);
let start = Instant::now();
let alfie_count = alfie
.get_entries(namespace, Range3d::new_full())
.await?
.count()
.await;
let betty_count = betty
.get_entries(namespace, Range3d::new_full())
.await?
.count()
.await;
info!(t=?t.elapsed(), d=?start.elapsed(), "get done");
info!("alfie has now {} entries", alfie_count);
info!("betty has now {} entries", betty_count);
assert_eq!(alfie_count, n_alfie + n_betty);
assert_eq!(betty_count, n_alfie + n_betty);
alfie.shutdown().await?;
betty.shutdown().await?;
Ok(())
}
mod util {
use std::sync::{Arc, Mutex};
use anyhow::Result;
use bytes::Bytes;
use futures_concurrency::future::TryJoin;
use iroh::{Endpoint, EndpointId};
use iroh::address_lookup::memory::MemoryLookup;
use iroh_willow::{
engine::{AcceptOpts, Engine},
form::EntryForm,
interest::{CapSelector, DelegateTo, RestrictArea},
proto::{
data_model::{Path, PathExt},
keys::{NamespaceId, NamespaceKind, UserId},
meadowcap::AccessMode,
},
ALPN,
};
use rand::SeedableRng;
use rand_chacha::ChaCha12Rng;
use rand_core::CryptoRngCore;
use tokio::task::JoinHandle;
pub fn create_rng(seed: &str) -> ChaCha12Rng {
let seed = iroh_blobs::Hash::new(seed);
let seed_bytes: [u8; 32] = seed.as_bytes()[..32].try_into().unwrap();
ChaCha12Rng::from_seed(seed_bytes)
}
fn generate_secret_key(rng: &mut impl CryptoRngCore) -> iroh::SecretKey {
let bytes: [u8; 32] = rand::Rng::gen(rng);
iroh::SecretKey::from_bytes(&bytes)
}
#[derive(Debug, Clone)]
pub struct Peer {
endpoint: Endpoint,
engine: Engine,
accept_task: Arc<Mutex<Option<JoinHandle<Result<()>>>>>,
}
impl Peer {
pub async fn spawn(
secret_key: iroh::SecretKey,
disco: MemoryLookup,
accept_opts: AcceptOpts,
) -> Result<Self> {
let endpoint = Endpoint::empty_builder(iroh::RelayMode::Disabled)
.secret_key(secret_key)
.address_lookup(disco.clone())
.alpns(vec![ALPN.to_vec()])
.bind()
.await?;
disco.add_endpoint_info(endpoint.addr());
let blobs = iroh_blobs::store::mem::MemStore::default();
let create_store = move || iroh_willow::store::memory::Store::new(blobs.into());
let engine = Engine::spawn(endpoint.clone(), create_store, accept_opts);
let accept_task = tokio::task::spawn({
let engine = engine.clone();
let endpoint = endpoint.clone();
async move {
while let Some(incoming) = endpoint.accept().await {
let conn = incoming.await;
let Ok(conn) = conn else {
continue;
};
if conn.alpn() != ALPN {
continue;
}
engine.handle_connection(conn).await?;
}
Result::Ok(())
}
});
Ok(Self {
endpoint,
engine,
accept_task: Arc::new(Mutex::new(Some(accept_task))),
})
}
pub async fn shutdown(self) -> Result<()> {
let accept_task = self.accept_task.lock().unwrap().take();
if let Some(accept_task) = accept_task {
accept_task.abort();
match accept_task.await {
Err(err) if err.is_cancelled() => {}
Ok(Ok(())) => {}
Err(err) => Err(err)?,
Ok(Err(err)) => Err(err)?,
}
}
self.engine.shutdown().await?;
self.endpoint.close().await;
Ok(())
}
pub fn endpoint_id(&self) -> EndpointId {
self.endpoint.id()
}
}
impl std::ops::Deref for Peer {
type Target = Engine;
fn deref(&self) -> &Self::Target {
&self.engine
}
}
pub async fn spawn_two(rng: &mut impl CryptoRngCore) -> Result<[Peer; 2]> {
let disco = MemoryLookup::new();
let peers = [
generate_secret_key(rng),
generate_secret_key(rng),
]
.map(|secret_key| Peer::spawn(secret_key, disco.clone(), Default::default()))
.try_join()
.await?;
Ok(peers)
}
pub async fn setup_and_delegate(
alfie: &Engine,
betty: &Engine,
) -> Result<(NamespaceId, UserId, UserId)> {
let user_alfie = alfie.create_user().await?;
let user_betty = betty.create_user().await?;
let namespace_id = alfie
.create_namespace(NamespaceKind::Owned, user_alfie)
.await?;
let cap_for_betty = alfie
.delegate_caps(
CapSelector::any(namespace_id),
AccessMode::Write,
DelegateTo::new(user_betty, RestrictArea::None),
)
.await?;
betty.import_caps(cap_for_betty).await?;
Ok((namespace_id, user_alfie, user_betty))
}
pub async fn insert(
handle: &Engine,
namespace_id: NamespaceId,
user: UserId,
path: &[&[u8]],
bytes: impl Into<Bytes>,
) -> Result<()> {
let path = Path::from_bytes(path)?;
let entry = EntryForm::new_bytes(namespace_id, path, bytes);
handle.insert_entry(entry, user).await?;
Ok(())
}
pub fn parse_env_var<T>(var: &str, default: T) -> T
where
T: std::str::FromStr,
T::Err: std::fmt::Debug,
{
match std::env::var(var).as_deref() {
Ok(val) => val
.parse()
.unwrap_or_else(|_| panic!("failed to parse environment variable {var}")),
Err(_) => default,
}
}
}