Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 60 additions & 18 deletions iroh-relay/src/relay_map.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,45 @@
//! based on tailscale/tailcfg/derpmap.go

use std::{collections::BTreeMap, fmt, sync::Arc};
use std::{
collections::BTreeMap,
fmt,
sync::{Arc, RwLock},
};

use iroh_base::RelayUrl;
use serde::{Deserialize, Serialize};

use crate::defaults::DEFAULT_RELAY_QUIC_PORT;

/// Configuration of all the relay servers that can be used.
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone)]
pub struct RelayMap {
/// A map of the different relay IDs to the [`RelayNode`] information
nodes: Arc<BTreeMap<RelayUrl, Arc<RelayNode>>>,
nodes: Arc<RwLock<BTreeMap<RelayUrl, Arc<RelayNode>>>>,
}

/// Manual `PartialEq`: `RwLock` itself is not comparable, so equality is
/// defined on the guarded maps, taking a read lock on each side.
///
/// NOTE(review): locks are acquired self-then-other; two concurrent `eq`
/// calls with operands in opposite order could in principle deadlock —
/// confirm this cannot happen in practice.
impl PartialEq for RelayMap {
    fn eq(&self, other: &Self) -> bool {
        let lhs = self.nodes.read().expect("poisoned");
        let rhs = other.nodes.read().expect("poisoned");
        *lhs == *rhs
    }
}

impl Eq for RelayMap {}

