use std::sync::{Arc, RwLock};
use iroh_base::{EndpointAddr, EndpointId};
use n0_error::{AnyError, e, stack_error};
use n0_future::boxed::BoxStream;
pub use crate::endpoint_info::{EndpointData, EndpointInfo, ParseError, UserData};
use crate::{Endpoint, endpoint::EndpointError};
#[cfg(not(wasm_browser))]
pub mod dns;
#[cfg(feature = "address-lookup-mdns")]
pub mod mdns;
pub mod memory;
pub mod pkarr;
#[cfg(not(wasm_browser))]
pub use dns::*;
#[cfg(feature = "address-lookup-mdns")]
pub use mdns::*;
pub use memory::*;
#[cfg(feature = "address-lookup-pkarr-dht")]
pub use pkarr::dht::*;
pub use pkarr::*;
/// Conversion into a concrete [`AddressLookup`] service, given access to an [`Endpoint`].
///
/// Every type that implements [`AddressLookup`] also implements this trait through a
/// blanket implementation that returns the value unchanged.
pub trait IntoAddressLookup: Send + Sync + std::fmt::Debug + 'static {
/// Consumes `self` and produces the [`AddressLookup`] service.
///
/// `endpoint` is made available to the conversion; presumably it is the endpoint the
/// resulting service will be attached to (TODO confirm against callers).
///
/// # Errors
///
/// Returns an [`IntoAddressLookupError`] when the service cannot be produced.
fn into_address_lookup(
self,
endpoint: &Endpoint,
) -> Result<impl AddressLookup, IntoAddressLookupError>;
}
/// Blanket implementation: a value that already is an [`AddressLookup`] converts into
/// itself.
impl<T: AddressLookup> IntoAddressLookup for T {
/// Returns `self` unchanged; the endpoint is not consulted and this never fails.
fn into_address_lookup(
self,
_endpoint: &Endpoint,
) -> Result<impl AddressLookup, IntoAddressLookupError> {
Ok(self)
}
}
/// Boxed counterpart of [`IntoAddressLookup`].
///
/// The conversion takes `Box<Self>` and yields a `Box<dyn AddressLookup>` instead of an
/// `impl AddressLookup`, presumably so that converters can be stored and invoked as trait
/// objects (TODO confirm against the users of this trait).
pub(crate) trait DynIntoAddressLookup: Send + Sync + std::fmt::Debug + 'static {
/// Consumes the boxed converter and produces a boxed [`AddressLookup`] service.
///
/// # Errors
///
/// Returns an [`IntoAddressLookupError`] when the service cannot be produced.
fn into_address_lookup(
self: Box<Self>,
endpoint: &Endpoint,
) -> Result<Box<dyn AddressLookup>, IntoAddressLookupError>;
}
/// Every [`IntoAddressLookup`] can also be used through the boxed
/// [`DynIntoAddressLookup`] interface.
impl<T: IntoAddressLookup> DynIntoAddressLookup for T {
    /// Unboxes the converter, runs the typed conversion and type-erases the result.
    fn into_address_lookup(
        self: Box<Self>,
        endpoint: &Endpoint,
    ) -> Result<Box<dyn AddressLookup>, IntoAddressLookupError> {
        // `*self` moves the converter out of its box so the by-value trait method can
        // consume it; a conversion error is passed through untouched by `map`.
        IntoAddressLookup::into_address_lookup(*self, endpoint)
            .map(|lookup| Box::new(lookup) as Box<dyn AddressLookup>)
    }
}
/// Errors produced while converting a value into an [`AddressLookup`] service.
#[allow(missing_docs)]
#[stack_error(derive, add_meta, from_sources, std_sources)]
#[non_exhaustive]
pub enum IntoAddressLookupError {
/// An error reported by a specific service; `provenance` names that service and is
/// included in the error message.
#[error("Service '{provenance}' error")]
User {
provenance: &'static str,
source: AnyError,
},
/// Wraps an [`EndpointError`] and displays it transparently. Judging by the variant
/// name this is used when the endpoint has been closed (TODO confirm).
#[error(transparent)]
EndpointClosed { source: EndpointError },
}
impl IntoAddressLookupError {
    /// Creates an [`IntoAddressLookupError::User`] from an arbitrary error type.
    ///
    /// `provenance` names the service that produced `source`; it appears in the error
    /// message.
    ///
    /// Marked `#[track_caller]`, consistent with the constructors on [`Error`], so that
    /// any caller location recorded by `e!` refers to the call site rather than to this
    /// helper.
    #[track_caller]
    pub fn from_err<T: std::error::Error + Send + Sync + 'static>(
        provenance: &'static str,
        source: T,
    ) -> Self {
        e!(IntoAddressLookupError::User {
            provenance,
            source: AnyError::from_std(source)
        })
    }

    /// Creates an [`IntoAddressLookupError::User`] from an already boxed error.
    ///
    /// `provenance` names the service that produced `source`; it appears in the error
    /// message.
    ///
    /// Marked `#[track_caller]` for the same reason as [`Self::from_err`].
    #[track_caller]
    pub fn from_err_box(
        provenance: &'static str,
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    ) -> Self {
        e!(IntoAddressLookupError::User {
            provenance,
            source: AnyError::from_std_box(source)
        })
    }
}
/// Errors produced by address lookup.
///
/// The type is `Clone`; the `User` variant keeps its source behind an [`Arc`] so that
/// cloning does not require the underlying error to be cloneable.
#[allow(missing_docs)]
#[stack_error(derive, add_meta)]
#[non_exhaustive]
#[derive(Clone)]
pub enum Error {
/// No address lookup service is configured.
#[error("No address lookup configured")]
NoServiceConfigured,
/// Address lookup ran but produced no results.
#[error("Address lookup produced no results")]
NoResults,
/// An error reported by a specific service; `provenance` names that service and is
/// included in the error message.
#[error("Service '{provenance}' error")]
User {
provenance: &'static str,
source: Arc<AnyError>,
},
}
impl Error {
#[track_caller]
pub fn from_err<T: std::error::Error + Send + Sync + 'static>(
provenance: &'static str,
source: T,
) -> Self {
Self::from_err_any(provenance, AnyError::from_std(source))
}
#[track_caller]
pub fn from_err_box(
provenance: &'static str,
source: Box<dyn std::error::Error + Send + Sync + 'static>,
) -> Self {
Self::from_err_any(provenance, AnyError::from_std_box(source))
}
#[track_caller]
pub fn from_err_any(provenance: &'static str, source: impl Into<AnyError>) -> Self {
e!(Error::User {
provenance,
source: Arc::new(source.into())
})
}
}
/// A service that can publish this endpoint's addressing data and/or resolve the
/// addressing data of other endpoints.
///
/// Both methods have default implementations, so an implementor only overrides the
/// direction(s) it supports.
pub trait AddressLookup: std::fmt::Debug + Send + Sync + 'static {
/// Publishes the given [`EndpointData`] through this service.
///
/// The default implementation does nothing. The method is synchronous and returns
/// nothing, so it has no way to report failures to the caller.
fn publish(&self, _data: &EndpointData) {}
/// Starts resolving addressing information for `endpoint_id`.
///
/// Returns a stream of [`Item`]s (or [`Error`]s). The default implementation returns
/// `None`; [`ConcurrentAddressLookup`] skips services that return `None` when merging
/// results.
fn resolve(&self, _endpoint_id: EndpointId) -> Option<BoxStream<Result<Item, Error>>> {
None
}
}
/// An `Arc`-wrapped service is itself a service; both methods forward to the inner value.
impl<T: AddressLookup> AddressLookup for Arc<T> {
    /// Forwards to the wrapped service's `publish`.
    fn publish(&self, data: &EndpointData) {
        // `**self` is the `T` behind the `&Arc<T>`, which selects `T`'s implementation
        // rather than recursing into this one.
        (**self).publish(data);
    }

