Skip to content

Commit da1a3b2

Browse files
committed
Add support for event streaming
Signed-off-by: Yuki Kishimoto <yukikishimoto@protonmail.com>
1 parent 2fc54e5 commit da1a3b2

File tree

5 files changed

+105
-0
lines changed

5 files changed

+105
-0
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@ uniffi-cli = ["uniffi/cli"] # required for the `uniffi-bindgen` binary
2323
[dependencies]
2424
async-trait = "0.1"
2525
async-wsocket = "0.13"
26+
futures-util = "0.3.31"
2627
nostr = { version = "0.41.0", features = ["std", "all-nips"] }
2728
nostr-connect = "0.41.0"
2829
nostr-sdk = { version = "0.41.0", default-features = false, features = ["all-nips"] }
2930
nwc = "0.41.0"
31+
tokio = { version = "1", features = ["sync"] }
3032
tracing = { version = "0.1", features = ["std"] }
3133
tracing-subscriber = "0.3"
3234
uniffi = { version = "=0.28.3", features = ["tokio"] }

src/client/mod.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use std::ops::Deref;
77
use std::sync::Arc;
88
use std::time::Duration;
99

10+
use nostr::RelayUrl;
1011
use nostr_sdk::client::Client as ClientSdk;
1112
use nostr_sdk::pool::RelayPoolNotification as RelayPoolNotificationSdk;
1213
use nostr_sdk::SubscriptionId;
@@ -32,6 +33,7 @@ use crate::protocol::nips::nip59::UnwrappedGift;
3233
use crate::protocol::signer::NostrSigner;
3334
use crate::relay::options::{SubscribeAutoCloseOptions, SyncOptions};
3435
use crate::relay::{Relay, RelayOptions};
36+
use crate::stream::EventStream;
3537

3638
#[derive(Object)]
3739
pub struct Client {
@@ -407,6 +409,70 @@ impl Client {
407409
.into())
408410
}
409411

412+
/// Stream events from relays
413+
///
414+
/// # Overview
415+
///
416+
/// This is an **auto-closing subscription** and will be closed automatically on `EOSE`.
417+
/// For long-lived subscriptions, check [`Client::subscribe`].
418+
///
419+
/// # Gossip
420+
///
421+
/// If `gossip` is enabled the events will be streamed also from
422+
/// NIP65 relays (automatically discovered) of public keys included in filters (if any).
423+
pub async fn stream_events(&self, filter: &Filter, timeout: Duration) -> Result<EventStream> {
424+
let stream = self
425+
.inner
426+
.stream_events(filter.deref().clone(), timeout)
427+
.await?;
428+
Ok(stream.into())
429+
}
430+
431+
/// Stream events from specific relays
432+
///
433+
/// # Overview
434+
///
435+
/// This is an **auto-closing subscription** and will be closed automatically on `EOSE`.
436+
/// For long-lived subscriptions, check [`Client::subscribe_to`].
437+
pub async fn stream_events_from(
438+
&self,
439+
urls: Vec<String>,
440+
filter: &Filter,
441+
timeout: Duration,
442+
) -> Result<EventStream> {
443+
let stream = self
444+
.inner
445+
.stream_events_from(urls, filter.deref().clone(), timeout)
446+
.await?;
447+
Ok(stream.into())
448+
}
449+
450+
/// Stream events from specific relays with specific filters
451+
///
452+
/// # Overview
453+
///
454+
/// This is an **auto-closing subscription** and will be closed automatically on `EOSE`.
455+
pub async fn stream_events_targeted(
456+
&self,
457+
targets: HashMap<String, Arc<Filter>>,
458+
timeout: Duration,
459+
) -> Result<EventStream> {
460+
let mut new_targets: HashMap<RelayUrl, nostr::Filter> =
461+
HashMap::with_capacity(targets.len());
462+
463+
for (url, filter) in targets.into_iter() {
464+
let url: RelayUrl = RelayUrl::parse(&url)?;
465+
let filter: nostr::Filter = filter.as_ref().deref().clone();
466+
new_targets.insert(url, filter);
467+
}
468+
469+
let stream = self
470+
.inner
471+
.stream_events_targeted(new_targets, timeout)
472+
.await?;
473+
Ok(stream.into())
474+
}
475+
410476
pub async fn send_msg_to(&self, urls: Vec<String>, msg: &ClientMessage) -> Result<Output> {
411477
Ok(self
412478
.inner

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ pub mod nwc;
1616
pub mod policy;
1717
pub mod protocol;
1818
pub mod relay;
19+
pub mod stream;
1920
pub mod transport;
2021
mod util;
2122

src/stream.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// Copyright (c) 2022-2023 Yuki Kishimoto
2+
// Copyright (c) 2023-2025 Rust Nostr Developers
3+
// Distributed under the MIT software license
4+
5+
use std::sync::Arc;
6+
7+
use futures_util::StreamExt;
8+
use nostr_sdk::pool::stream::ReceiverStream;
9+
use tokio::sync::Mutex;
10+
use uniffi::Object;
11+
12+
use crate::protocol::event::Event;
13+
14+
#[derive(Object)]
15+
pub struct EventStream {
16+
stream: Mutex<ReceiverStream<nostr::Event>>,
17+
}
18+
19+
impl From<ReceiverStream<nostr::Event>> for EventStream {
20+
fn from(stream: ReceiverStream<nostr::Event>) -> Self {
21+
Self {
22+
stream: Mutex::new(stream),
23+
}
24+
}
25+
}
26+
27+
#[uniffi::export(async_runtime = "tokio")]
28+
impl EventStream {
29+
pub async fn next(&self) -> Option<Arc<Event>> {
30+
let mut stream = self.stream.lock().await;
31+
let event: nostr::Event = stream.next().await?;
32+
Some(Arc::new(event.into()))
33+
}
34+
}

0 commit comments

Comments
 (0)