impl RelayMap {
/// Returns the sorted relay URLs.
pub fn urls(&self) -> impl Iterator<Item = &RelayUrl> {
self.nodes.keys()
pub fn urls<T>(&self) -> T
where
T: FromIterator<RelayUrl>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's... fancy

And maybe a little unfortunate as we're now forcing an allocation. But I guess this is needed because the the inner lock.

{
self.nodes
.read()
.expect("poisoned")
.keys()
.cloned()
.collect::<T>()
}

/// Create an empty relay map.
Expand All @@ -28,39 +50,57 @@ impl RelayMap {
}

/// Returns an `Iterator` over all known nodes.
pub fn nodes(&self) -> impl Iterator<Item = &Arc<RelayNode>> {
self.nodes.values()
pub fn nodes<T>(&self) -> T
where
T: FromIterator<Arc<RelayNode>>,
{
self.nodes
.read()
.expect("poisoned")
.values()
.cloned()
.collect::<T>()
}

/// Is this a known node?
pub fn contains_node(&self, url: &RelayUrl) -> bool {
self.nodes.contains_key(url)
self.nodes.read().expect("poisoned").contains_key(url)
}

/// Get the given node.
pub fn get_node(&self, url: &RelayUrl) -> Option<&Arc<RelayNode>> {
self.nodes.get(url)
pub fn get_node(&self, url: &RelayUrl) -> Option<Arc<RelayNode>> {
self.nodes.read().expect("poisoned").get(url).cloned()
}

/// How many nodes are known?
pub fn len(&self) -> usize {
self.nodes.len()
self.nodes.read().expect("poisoned").len()
}

/// Are there any nodes in this map?
pub fn is_empty(&self) -> bool {
self.nodes.is_empty()
self.nodes.read().expect("poisoned").is_empty()
}

/// Insert a new relay.
pub fn insert(&self, url: RelayUrl, node: Arc<RelayNode>) -> Option<Arc<RelayNode>> {
self.nodes.write().expect("poisoned").insert(url, node)
}

/// Removes an existing relay by `RelayUrl`.
pub fn remove(&self, url: &RelayUrl) -> Option<Arc<RelayNode>> {
self.nodes.write().expect("poisoned").remove(url)
}
}

/// Build a [`RelayMap`] from relay nodes, keyed by each node's URL.
impl FromIterator<RelayNode> for RelayMap {
    fn from_iter<T: IntoIterator<Item = RelayNode>>(iter: T) -> Self {
        Self {
            nodes: Arc::new(RwLock::new(
                iter.into_iter()
                    .map(|node| (node.url.clone(), Arc::new(node)))
                    .collect(),
            )),
        }
    }
}
Expand All @@ -72,15 +112,17 @@ impl From<RelayUrl> for RelayMap {
/// discovery ports.
fn from(value: RelayUrl) -> Self {
Self {
nodes: Arc::new([(value.clone(), Arc::new(value.into()))].into()),
nodes: Arc::new(RwLock::new(
[(value.clone(), Arc::new(value.into()))].into(),
)),
}
}
}

/// Build a single-entry [`RelayMap`] from one relay node.
impl From<RelayNode> for RelayMap {
    fn from(value: RelayNode) -> Self {
        Self {
            nodes: Arc::new(RwLock::new([(value.url.clone(), Arc::new(value))].into())),
        }
    }
}
Expand All @@ -92,11 +134,11 @@ impl FromIterator<RelayUrl> for RelayMap {
/// discovery ports.
fn from_iter<T: IntoIterator<Item = RelayUrl>>(iter: T) -> Self {
Self {
nodes: Arc::new(
nodes: Arc::new(RwLock::new(
iter.into_iter()
.map(|url| (url.clone(), Arc::new(url.into())))
.collect(),
),
)),
}
}
}
Expand Down
131 changes: 129 additions & 2 deletions iroh/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use std::{
};

use ed25519_dalek::{VerifyingKey, pkcs8::DecodePublicKey};
use iroh_base::{NodeAddr, NodeId, SecretKey};
use iroh_relay::RelayMap;
use iroh_base::{NodeAddr, NodeId, RelayUrl, SecretKey};
use iroh_relay::{RelayMap, RelayNode};
use n0_future::time::Duration;
use n0_watcher::Watcher;
use nested_enum_utils::common_fields;
Expand Down Expand Up @@ -632,6 +632,24 @@ impl Endpoint {
self.msock.endpoint().set_server_config(Some(server_config));
}

/// Adds the provided configuration to the [`RelayMap`].
///
/// Replacing and returning any existing configuration for [`RelayUrl`].
pub async fn insert_relay(
&self,
relay: RelayUrl,
node: Arc<RelayNode>,
) -> Option<Arc<RelayNode>> {
self.msock.insert_relay(relay, node).await
}

/// Removes the configuration from the [`RelayMap`] for the provided [`RelayUrl`].
///
/// Returns any existing configuration.
pub async fn remove_relay(&self, relay: &RelayUrl) -> Option<Arc<RelayNode>> {
self.msock.remove_relay(relay).await
}

// # Methods for establishing connectivity.

/// Connects to a remote [`Endpoint`].
Expand Down Expand Up @@ -2430,6 +2448,115 @@ mod tests {
Ok(())
}

#[tokio::test]
#[traced_test]
async fn endpoint_relay_map_change() -> Result {
let (relay_map, relay_url, _guard1) = run_relay_server().await?;
let client = Endpoint::builder()
.insecure_skip_relay_cert_verify(true)
.relay_mode(RelayMode::Custom(relay_map.clone()))
.bind()
.await?;
let server = Endpoint::builder()
.insecure_skip_relay_cert_verify(true)
.relay_mode(RelayMode::Custom(relay_map))
.alpns(vec![TEST_ALPN.to_vec()])
.bind()
.await?;

let task = tokio::spawn({
let server = server.clone();
async move {
for i in 0..2 {
println!("accept: round {i}");
let Some(conn) = server.accept().await else {
snafu::whatever!("Expected an incoming connection");
};
let conn = conn.await.e()?;
let (mut send, mut recv) = conn.accept_bi().await.e()?;
let data = recv.read_to_end(1000).await.e()?;
send.write_all(&data).await.e()?;
send.finish().e()?;
conn.closed().await;
}
Ok::<_, Error>(())
}
});

server.online().await;

let mut addr = server.node_addr();
println!("round1: {:?}", addr);

// remove direct addrs to force relay usage
addr.direct_addresses.clear();

let conn = client.connect(addr, TEST_ALPN).await?;
let (mut send, mut recv) = conn.open_bi().await.e()?;
send.write_all(b"Hello, world!").await.e()?;
send.finish().e()?;
let data = recv.read_to_end(1000).await.e()?;
conn.close(0u32.into(), b"bye!");

assert_eq!(&data, b"Hello, world!");

// setup a second relay server
let (new_relay_map, new_relay_url, _guard2) = run_relay_server().await?;
let new_node = new_relay_map
.get_node(&new_relay_url)
.expect("missing node")
.clone();
dbg!(&new_relay_map);

let addr_watcher = server.watch_node_addr();

// add new new relay
assert!(
server
.insert_relay(new_relay_url.clone(), new_node.clone())
.await
.is_none()
);
// remove the old relay
assert!(server.remove_relay(&relay_url).await.is_some());

println!("------- changed ----- ");

let mut addr = tokio::time::timeout(Duration::from_secs(10), async move {
let mut stream = addr_watcher.stream();
while let Some(addr) = stream.next().await {
if addr.relay_url.as_ref() != Some(&relay_url) {
return addr;
}
}
panic!("failed to change relay");
})
.await
.e()?;

println!("round2: {:?}", addr);
assert_eq!(addr.relay_url, Some(new_relay_url));

// remove direct addrs to force relay usage
addr.direct_addresses.clear();

let conn = client.connect(addr, TEST_ALPN).await?;
let (mut send, mut recv) = conn.open_bi().await.e()?;
send.write_all(b"Hello, world!").await.e()?;
send.finish().e()?;
let data = recv.read_to_end(1000).await.e()?;
conn.close(0u32.into(), b"bye!");

task.await.e()??;

client.close().await;
server.close().await;

assert_eq!(&data, b"Hello, world!");

Ok(())
}

#[tokio::test]
#[traced_test]
async fn endpoint_bidi_send_recv() -> Result {
Expand Down
Loading
Loading