    /// Forwards to the wrapped service's `resolve`.
    fn resolve(&self, endpoint_id: EndpointId) -> Option<BoxStream<Result<Item, Error>>> {
        (**self).resolve(endpoint_id)
    }
}
/// A single result yielded by an [`AddressLookup`] service.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Item {
// The endpoint id together with the addressing data that was found for it.
endpoint_info: EndpointInfo,
// Name of the service that produced this item.
provenance: &'static str,
// Optional timestamp of the information. The unit is not fixed by this type; the tests
// in this file supply microseconds since the Unix epoch (TODO confirm the intended unit).
last_updated: Option<u64>,
}
impl Item {
pub fn new(
endpoint_info: EndpointInfo,
provenance: &'static str,
last_updated: Option<u64>,
) -> Self {
Self {
endpoint_info,
provenance,
last_updated,
}
}
pub fn endpoint_id(&self) -> EndpointId {
self.endpoint_info.endpoint_id
}
pub fn endpoint_info(&self) -> &EndpointInfo {
&self.endpoint_info
}
pub fn provenance(&self) -> &'static str {
self.provenance
}
pub fn last_updated(&self) -> Option<u64> {
self.last_updated
}
pub fn to_endpoint_addr(&self) -> EndpointAddr {
self.endpoint_info.to_endpoint_addr()
}
pub fn into_endpoint_addr(self) -> EndpointAddr {
self.endpoint_info.into_endpoint_addr()
}
pub fn user_data(&self) -> Option<UserData> {
self.endpoint_info().data.user_data().cloned()
}
}
impl std::ops::Deref for Item {
type Target = EndpointData;
fn deref(&self) -> &Self::Target {
&self.endpoint_info.data
}
}
impl From<Item> for EndpointInfo {
fn from(item: Item) -> Self {
item.endpoint_info
}
}
/// An [`AddressLookup`] that fans out to a dynamic list of other services.
///
/// Both fields sit behind `Arc`s, so clones of this value share the same service list and
/// the same remembered endpoint data.
#[derive(Debug, Default, Clone)]
pub struct ConcurrentAddressLookup {
// The registered services; `publish` and `resolve` iterate over this list.
services: Arc<RwLock<Vec<Box<dyn AddressLookup>>>>,
// The data most recently passed to `publish`, replayed to services added afterwards.
last_data: Arc<RwLock<Option<EndpointData>>>,
}
impl ConcurrentAddressLookup {
    /// Creates a lookup with no services and no remembered endpoint data.
    pub fn empty() -> Self {
        Self {
            services: Default::default(),
            last_data: Default::default(),
        }
    }

