diff --git a/Cargo.toml b/Cargo.toml index 54c9efd..ccc97f7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,9 @@ async-graphql = "^5.0.0" warp = { version = "^0.3.2", optional = true } async-graphql-warp = { version = "^5.0.0", optional = true } http = { version = "^0.2.8", optional = true } +stream-cancel = "0.8.1" +dashmap = "5.4.0" +lazy_static = "1.4.0" [features] graphiql = [ "dep:http", "dep:warp", "dep:async-graphql-warp" ] diff --git a/packages/urql/index.ts b/packages/urql/index.ts index 98e1f74..5e5a30f 100644 --- a/packages/urql/index.ts +++ b/packages/urql/index.ts @@ -178,10 +178,13 @@ export function subscribe( operation: SubscriptionOperation, sink: ObserverLike ) { - let unlisten: () => void = () => {} - const id = Math.floor(Math.random() * 10000000) + let unlisten: () => void = () => { + Promise.resolve().then(() => invoke('plugin:graphql|subscriptions_end', {...operation, id})) + .catch(err => console.error(err)) + } + Promise.resolve() .then(async () => listen(`graphql://${id}`, (event: Event) => { diff --git a/src/lib.rs b/src/lib.rs index 0200993..7afb890 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -222,6 +222,13 @@ use tauri::{ plugin::{self, TauriPlugin}, Invoke, InvokeError, Manager, Runtime, }; +use stream_cancel::{Trigger, Tripwire, StreamExt as CancelStreamExt}; +use dashmap::DashMap; +use lazy_static::lazy_static; + +lazy_static! { +static ref SUBSCRIPTIONS:DashMap = DashMap::new(); +} fn invoke_handler( schema: Schema, @@ -254,11 +261,12 @@ where let req: SubscriptionRequest = serde_json::from_value(invoke.message.payload().clone()) .map_err(InvokeError::from_serde_json)?; + let (trigger, tripwire) = Tripwire::new(); let subscription_window = window.clone(); - let mut stream = schema.execute_stream(req.inner.data(window.app_handle()).data(window)); + let mut stream = schema.execute_stream(req.inner.data(window.app_handle()).data(window)).take_until_if(tripwire); let event_id = &format!("graphql://{}", req.id); - + SUBSCRIPTIONS.insert(req.id, trigger); while let Some(result) = stream.next().await { let str = serde_json::to_string(&result).map_err(InvokeError::from_serde_json)?; @@ -268,6 +276,12 @@ where Ok(()) }), + "subscriptions_end" => invoke.resolver.respond_async(async move { + let req: SubscriptionRequest = serde_json::from_value(invoke.message.payload().clone()) + .map_err(InvokeError::from_serde_json)?; + SUBSCRIPTIONS.remove(&req.id); + Ok(()) + }), cmd => invoke.resolver.reject(format!( "Invalid endpoint \"{}\". Valid endpoints are: \"graphql\", \"subscriptions\".", cmd