use std::{
collections::HashMap,
fmt,
net::{Ipv4Addr, SocketAddrV4},
str::FromStr,
};
use bytes::Bytes;
use clap::Parser;
use futures_lite::StreamExt;
use iroh::{
address_lookup::memory::MemoryLookup, Endpoint, EndpointAddr, PublicKey, RelayMode, RelayUrl,
SecretKey,
};
use iroh_gossip::{
api::{Event, GossipReceiver},
net::{Gossip, GOSSIP_ALPN},
proto::TopicId,
};
use n0_error::{bail_any, AnyError, Result, StdResultExt};
use n0_future::task;
use serde::{Deserialize, Serialize};
use serde_byte_array::ByteArray;
#[derive(Parser, Debug)]
struct Args {
#[clap(long)]
secret_key: Option<String>,
#[clap(short, long)]
relay: Option<RelayUrl>,
#[clap(long)]
no_relay: bool,
#[clap(short, long)]
name: Option<String>,
#[clap(short, long, default_value = "0")]
bind_port: u16,
#[clap(subcommand)]
command: Command,
}
#[derive(Parser, Debug)]
enum Command {
Open {
topic: Option<TopicId>,
},
Join {
ticket: String,
},
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
let args = Args::parse();
let (topic, peers) = match &args.command {
Command::Open { topic } => {
let topic = topic.unwrap_or_else(|| TopicId::from_bytes(rand::random()));
println!("> opening chat room for topic {topic}");
(topic, vec![])
}
Command::Join { ticket } => {
let Ticket { topic, peers } = Ticket::from_str(ticket)?;
println!("> joining chat room for topic {topic}");
(topic, peers)
}
};
let secret_key = match args.secret_key {
None => SecretKey::generate(&mut rand::rng()),
Some(key) => key.parse()?,
};
println!(
"> our secret key: {}",
data_encoding::HEXLOWER.encode(&secret_key.to_bytes())
);
let relay_mode = match (args.no_relay, args.relay) {
(false, None) => RelayMode::Default,
(false, Some(url)) => RelayMode::Custom(url.into()),
(true, None) => RelayMode::Disabled,
(true, Some(_)) => bail_any!("You cannot set --no-relay and --relay at the same time"),
};
println!("> using relay servers: {}", fmt_relay_mode(&relay_mode));
let memory_lookup = MemoryLookup::new();
let endpoint = Endpoint::builder()
.secret_key(secret_key)
.address_lookup(memory_lookup.clone())
.relay_mode(relay_mode.clone())
.bind_addr(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, args.bind_port))?
.bind()
.await?;
println!("> our endpoint id: {}", endpoint.id());
let gossip = Gossip::builder().spawn(endpoint.clone());
if !matches!(relay_mode, RelayMode::Disabled) {
endpoint.online().await;
}
let ticket = {
let me = endpoint.addr();
let peers = peers.iter().cloned().chain([me]).collect();
Ticket { topic, peers }
};
println!("> ticket to join us: {ticket}");
let router = iroh::protocol::Router::builder(endpoint.clone())
.accept(GOSSIP_ALPN, gossip.clone())
.spawn();
let peer_ids = peers.iter().map(|p| p.id).collect();
if peers.is_empty() {
println!("> waiting for peers to join us...");
} else {
println!("> trying to connect to {} peers...", peers.len());
for peer in peers.into_iter() {
memory_lookup.add_endpoint_info(peer);
}
};
let (sender, receiver) = gossip.subscribe_and_join(topic, peer_ids).await?.split();
println!("> connected!");
if let Some(name) = args.name {
let message = Message::AboutMe { name };
let encoded_message = SignedMessage::sign_and_encode(endpoint.secret_key(), &message)?;
sender.broadcast(encoded_message).await?;
}
task::spawn(subscribe_loop(receiver));
let (line_tx, mut line_rx) = tokio::sync::mpsc::channel(1);
std::thread::spawn(move || input_loop(line_tx));
println!("> type a message and hit enter to broadcast...");
while let Some(text) = line_rx.recv().await {
let message = Message::Message { text: text.clone() };
let encoded_message = SignedMessage::sign_and_encode(endpoint.secret_key(), &message)?;
sender.broadcast(encoded_message).await?;
println!("> sent: {text}");
}
router.shutdown().await.anyerr()?;
Ok(())
}
async fn subscribe_loop(mut receiver: GossipReceiver) -> Result<()> {
let mut names = HashMap::new();
while let Some(event) = receiver.try_next().await? {
if let Event::Received(msg) = event {
let (from, message) = SignedMessage::verify_and_decode(&msg.content)?;
match message {
Message::AboutMe { name } => {
names.insert(from, name.clone());
println!("> {} is now known as {}", from.fmt_short(), name);
}
Message::Message { text } => {
let name = names
.get(&from)
.map_or_else(|| from.fmt_short().to_string(), String::to_string);
println!("{name}: {text}");
}
}
}
}
Ok(())
}
fn input_loop(line_tx: tokio::sync::mpsc::Sender<String>) -> Result<()> {
let mut buffer = String::new();
let stdin = std::io::stdin(); loop {
stdin.read_line(&mut buffer).anyerr()?;
line_tx.blocking_send(buffer.clone()).anyerr()?;
buffer.clear();
}
}
const SIGNATURE_LENGTH: usize = iroh::Signature::LENGTH;
type Signature = ByteArray<SIGNATURE_LENGTH>;
#[derive(Debug, Serialize, Deserialize)]
struct SignedMessage {
from: PublicKey,
data: Bytes,
signature: Signature,
}
impl SignedMessage {
pub fn verify_and_decode(bytes: &[u8]) -> Result<(PublicKey, Message)> {
let signed_message: Self =
postcard::from_bytes(bytes).std_context("decode signed message")?;
let key: PublicKey = signed_message.from;
key.verify(
&signed_message.data,
&iroh::Signature::from_bytes(&signed_message.signature),
)
.std_context("verify signature")?;
let message: Message =
postcard::from_bytes(&signed_message.data).std_context("decode message")?;
Ok((signed_message.from, message))
}
pub fn sign_and_encode(secret_key: &SecretKey, message: &Message) -> Result<Bytes> {
let data: Bytes = postcard::to_stdvec(&message)
.std_context("encode message")?
.into();
let signature = secret_key.sign(&data);
let from: PublicKey = secret_key.public();
let signed_message = Self {
from,
data,
signature: ByteArray::new(signature.to_bytes()),
};
let encoded = postcard::to_stdvec(&signed_message).std_context("encode signed message")?;
Ok(encoded.into())
}
}
#[derive(Debug, Serialize, Deserialize)]
enum Message {
AboutMe { name: String },
Message { text: String },
}
#[derive(Debug, Serialize, Deserialize)]
struct Ticket {
topic: TopicId,
peers: Vec<EndpointAddr>,
}
impl Ticket {
fn from_bytes(bytes: &[u8]) -> Result<Self> {
postcard::from_bytes(bytes).std_context("decode ticket")
}
pub fn to_bytes(&self) -> Vec<u8> {
postcard::to_stdvec(self).expect("postcard::to_stdvec is infallible")
}
}
impl fmt::Display for Ticket {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut text = data_encoding::BASE32_NOPAD.encode(&self.to_bytes()[..]);
text.make_ascii_lowercase();
write!(f, "{text}")
}
}
impl FromStr for Ticket {
type Err = AnyError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let bytes = data_encoding::BASE32_NOPAD
.decode(s.to_ascii_uppercase().as_bytes())
.std_context("decode ticket base32")?;
Self::from_bytes(&bytes)
}
}
fn fmt_relay_mode(relay_mode: &RelayMode) -> String {
match relay_mode {
RelayMode::Disabled => "None".to_string(),
RelayMode::Default => "Default Relay (production) servers".to_string(),
RelayMode::Staging => "Default Relay (staging) servers".to_string(),
RelayMode::Custom(map) => map
.urls::<Vec<_>>()
.into_iter()
.map(|url| url.to_string())
.collect::<Vec<_>>()
.join(", "),
}
}