    /// Creates a lookup that starts out with the given services.
    pub fn from_services(services: Vec<Box<dyn AddressLookup>>) -> Self {
        let services = Arc::new(RwLock::new(services));
        let last_data = Arc::default();
        Self {
            services,
            last_data,
        }
    }

    /// Boxes `service` and registers it; see [`Self::add_boxed`].
    pub fn add(&self, service: impl AddressLookup + 'static) {
        let boxed: Box<dyn AddressLookup> = Box::new(service);
        self.add_boxed(boxed)
    }

    /// Registers an already boxed service.
    ///
    /// If endpoint data has been published before, it is first handed to the new service
    /// so that it starts out with the latest known data.
    ///
    /// # Panics
    ///
    /// Panics if one of the internal locks is poisoned.
    pub fn add_boxed(&self, service: Box<dyn AddressLookup>) {
        let last_published = self.last_data.read().expect("poisoned");
        if let Some(data) = last_published.as_ref() {
            service.publish(data);
        }
        // Release the read guard before taking the write lock on the service list.
        drop(last_published);

        let mut services = self.services.write().expect("poisoned");
        services.push(service);
    }

    /// Whether no service is registered.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn is_empty(&self) -> bool {
        let services = self.services.read().expect("poisoned");
        services.is_empty()
    }

    /// Number of registered services.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn len(&self) -> usize {
        let services = self.services.read().expect("poisoned");
        services.len()
    }

    /// Removes all registered services. The remembered endpoint data is left untouched.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn clear(&self) {
        self.services.write().expect("poisoned").clear();
    }
}
impl<T> From<T> for ConcurrentAddressLookup
where
T: IntoIterator<Item = Box<dyn AddressLookup>>,
{
fn from(iter: T) -> Self {
let services = iter.into_iter().collect::<Vec<_>>();
Self {
services: Arc::new(RwLock::new(services)),
last_data: Default::default(),
}
}
}
impl AddressLookup for ConcurrentAddressLookup {
    /// Publishes `data` to every registered service, then remembers it so that services
    /// added later can be brought up to date.
    ///
    /// # Panics
    ///
    /// Panics if one of the internal locks is poisoned.
    fn publish(&self, data: &EndpointData) {
        let services = self.services.read().expect("poisoned");
        services.iter().for_each(|service| service.publish(data));

        let mut last_data = self.last_data.write().expect("poisoned");
        *last_data = Some(data.clone());
    }

