diff --git a/Cargo.lock b/Cargo.lock index a2c506e..fe3a5bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2732,7 +2732,6 @@ dependencies = [ "futures-sink", "futures-util", "hyper 0.14.32", - "iroh", "iroh-quinn", "nested_enum_utils", "pin-project", @@ -2768,6 +2767,34 @@ dependencies = [ "trybuild", ] +[[package]] +name = "quic-rpc-transport-iroh" +version = "0.31.0" +dependencies = [ + "anyhow", + "async-stream", + "derive_more", + "flume", + "futures-buffered", + "futures-lite", + "futures-sink", + "futures-util", + "iroh", + "iroh-quinn", + "pin-project", + "postcard", + "quic-rpc", + "rand", + "serde", + "testresult", + "thousands", + "tokio", + "tokio-serde", + "tokio-util", + "tracing", + "tracing-subscriber", +] + [[package]] name = "quick-error" version = "1.2.3" diff --git a/Cargo.toml b/Cargo.toml index fc932a4..4c2e87a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,6 @@ futures-lite = "2.3.0" futures-sink = "0.3.30" futures-util = { version = "0.3.30", features = ["sink"] } hyper = { version = "0.14.16", features = ["full"], optional = true } -iroh = { version = "0.31", optional = true } pin-project = "1" quinn = { package = "iroh-quinn", version = "0.12", optional = true } serde = { version = "1", features = ["derive"] } @@ -46,7 +45,6 @@ anyhow = "1" async-stream = "0.3.3" derive_more = { version = "1", features = ["from", "try_into", "display"] } rand = "0.8" - serde = { version = "1", features = ["derive"] } tokio = { version = "1", features = ["full"] } quinn = { package = "iroh-quinn", version = "0.12", features = ["ring"] } @@ -67,8 +65,6 @@ hyper-transport = ["dep:flume", "dep:hyper", "dep:postcard", "dep:bytes", "dep:t quinn-transport = ["dep:flume", "dep:quinn", "dep:postcard", "dep:bytes", "dep:tokio-serde", "tokio-util/codec"] ## In memory transport using the `flume` crate flume-transport = ["dep:flume"] -## p2p QUIC transport using the `iroh` crate -iroh-transport = ["dep:iroh", "dep:flume", "dep:postcard", "dep:tokio-serde", "tokio-util/codec"] ## Macros for creating request handlers macros = [] ## Utilities for testing @@ -100,4 +96,10 @@ name = "modularize" required-features = ["flume-transport"] [workspace] -members = ["examples/split/types", "examples/split/server", "examples/split/client", "quic-rpc-derive"] +members = [ + "examples/split/types", + "examples/split/server", + "examples/split/client", + "quic-rpc-derive", + "quic-rpc-transport-iroh" +] diff --git a/quic-rpc-transport-iroh/Cargo.toml b/quic-rpc-transport-iroh/Cargo.toml new file mode 100644 index 0000000..3235bb4 --- /dev/null +++ b/quic-rpc-transport-iroh/Cargo.toml @@ -0,0 +1,38 @@ +[package] +name = "quic-rpc-transport-iroh" +authors = ["RĂ¼diger Klaehn ", "n0 team"] +version = "0.31.0" +edition = "2021" +keywords = ["api", "protocol", "network", "rpc"] +categories = ["network-programming"] +license = "Apache-2.0/MIT" +repository = "https://github.com/n0-computer/quic-rpc" +description = "An iroh transport for quic-rpc" + + +[dependencies] +iroh = "0.31" +quic-rpc = { version = "0.18.0", path = "../", default-features = false, features = ["quinn-transport"] } +flume = "0.11" +postcard = { version = "1", features = ["use-std"] } +tokio-serde = { version = "0.9", features = [] } +tokio-util = { version = "0.7", features = ["codec"] } +quinn = { package = "iroh-quinn", version = "0.12" } +futures-lite = "2.3.0" +futures-sink = "0.3.30" +futures-util = { version = "0.3.30", features = ["sink"] } +tokio = { version = "1", default-features = false, features = ["macros", "sync"] } +serde = { version = "1", features = ["derive"] } +tracing = "0.1" +anyhow = "1" +pin-project = "1" + +[dev-dependencies] +testresult = "0.4.1" +tokio = { version = "1", features = ["full"] } +rand = "0.8" +async-stream = "0.3.3" +derive_more = { version = "1", features = ["from", "try_into", "display"] } +futures-buffered = "0.2.4" +thousands = "0.2.0" +tracing-subscriber = "0.3.16" diff --git a/quic-rpc-transport-iroh/README.md b/quic-rpc-transport-iroh/README.md new file mode 100644 index 0000000..77c245c --- /dev/null +++ b/quic-rpc-transport-iroh/README.md @@ -0,0 +1,6 @@ +# Quic-Rpc Transport [iroh] + +> A transport allowing for quic rpc to be run over any [iroh] connection + + +[iroh]: https://github.com/n0-computer/iroh diff --git a/quic-rpc-transport-iroh/src/lib.rs b/quic-rpc-transport-iroh/src/lib.rs new file mode 100644 index 0000000..0ebf5f2 --- /dev/null +++ b/quic-rpc-transport-iroh/src/lib.rs @@ -0,0 +1,60 @@ +//! `iroh` transport implementation based on [iroh](https://crates.io/crates/iroh) + +pub mod transport; + +use futures_util::sink::SinkExt; +use futures_util::TryStreamExt; +use quic_rpc::transport::boxed::{ + AcceptFuture, BoxableConnector, BoxableListener, OpenFuture, RecvStream, SendSink, +}; +use quic_rpc::transport::{Connector, Listener, LocalAddr}; +use quic_rpc::{RpcMessage, Service}; + +/// An iroh listener for the given [`Service`] +pub type IrohListener = crate::transport::IrohListener<::Req, ::Res>; + +/// An iroh connector for the given [`Service`] +pub type IrohConnector = + crate::transport::IrohConnector<::Res, ::Req>; + +impl BoxableConnector + for crate::transport::IrohConnector +{ + fn clone_box(&self) -> Box> { + Box::new(self.clone()) + } + + fn open_boxed(&self) -> OpenFuture { + let f = Box::pin(async move { + let (send, recv) = Connector::open(self).await?; + // map the error types to anyhow + let send = send.sink_map_err(anyhow::Error::from); + let recv = recv.map_err(anyhow::Error::from); + // return the boxed streams + anyhow::Ok((SendSink::boxed(send), RecvStream::boxed(recv))) + }); + OpenFuture::boxed(f) + } +} + +impl BoxableListener + for crate::transport::IrohListener +{ + fn clone_box(&self) -> Box> { + Box::new(self.clone()) + } + + fn accept_bi_boxed(&self) -> AcceptFuture { + let f = async move { + let (send, recv) = Listener::accept(self).await?; + let send = send.sink_map_err(anyhow::Error::from); + let recv = recv.map_err(anyhow::Error::from); + anyhow::Ok((SendSink::boxed(send), RecvStream::boxed(recv))) + }; + AcceptFuture::boxed(f) + } + + fn local_addr(&self) -> &[LocalAddr] { + Listener::local_addr(self) + } +} diff --git a/src/transport/iroh.rs b/quic-rpc-transport-iroh/src/transport.rs similarity index 99% rename from src/transport/iroh.rs rename to quic-rpc-transport-iroh/src/transport.rs index 3ae8dc7..f2b73d5 100644 --- a/src/transport/iroh.rs +++ b/quic-rpc-transport-iroh/src/transport.rs @@ -1,5 +1,3 @@ -//! iroh transport implementation based on [iroh](https://crates.io/crates/iroh) - use std::{ collections::BTreeSet, fmt, @@ -23,12 +21,11 @@ use serde::{de::DeserializeOwned, Serialize}; use tokio::{sync::oneshot, task::yield_now}; use tracing::{debug_span, Instrument}; -use super::{ - util::{FramedPostcardRead, FramedPostcardWrite}, - StreamTypes, -}; -use crate::{ - transport::{ConnectionErrors, Connector, Listener, LocalAddr}, +use quic_rpc::{ + transport::{ + ConnectionErrors, Connector, FramedPostcardRead, FramedPostcardWrite, Listener, LocalAddr, + StreamTypes, + }, RpcMessage, }; diff --git a/tests/iroh.rs b/quic-rpc-transport-iroh/tests/iroh.rs similarity index 88% rename from tests/iroh.rs rename to quic-rpc-transport-iroh/tests/iroh.rs index 514c0b2..dbaac79 100644 --- a/tests/iroh.rs +++ b/quic-rpc-transport-iroh/tests/iroh.rs @@ -1,15 +1,12 @@ -#![cfg(feature = "iroh-transport")] - use iroh::{NodeAddr, SecretKey}; -use quic_rpc::{transport, RpcClient, RpcServer}; +use quic_rpc::{RpcClient, RpcServer}; use testresult::TestResult; -use crate::transport::iroh::{IrohConnector, IrohListener}; +use quic_rpc_transport_iroh::transport::{IrohConnector, IrohListener}; mod math; use math::*; use tokio_util::task::AbortOnDropHandle; -mod util; const ALPN: &[u8] = b"quic-rpc/iroh/test"; @@ -99,15 +96,14 @@ async fn server_away_and_back() -> TestResult<()> { let server_node_id = server_secret_key.public(); // create the RPC client - let client_connection = transport::iroh::IrohConnector::::new( + let client_connection = IrohConnector::::new( client_endpoint.clone(), server_node_id, ALPN.into(), ); - let client = RpcClient::< - ComputeService, - transport::iroh::IrohConnector, - >::new(client_connection); + let client = RpcClient::>::new( + client_connection, + ); // send a request. No server available so it should fail client.rpc(Sqr(4)).await.unwrap_err(); @@ -115,7 +111,7 @@ async fn server_away_and_back() -> TestResult<()> { let server_endpoint = make_endpoint(server_secret_key.clone(), ALPN).await?; // create the RPC Server - let connection = transport::iroh::IrohListener::new(server_endpoint.clone())?; + let connection = IrohListener::new(server_endpoint.clone())?; let server = RpcServer::new(connection); let server_handle = tokio::spawn(ComputeService::server_bounded(server, 1)); @@ -138,7 +134,7 @@ async fn server_away_and_back() -> TestResult<()> { let server_endpoint = make_endpoint(server_secret_key.clone(), ALPN).await?; // make the server run again - let connection = transport::iroh::IrohListener::new(server_endpoint.clone())?; + let connection = IrohListener::new(server_endpoint.clone())?; let server = RpcServer::new(connection); let server_handle = tokio::spawn(ComputeService::server_bounded(server, 5)); diff --git a/quic-rpc-transport-iroh/tests/math.rs b/quic-rpc-transport-iroh/tests/math.rs new file mode 100644 index 0000000..615534b --- /dev/null +++ b/quic-rpc-transport-iroh/tests/math.rs @@ -0,0 +1,343 @@ +use std::{ + io::{self, Write}, + result, +}; + +use async_stream::stream; +use derive_more::{From, TryInto}; +use futures_buffered::BufferedStreamExt; +use futures_lite::{Stream, StreamExt}; +use futures_util::SinkExt; +use quic_rpc::{ + message::{ + BidiStreaming, BidiStreamingMsg, ClientStreaming, ClientStreamingMsg, Msg, RpcMsg, + ServerStreaming, ServerStreamingMsg, + }, + server::{RpcChannel, RpcServerError}, + transport::StreamTypes, + Connector, Listener, RpcClient, RpcServer, Service, +}; +use serde::{Deserialize, Serialize}; +use thousands::Separable; +use tokio_util::task::AbortOnDropHandle; + +/// compute the square of a number +#[derive(Debug, Serialize, Deserialize)] +pub struct Sqr(pub u64); + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct SqrResponse(pub u128); + +/// sum a stream of numbers +#[derive(Debug, Serialize, Deserialize)] +pub struct Sum; + +#[derive(Debug, Serialize, Deserialize)] +pub struct SumUpdate(pub u64); + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct SumResponse(pub u128); + +/// compute the fibonacci sequence as a stream +#[derive(Debug, Serialize, Deserialize)] +pub struct Fibonacci(pub u64); + +#[derive(Debug, Serialize, Deserialize)] +pub struct FibonacciResponse(pub u128); + +/// multiply a stream of numbers, returning a stream +#[derive(Debug, Serialize, Deserialize)] +pub struct Multiply(pub u64); + +#[derive(Debug, Serialize, Deserialize)] +pub struct MultiplyUpdate(pub u64); + +#[derive(Debug, Serialize, Deserialize)] +pub struct MultiplyResponse(pub u128); + +/// request enum +#[derive(Debug, Serialize, Deserialize, From, TryInto)] +pub enum ComputeRequest { + Sqr(Sqr), + Sum(Sum), + SumUpdate(SumUpdate), + Fibonacci(Fibonacci), + Multiply(Multiply), + MultiplyUpdate(MultiplyUpdate), +} + +/// response enum +#[allow(clippy::enum_variant_names)] +#[derive(Debug, Serialize, Deserialize, From, TryInto)] +pub enum ComputeResponse { + SqrResponse(SqrResponse), + SumResponse(SumResponse), + FibonacciResponse(FibonacciResponse), + MultiplyResponse(MultiplyResponse), +} + +#[derive(Debug, Clone)] +pub struct ComputeService; + +impl Service for ComputeService { + type Req = ComputeRequest; + type Res = ComputeResponse; +} + +impl RpcMsg for Sqr { + type Response = SqrResponse; +} + +impl Msg for Sum { + type Pattern = ClientStreaming; +} + +impl ClientStreamingMsg for Sum { + type Update = SumUpdate; + type Response = SumResponse; +} + +impl Msg for Fibonacci { + type Pattern = ServerStreaming; +} + +impl ServerStreamingMsg for Fibonacci { + type Response = FibonacciResponse; +} + +impl Msg for Multiply { + type Pattern = BidiStreaming; +} + +impl BidiStreamingMsg for Multiply { + type Update = MultiplyUpdate; + type Response = MultiplyResponse; +} + +impl ComputeService { + async fn sqr(self, req: Sqr) -> SqrResponse { + SqrResponse(req.0 as u128 * req.0 as u128) + } + + async fn sum(self, _req: Sum, updates: impl Stream) -> SumResponse { + let mut sum = 0u128; + tokio::pin!(updates); + while let Some(SumUpdate(n)) = updates.next().await { + sum += n as u128; + } + SumResponse(sum) + } + + fn fibonacci(self, req: Fibonacci) -> impl Stream { + let mut a = 0u128; + let mut b = 1u128; + let mut n = req.0; + stream! { + while n > 0 { + yield FibonacciResponse(a); + let c = a + b; + a = b; + b = c; + n -= 1; + } + } + } + + fn multiply( + self, + req: Multiply, + updates: impl Stream, + ) -> impl Stream { + let product = req.0 as u128; + stream! { + tokio::pin!(updates); + while let Some(MultiplyUpdate(n)) = updates.next().await { + yield MultiplyResponse(product * n as u128); + } + } + } + + pub fn server>( + server: RpcServer, + ) -> AbortOnDropHandle<()> { + server.spawn_accept_loop(|req, chan| Self::handle_rpc_request(ComputeService, req, chan)) + } + + pub async fn handle_rpc_request( + self, + req: ComputeRequest, + chan: RpcChannel, + ) -> Result<(), RpcServerError> + where + E: StreamTypes, + { + use ComputeRequest::*; + #[rustfmt::skip] + match req { + Sqr(msg) => chan.rpc(msg, self, Self::sqr).await, + Sum(msg) => chan.client_streaming(msg, self, Self::sum).await, + Fibonacci(msg) => chan.server_streaming(msg, self, Self::fibonacci).await, + Multiply(msg) => chan.bidi_streaming(msg, self, Self::multiply).await, + MultiplyUpdate(_) => Err(RpcServerError::UnexpectedStartMessage)?, + SumUpdate(_) => Err(RpcServerError::UnexpectedStartMessage)?, + }?; + Ok(()) + } + + /// Runs the service until `count` requests have been received. + pub async fn server_bounded>( + server: RpcServer, + count: usize, + ) -> result::Result, RpcServerError> { + tracing::info!(%count, "server running"); + let s = server; + let mut received = 0; + let service = ComputeService; + while received < count { + received += 1; + let (req, chan) = s.accept().await?.read_first().await?; + let service = service.clone(); + tokio::spawn(async move { + use ComputeRequest::*; + tracing::info!(?req, "got request"); + #[rustfmt::skip] + match req { + Sqr(msg) => chan.rpc(msg, service, ComputeService::sqr).await, + Sum(msg) => chan.client_streaming(msg, service, ComputeService::sum).await, + Fibonacci(msg) => chan.server_streaming(msg, service, ComputeService::fibonacci).await, + Multiply(msg) => chan.bidi_streaming(msg, service, ComputeService::multiply).await, + SumUpdate(_) => Err(RpcServerError::UnexpectedStartMessage)?, + MultiplyUpdate(_) => Err(RpcServerError::UnexpectedStartMessage)?, + }?; + Ok::<_, RpcServerError>(()) + }); + } + tracing::info!(%count, "server finished"); + Ok(s) + } +} + +pub async fn smoke_test>(client: C) -> anyhow::Result<()> { + let client = RpcClient::::new(client); + // a rpc call + tracing::debug!("calling rpc S(1234)"); + let res = client.rpc(Sqr(1234)).await?; + tracing::debug!("got response {:?}", res); + assert_eq!(res, SqrResponse(1522756)); + + // client streaming call + tracing::debug!("calling client_streaming Sum"); + let (mut send, recv) = client.client_streaming(Sum).await?; + tokio::task::spawn(async move { + for i in 1..=3 { + send.send(SumUpdate(i)).await?; + } + Ok::<_, C::SendError>(()) + }); + let res = recv.await?; + tracing::debug!("got response {:?}", res); + assert_eq!(res, SumResponse(6)); + + // server streaming call + tracing::debug!("calling server_streaming Fibonacci(10)"); + let s = client.server_streaming(Fibonacci(10)).await?; + let res: Vec<_> = s.map(|x| x.map(|x| x.0)).try_collect().await?; + tracing::debug!("got response {:?}", res); + assert_eq!(res, vec![0, 1, 1, 2, 3, 5, 8, 13, 21, 34]); + + // bidi streaming call + tracing::debug!("calling bidi Multiply(2)"); + let (mut send, recv) = client.bidi(Multiply(2)).await?; + tokio::task::spawn(async move { + for i in 1..=3 { + send.send(MultiplyUpdate(i)).await?; + } + Ok::<_, C::SendError>(()) + }); + let res: Vec<_> = recv.map(|x| x.map(|x| x.0)).try_collect().await?; + tracing::debug!("got response {:?}", res); + assert_eq!(res, vec![2, 4, 6]); + + tracing::debug!("dropping client!"); + Ok(()) +} + +fn clear_line() { + print!("\r{}\r", " ".repeat(80)); +} + +pub async fn bench(client: RpcClient, n: u64) -> anyhow::Result<()> +where + C::SendError: std::error::Error, + C: Connector, +{ + // individual RPCs + { + let mut sum = 0; + let t0 = std::time::Instant::now(); + for i in 0..n { + sum += client.rpc(Sqr(i)).await?.0; + if i % 10000 == 0 { + print!("."); + io::stdout().flush()?; + } + } + let rps = ((n as f64) / t0.elapsed().as_secs_f64()).round(); + assert_eq!(sum, sum_of_squares(n)); + clear_line(); + println!("RPC seq {} rps", rps.separate_with_underscores(),); + } + // parallel RPCs + { + let t0 = std::time::Instant::now(); + let reqs = futures_lite::stream::iter((0..n).map(Sqr)); + let resp: Vec<_> = reqs + .map(|x| { + let client = client.clone(); + async move { + let res = client.rpc(x).await?.0; + anyhow::Ok(res) + } + }) + .buffered_unordered(32) + .try_collect() + .await?; + let sum = resp.into_iter().sum::(); + let rps = ((n as f64) / t0.elapsed().as_secs_f64()).round(); + assert_eq!(sum, sum_of_squares(n)); + clear_line(); + println!("RPC par {} rps", rps.separate_with_underscores(),); + } + // sequential streaming + { + let t0 = std::time::Instant::now(); + let (send, recv) = client.bidi(Multiply(2)).await?; + let handle = tokio::task::spawn(async move { + let requests = futures_lite::stream::iter((0..n).map(MultiplyUpdate)); + futures_util::StreamExt::forward(requests.map(Ok), send).await?; + anyhow::Result::<()>::Ok(()) + }); + let mut sum = 0; + tokio::pin!(recv); + let mut i = 0; + while let Some(res) = recv.next().await { + sum += res?.0; + if i % 10000 == 0 { + print!("."); + io::stdout().flush()?; + } + i += 1; + } + assert_eq!(sum, (0..n as u128).map(|x| x * 2).sum()); + let rps = ((n as f64) / t0.elapsed().as_secs_f64()).round(); + clear_line(); + println!("bidi seq {} rps", rps.separate_with_underscores(),); + + handle.await??; + } + Ok(()) +} + +fn sum_of_squares(n: u64) -> u128 { + (0..n).map(|x| (x * x) as u128).sum() +} diff --git a/src/client.rs b/src/client.rs index 203eb05..fe8e44d 100644 --- a/src/client.rs +++ b/src/client.rs @@ -39,12 +39,6 @@ pub type QuinnConnector = pub type HyperConnector = crate::transport::hyper::HyperConnector<::Res, ::Req>; -#[cfg(feature = "iroh-transport")] -#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "iroh-transport")))] -/// An iroh connector for the given [`Service`] -pub type IrohConnector = - crate::transport::iroh::IrohConnector<::Res, ::Req>; - /// Sync version of `future::stream::BoxStream`. pub type BoxStreamSync<'a, T> = Pin + Send + Sync + 'a>>; diff --git a/src/server.rs b/src/server.rs index 4cb5cf2..fc20c7d 100644 --- a/src/server.rs +++ b/src/server.rs @@ -64,12 +64,6 @@ pub type QuinnListener = pub type HyperListener = crate::transport::hyper::HyperListener<::Req, ::Res>; -#[cfg(feature = "iroh-transport")] -#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "iroh-transport")))] -/// An iroh listener for the given [`Service`] -pub type IrohListener = - crate::transport::iroh::IrohListener<::Req, ::Res>; - /// A server for a specific service. /// /// This is a wrapper around a [`Listener`] that serves as the entry point for the server DSL. diff --git a/src/transport/boxed.rs b/src/transport/boxed.rs index 8f4a886..8b36a23 100644 --- a/src/transport/boxed.rs +++ b/src/transport/boxed.rs @@ -397,50 +397,6 @@ impl BoxableListener } } -#[cfg(feature = "iroh-transport")] -impl BoxableConnector - for super::iroh::IrohConnector -{ - fn clone_box(&self) -> Box> { - Box::new(self.clone()) - } - - fn open_boxed(&self) -> OpenFuture { - let f = Box::pin(async move { - let (send, recv) = super::Connector::open(self).await?; - // map the error types to anyhow - let send = send.sink_map_err(anyhow::Error::from); - let recv = recv.map_err(anyhow::Error::from); - // return the boxed streams - anyhow::Ok((SendSink::boxed(send), RecvStream::boxed(recv))) - }); - OpenFuture::boxed(f) - } -} - -#[cfg(feature = "iroh-transport")] -impl BoxableListener - for super::iroh::IrohListener -{ - fn clone_box(&self) -> Box> { - Box::new(self.clone()) - } - - fn accept_bi_boxed(&self) -> AcceptFuture { - let f = async move { - let (send, recv) = super::Listener::accept(self).await?; - let send = send.sink_map_err(anyhow::Error::from); - let recv = recv.map_err(anyhow::Error::from); - anyhow::Ok((SendSink::boxed(send), RecvStream::boxed(recv))) - }; - AcceptFuture::boxed(f) - } - - fn local_addr(&self) -> &[super::LocalAddr] { - super::Listener::local_addr(self) - } -} - #[cfg(feature = "flume-transport")] impl BoxableConnector for super::flume::FlumeConnector diff --git a/src/transport/mod.rs b/src/transport/mod.rs index cb7a911..0ae3440 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -37,21 +37,18 @@ pub mod flume; #[cfg(feature = "hyper-transport")] #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "hyper-transport")))] pub mod hyper; -#[cfg(feature = "iroh-transport")] -#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "iroh-transport")))] -pub mod iroh; pub mod mapped; pub mod misc; #[cfg(feature = "quinn-transport")] #[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "quinn-transport")))] pub mod quinn; -#[cfg(any(feature = "quinn-transport", feature = "iroh-transport"))] -#[cfg_attr( - quicrpc_docsrs, - doc(cfg(any(feature = "quinn-transport", feature = "iroh-transport"))) -)] +#[cfg(feature = "quinn-transport")] +#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "quinn-transport")))] mod util; +#[cfg(feature = "quinn-transport")] +#[cfg_attr(quicrpc_docsrs, doc(cfg(feature = "quinn-transport")))] +pub use self::util::{FramedPostcardRead, FramedPostcardWrite}; /// Errors that can happen when creating and using a [`Connector`] or [`Listener`]. pub trait ConnectionErrors: Debug + Clone + Send + Sync + 'static { diff --git a/src/transport/util.rs b/src/transport/util.rs index 9157cd2..f11c6c7 100644 --- a/src/transport/util.rs +++ b/src/transport/util.rs @@ -10,6 +10,7 @@ use serde::{de::DeserializeOwned, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::codec::LengthDelimitedCodec; +/// Coded `FramedRead` using postcard. #[pin_project] pub struct FramedPostcardRead( #[pin] diff --git a/tests/math.rs b/tests/math.rs index b628c52..06beb2e 100644 --- a/tests/math.rs +++ b/tests/math.rs @@ -2,7 +2,6 @@ feature = "flume-transport", feature = "hyper-transport", feature = "quinn-transport", - feature = "iroh-transport", ))] #![allow(dead_code)] use std::{ diff --git a/tests/slow_math.rs b/tests/slow_math.rs index 2060a9a..53d9518 100644 --- a/tests/slow_math.rs +++ b/tests/slow_math.rs @@ -2,7 +2,6 @@ feature = "flume-transport", feature = "hyper-transport", feature = "quinn-transport", - feature = "iroh-transport", ))] mod math; use std::result;