    /// Asks every registered service to resolve `endpoint_id` and merges the resulting
    /// streams into one.
    ///
    /// Services that return `None` are skipped. The result is always `Some`, even when
    /// no service contributed a stream.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    fn resolve(&self, endpoint_id: EndpointId) -> Option<BoxStream<Result<Item, Error>>> {
        let services = self.services.read().expect("poisoned");
        let merged = n0_future::MergeBounded::from_iter(
            services
                .iter()
                .filter_map(|service| service.resolve(endpoint_id)),
        );
        Some(Box::pin(merged))
    }
}
#[cfg(test)]
mod tests {
use std::{
collections::HashMap,
net::SocketAddr,
sync::{Arc, Mutex},
time::{Duration, SystemTime},
};
use iroh_base::{EndpointAddr, SecretKey, TransportAddr};
use n0_error::{AnyError, Result, StackResultExt};
use n0_future::{StreamExt, time};
use n0_tracing_test::traced_test;
use rand::{CryptoRng, Rng, SeedableRng};
use tokio_util::task::AbortOnDropHandle;
use super::*;
use crate::{
Endpoint, RelayMode,
endpoint::{ConnectOptions, IdleTimeout, QuicTransportConfig},
};
/// Maps an endpoint id to the data published for it and the time of publication (as
/// returned by `system_time_now`).
type InfoStore = HashMap<EndpointId, (EndpointData, u64)>;
/// In-memory registry shared by all [`TestAddressLookup`] instances created from it;
/// clones share the same underlying store.
#[derive(Debug, Clone, Default)]
struct TestAddressLookupShared {
endpoints: Arc<Mutex<InfoStore>>,
}
impl TestAddressLookupShared {
    /// Common constructor: an honest lookup publishes and resolves from the shared store,
    /// a dishonest one never publishes and resolves to made-up addresses.
    fn build(&self, endpoint_id: EndpointId, honest: bool, delay_ms: u64) -> TestAddressLookup {
        TestAddressLookup {
            endpoint_id,
            shared: self.clone(),
            publish: honest,
            resolve_wrong: !honest,
            delay: Duration::from_millis(delay_ms),
        }
    }

    /// Creates a lookup that publishes to and resolves from this registry, answering
    /// after 200ms.
    pub fn create_address_lookup(&self, endpoint_id: EndpointId) -> TestAddressLookup {
        self.build(endpoint_id, true, 200)
    }

    /// Creates a lookup that never publishes and resolves to wrong addresses, answering
    /// after 100ms (i.e. before the honest lookup does).
    pub fn create_lying_address_lookup(&self, endpoint_id: EndpointId) -> TestAddressLookup {
        self.build(endpoint_id, false, 100)
    }
}
/// Test service backed by a [`TestAddressLookupShared`] registry.
#[derive(Debug)]
struct TestAddressLookup {
// Key under which `publish` stores data in the shared registry.
endpoint_id: EndpointId,
// The registry this lookup reads from and writes to.
shared: TestAddressLookupShared,
// When false, `publish` is a no-op.
publish: bool,
// When true, `resolve` ignores the registry and fabricates an address.
resolve_wrong: bool,
// How long `resolve` waits before yielding its item.
delay: Duration,
}
impl AddressLookup for TestAddressLookup {
    /// Stores `data` with the current time under this lookup's endpoint id, unless
    /// publishing is disabled for this instance.
    fn publish(&self, data: &EndpointData) {
        if self.publish {
            let published_at = system_time_now();
            let mut store = self.shared.endpoints.lock().unwrap();
            store.insert(self.endpoint_id, (data.clone(), published_at));
        }
    }

    /// Yields at most one item after `self.delay`.
    ///
    /// An honest lookup returns whatever the shared registry holds for `endpoint_id` (an
    /// empty stream if nothing is stored). A lying lookup fabricates an entry with a
    /// random port on `240.0.0.1` and a timestamp 100_000 microseconds in the past.
    fn resolve(&self, endpoint_id: EndpointId) -> Option<BoxStream<Result<Item, Error>>> {
        let found = if !self.resolve_wrong {
            let store = self.shared.endpoints.lock().unwrap();
            store.get(&endpoint_id).cloned()
        } else {
            let stale_ts = system_time_now() - 100_000;
            let port: u16 = rand::rng().random_range(10_000..20_000);
            let bogus: SocketAddr = format!("240.0.0.1:{port}").parse().unwrap();
            Some((EndpointData::new([TransportAddr::Ip(bogus)]), stale_ts))
        };

        let Some((data, ts)) = found else {
            return Some(n0_future::stream::empty().boxed());
        };

        let item = Item::new(
            EndpointInfo::from_parts(endpoint_id, data),
            "test-addr-lookup",
            Some(ts),
        );
        let delay = self.delay;
        let delayed = async move {
            time::sleep(delay).await;
            tracing::debug!("resolve: {} = {item:?}", endpoint_id.fmt_short());
            Ok(item)
        };
        Some(n0_future::stream::once_future(delayed).boxed())
    }
}
/// Test service that supports resolving but never finds anything.
#[derive(Debug, Clone)]
struct EmptyAddressLookup;
impl AddressLookup for EmptyAddressLookup {
// Publishing is ignored.
fn publish(&self, _data: &EndpointData) {}
// Returns `Some` with an empty stream (rather than `None`), so the service takes part in
// a merged resolve but contributes no items.
fn resolve(&self, _endpoint_id: EndpointId) -> Option<BoxStream<Result<Item, Error>>> {
Some(n0_future::stream::empty().boxed())
}
}
/// ALPN used by the endpoints created in these tests.
const TEST_ALPN: &[u8] = b"n0/iroh/test";
/// Two endpoints share one registry; the dialer is given nothing but the peer's id and
/// must learn how to reach it through address lookup.
#[tokio::test]
#[traced_test]
async fn address_lookup_simple_shared() -> Result {
    let mut key_rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
    let registry = TestAddressLookupShared::default();

    // The task guards must stay alive until the end of the test: dropping one aborts the
    // corresponding accept loop.
    let (server, _server_task) =
        new_endpoint(&mut key_rng, |ep| registry.create_address_lookup(ep.id())).await;
    let (client, _client_task) =
        new_endpoint(&mut key_rng, |ep| registry.create_address_lookup(ep.id())).await;

    // The address is built from the endpoint id alone.
    let id_only_addr = EndpointAddr::new(server.id());
    let _conn = client.connect(id_only_addr, TEST_ALPN).await?;
    Ok(())
}
/// Same scenario as the plain shared test, but each service is wrapped in an `Arc` to
/// exercise the `AddressLookup for Arc<T>` implementation.
#[tokio::test]
#[traced_test]
async fn address_lookup_simple_shared_with_arc() -> Result {
    let mut key_rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
    let registry = TestAddressLookupShared::default();

    let (server, _server_task) = new_endpoint(&mut key_rng, |ep| {
        let lookup = registry.create_address_lookup(ep.id());
        Arc::new(lookup)
    })
    .await;
    let (client, _client_task) = new_endpoint(&mut key_rng, |ep| {
        let lookup = registry.create_address_lookup(ep.id());
        Arc::new(lookup)
    })
    .await;

    // The address is built from the endpoint id alone.
    let id_only_addr = EndpointAddr::new(server.id());
    let _conn = client.connect(id_only_addr, TEST_ALPN).await?;
    Ok(())
}
/// The dialer has two services: one that never finds anything and one that works.
/// Connecting must still succeed.
#[tokio::test]
#[traced_test]
async fn address_lookup_combined_with_empty_and_right() -> Result {
    let mut key_rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
    let registry = TestAddressLookupShared::default();

    let (server, _server_task) =
        new_endpoint(&mut key_rng, |ep| registry.create_address_lookup(ep.id())).await;
    let (client, _client_task) = new_endpoint_add(&mut key_rng, |ep| {
        // Register the useless service first, then the working one.
        ep.address_lookup()
            .expect("endpoint is still open")
            .add(EmptyAddressLookup);
        let working = registry.create_address_lookup(ep.id());
        ep.address_lookup()
            .expect("endpoint is still open")
            .add(working);
    })
    .await;

    // Both services must have been registered on the dialing endpoint.
    let registered = client
        .address_lookup()
        .expect("endpoint is still open")
        .len();
    assert_eq!(registered, 2);

    let id_only_addr = EndpointAddr::new(server.id());
    let _conn = client
        .connect(id_only_addr, TEST_ALPN)
        .await
        .context("connecting")?;
    Ok(())
}
/// The dialer combines an empty service, a lying service and a working one. The lying
/// service answers first (100ms vs 200ms), yet connecting must still succeed.
#[tokio::test]
#[traced_test]
async fn address_lookup_combined_with_empty_and_wrong() -> Result {
    let mut key_rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
    let registry = TestAddressLookupShared::default();

    let (server, _server_task) =
        new_endpoint(&mut key_rng, |ep| registry.create_address_lookup(ep.id())).await;
    let (client, _client_task) = new_endpoint(&mut key_rng, |ep| {
        let combined = ConcurrentAddressLookup::empty();
        combined.add(EmptyAddressLookup);
        combined.add(registry.create_lying_address_lookup(ep.id()));
        combined.add(registry.create_address_lookup(ep.id()));
        combined
    })
    .await;

    let _conn = client.connect(server.id(), TEST_ALPN).await?;
    Ok(())
}
/// The dialer's only service lies about the peer's address, so the connection attempt
/// must end in an error.
#[tokio::test]
#[traced_test]
async fn address_lookup_combined_wrong_only() -> Result {
    let mut key_rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
    let registry = TestAddressLookupShared::default();

    let (server, _server_task) =
        new_endpoint(&mut key_rng, |ep| registry.create_address_lookup(ep.id())).await;
    let (client, _client_task) = new_endpoint(&mut key_rng, |ep| {
        let liar = registry.create_lying_address_lookup(ep.id());
        ConcurrentAddressLookup::from_services(vec![Box::new(liar)])
    })
    .await;

    // Short timeouts so the doomed attempt fails quickly.
    let idle_timeout = IdleTimeout::try_from(Duration::from_secs(3)).unwrap();
    let transport = QuicTransportConfig::builder()
        .keep_alive_interval(Duration::from_secs(1))
        .max_idle_timeout(Some(idle_timeout))
        .build();
    let opts = ConnectOptions::new().with_transport_config(transport);

    // Starting the attempt is expected to work ...
    let connecting = client
        .connect_with_opts(server.id(), TEST_ALPN, opts)
        .await?;
    // ... while establishing the connection itself is expected to fail.
    let outcome = connecting.await;
    assert!(outcome.is_err());
    Ok(())
}
/// The dialer is handed an address for the peer that points at the wrong place; the
/// connection must still succeed.
#[tokio::test]
#[traced_test]
async fn address_lookup_with_wrong_existing_addr() -> Result {
    let mut key_rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
    let registry = TestAddressLookupShared::default();

    let (server, _server_task) =
        new_endpoint(&mut key_rng, |ep| registry.create_address_lookup(ep.id())).await;
    let (client, _client_task) =
        new_endpoint(&mut key_rng, |ep| registry.create_address_lookup(ep.id())).await;

    // The right endpoint id paired with a bogus IP address.
    let bogus_ip = TransportAddr::Ip("240.0.0.1:1000".parse().unwrap());
    let misleading_addr = EndpointAddr::from_parts(server.id(), [bogus_ip]);

    let _conn = client.connect(misleading_addr, TEST_ALPN).await?;
    Ok(())
}
/// Binds a test endpoint and registers the single service produced by
/// `create_address_lookup` on it.
///
/// Returns the endpoint together with a guard for its accept task; dropping the guard
/// aborts that task.
async fn new_endpoint<R, D, F>(
    rng: &mut R,
    create_address_lookup: F,
) -> (Endpoint, AbortOnDropHandle<Result<()>>)
where
    R: CryptoRng,
    D: AddressLookup + 'static,
    F: FnOnce(&Endpoint) -> D,
{
    let install = |ep: &Endpoint| {
        let address_lookup = create_address_lookup(ep);
        ep.address_lookup()
            .expect("endpoint is still open")
            .add(address_lookup);
    };
    new_endpoint_add(rng, install).await
}
/// Binds a relay-less test endpoint with a freshly generated secret key, lets
/// `add_address_lookup` configure it, and spawns a task that accepts incoming connections.
///
/// Returns the endpoint and a handle that aborts the accept task when dropped.
///
/// # Panics
///
/// Panics if binding the endpoint fails.
async fn new_endpoint_add<R: CryptoRng, F: FnOnce(&Endpoint)>(
rng: &mut R,
add_address_lookup: F,
) -> (Endpoint, AbortOnDropHandle<Result<()>>) {
let secret = SecretKey::generate(rng);
let ep = Endpoint::empty_builder(RelayMode::Disabled)
.secret_key(secret)
.alpns(vec![TEST_ALPN.to_vec()])
.bind()
.await
.unwrap();
// The endpoint is already bound when the caller installs its services.
add_address_lookup(&ep);
let handle = tokio::spawn({
let ep = ep.clone();
async move {
// Accepted connections are kept in this vector so they are not dropped while the
// task is still running.
let mut connections = Vec::new();
// The loop ends when `ep.accept()` yields `None` or when accepting an incoming
// connection fails (`.ok()` turns that failure into `None`).
while let Some(accepting) = ep.accept().await.and_then(|inc| inc.accept().ok()) {
let conn = accepting.await.context("accepting")?;
connections.push(conn);
}
Ok::<_, AnyError>(())
}
});
(ep, AbortOnDropHandle::new(handle))
}
/// Current wall-clock time as microseconds since the Unix epoch.
///
/// # Panics
///
/// Panics with "time drift" if the system clock reports a time before the Unix epoch.
fn system_time_now() -> u64 {
    // `UNIX_EPOCH.elapsed()` is the time from the epoch until now.
    let since_epoch = SystemTime::UNIX_EPOCH.elapsed().expect("time drift");
    since_epoch.as_micros() as u64
}
}
#[cfg(test)]
mod test_dns_pkarr {
use iroh_base::{EndpointAddr, SecretKey, TransportAddr};
use iroh_relay::{RelayMap, endpoint_info::UserData};
use n0_error::{AnyError, Result, StackResultExt};
use n0_future::time::Duration;
use n0_tracing_test::traced_test;
use rand::{CryptoRng, SeedableRng};
use tokio_util::task::AbortOnDropHandle;
use crate::{
Endpoint, RelayMode,
address_lookup::{EndpointData, PkarrPublisher},
dns::DnsResolver,
endpoint_info::EndpointInfo,
test_utils::{
DnsPkarrServer, dns_server::run_dns_server, pkarr_dns_state::State, run_relay_server,
},
};
/// Upper bound passed to `DnsPkarrServer::on_endpoint` when waiting for a published record
/// to show up on the test server.
const PUBLISH_TIMEOUT: Duration = Duration::from_secs(10);
/// Inserts a signed packet directly into the DNS server's state and checks that the DNS
/// resolver returns the same endpoint info.
#[tokio::test]
#[traced_test]
async fn dns_resolve() -> Result<()> {
    let mut key_rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);
    let origin = "testdns.example".to_string();

    let state = State::new(origin.clone());
    let (nameserver, _dns_drop_guard) = run_dns_server(state.clone())
        .await
        .context("Running DNS server")?;

    // Build the record to serve and sign it with the endpoint's own key.
    let secret_key = SecretKey::generate(&mut key_rng);
    let expected = EndpointInfo::new(secret_key.public())
        .with_relay_url(Some("https://relay.example".parse().unwrap()));
    let packet = expected.to_pkarr_signed_packet(&secret_key, 30)?;
    state
        .upsert(packet)
        .context("update and insert signed packet")?;

    let resolver = DnsResolver::with_nameserver(nameserver);
    let resolved = resolver
        .lookup_endpoint_by_id(&expected.endpoint_id, &origin)
        .await?;
    assert_eq!(resolved, expected);
    Ok(())
}
/// Publishes endpoint data through the pkarr publisher and checks that it can be resolved
/// again over DNS, including the attached user data.
#[tokio::test]
#[traced_test]
async fn pkarr_publish_dns_resolve() -> Result<()> {
    let origin = "testdns.example".to_string();
    let mut key_rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);

    let dns_pkarr_server = DnsPkarrServer::run_with_origin(origin.clone())
        .await
        .context("DnsPkarrServer")?;

    let secret_key = SecretKey::generate(&mut key_rng);
    let endpoint_id = secret_key.public();
    // Zero-or-one transport addresses: here exactly one relay address.
    let relay_addr = Some(TransportAddr::Relay(
        "https://relay.example".parse().unwrap(),
    ));

    let resolver = DnsResolver::with_nameserver(dns_pkarr_server.nameserver);
    let publisher =
        PkarrPublisher::builder(dns_pkarr_server.pkarr_url.clone()).build(secret_key);

    let user_data: UserData = "foobar".parse().unwrap();
    let published =
        EndpointData::new(relay_addr.clone()).with_user_data(Some(user_data.clone()));
    publisher.update_endpoint_data(&published);

    // Wait until the server has seen the record before resolving it.
    dns_pkarr_server
        .on_endpoint(&endpoint_id, PUBLISH_TIMEOUT)
        .await
        .context("wait for on endpoint update")?;

    let resolved = resolver
        .lookup_endpoint_by_id(&endpoint_id, &origin)
        .await?;
    println!("resolved {resolved:?}");

    let expected_addr = EndpointAddr::from_parts(endpoint_id, relay_addr);
    assert_eq!(resolved.to_endpoint_addr(), expected_addr);
    assert_eq!(resolved.user_data(), Some(&user_data));
    Ok(())
}
/// ALPN used by the endpoints created in this test module.
const TEST_ALPN: &[u8] = b"TEST";
/// End-to-end check: two endpoints use the test DNS/pkarr server for address lookup, and
/// one connects to the other knowing only its id.
#[tokio::test]
#[traced_test]
async fn pkarr_publish_dns_address_lookup() -> Result<()> {
    let mut key_rng = rand_chacha::ChaCha8Rng::seed_from_u64(0u64);

    let dns_pkarr_server = DnsPkarrServer::run().await.context("DnsPkarrServer run")?;
    let (relay_map, _relay_url, _relay_guard) = run_relay_server().await?;

    let (server, _server_task) =
        ep_with_address_lookup(&mut key_rng, &relay_map, &dns_pkarr_server).await?;
    let (client, _client_task) =
        ep_with_address_lookup(&mut key_rng, &relay_map, &dns_pkarr_server).await?;

    // Wait until the server has seen the first endpoint's record before dialing it.
    let server_id = server.id();
    dns_pkarr_server
        .on_endpoint(&server_id, PUBLISH_TIMEOUT)
        .await
        .context("wait for on endpoint update")?;

    let _conn = client.connect(server_id, TEST_ALPN).await?;
    Ok(())
}
/// Binds an endpoint that uses the given relay map and takes both its DNS resolver and
/// its address lookup from `dns_pkarr_server`, then spawns a task that accepts incoming
/// connections.
///
/// Returns the endpoint and a handle that aborts the accept task when dropped.
///
/// # Errors
///
/// Returns an error if binding the endpoint fails.
async fn ep_with_address_lookup<R: CryptoRng + ?Sized>(
rng: &mut R,
relay_map: &RelayMap,
dns_pkarr_server: &DnsPkarrServer,
) -> Result<(Endpoint, AbortOnDropHandle<Result<()>>)> {
let secret_key = SecretKey::generate(rng);
let ep = Endpoint::empty_builder(RelayMode::Custom(relay_map.clone()))
.insecure_skip_relay_cert_verify(true)
// The key is cloned because the address lookup below takes its own copy.
.secret_key(secret_key.clone())
.alpns(vec![TEST_ALPN.to_vec()])
.dns_resolver(dns_pkarr_server.dns_resolver())
.address_lookup(dns_pkarr_server.address_lookup(secret_key))
.bind()
.await?;
let handle = tokio::spawn({
let ep = ep.clone();
async move {
// The loop ends when `ep.accept()` yields `None` or when accepting an incoming
// connection fails (`.ok()` turns that failure into `None`).
while let Some(accepting) = ep.accept().await.and_then(|inc| inc.accept().ok()) {
// Connections are only accepted; each one is dropped at the end of the iteration.
let _conn = accepting.await.context("accepting")?;
}
Ok::<_, AnyError>(())
}
});
Ok((ep, AbortOnDropHandle::new(handle)))
}
}