Skip to content

Commit cbd70d9

Browse files
committed
Add ValkeyClusterClient subscribe commands
Signed-off-by: Adam Fowler <adamfowler71@gmail.com>
1 parent 8f6b6d3 commit cbd70d9

File tree

2 files changed

+297
-0
lines changed

2 files changed

+297
-0
lines changed
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
//
2+
// This source file is part of the valkey-swift project
3+
// Copyright (c) 2025 the valkey-swift project authors
4+
//
5+
// See LICENSE.txt for license information
6+
// SPDX-License-Identifier: Apache-2.0
7+
//
8+
import NIOCore
9+
import Synchronization
10+
11+
@available(valkeySwift 1.0, *)
12+
extension ValkeyClusterClient {
13+
/// Run operation with the valkey subscription connection
14+
///
15+
/// - Parameters:
16+
/// - isolation: Actor isolation
17+
/// - operation: Closure to run with subscription connection
18+
@inlinable
19+
func withSubscriptionConnection<Value>(
20+
isolation: isolated (any Actor)? = #isolation,
21+
_ operation: (ValkeyConnection) async throws -> sending Value
22+
) async throws -> sending Value {
23+
let node = try await self.nodeClient(for: [])
24+
let id = node.subscriptionConnectionIDGenerator.next()
25+
26+
let connection = try await withTaskCancellationHandler {
27+
try await withCheckedThrowingContinuation { (cont: CheckedContinuation<ValkeyConnection, Error>) in
28+
node.leaseSubscriptionConnection(id: id, request: cont)
29+
}
30+
} onCancel: {
31+
node.cancelSubscriptionConnection(id: id)
32+
}
33+
34+
defer {
35+
node.releaseSubscriptionConnection(id: id)
36+
}
37+
return try await operation(connection)
38+
}
39+
40+
/// Subscribe to list of channels and run closure with subscription
41+
///
42+
/// When the closure is exited the channels are automatically unsubscribed from.
43+
///
44+
/// When running subscribe from `ValkeyClient` a single connection is used for
45+
/// all subscriptions.
46+
///
47+
/// - Parameters:
48+
/// - channels: list of channels to subscribe to
49+
/// - isolation: Actor isolation
50+
/// - process: Closure that is called with subscription async sequence
51+
/// - Returns: Return value of closure
52+
@inlinable
53+
public func subscribe<Value>(
54+
to channels: String...,
55+
isolation: isolated (any Actor)? = #isolation,
56+
process: (ValkeySubscription) async throws -> sending Value
57+
) async throws -> sending Value {
58+
try await self.subscribe(to: channels, process: process)
59+
}
60+
61+
@inlinable
62+
/// Subscribe to list of channels and run closure with subscription
63+
///
64+
/// When the closure is exited the channels are automatically unsubscribed from.
65+
///
66+
/// When running subscribe from `ValkeyClient` a single connection is used for
67+
/// all subscriptions.
68+
///
69+
/// - Parameters:
70+
/// - channels: list of channels to subscribe to
71+
/// - isolation: Actor isolation
72+
/// - process: Closure that is called with subscription async sequence
73+
/// - Returns: Return value of closure
74+
public func subscribe<Value>(
75+
to channels: [String],
76+
isolation: isolated (any Actor)? = #isolation,
77+
process: (ValkeySubscription) async throws -> sending Value
78+
) async throws -> sending Value {
79+
try await self.subscribe(
80+
command: SUBSCRIBE(channels: channels),
81+
filters: channels.map { .channel($0) },
82+
process: process
83+
)
84+
}
85+
86+
/// Subscribe to list of channel patterns and run closure with subscription
87+
///
88+
/// When the closure is exited the patterns are automatically unsubscribed from.
89+
///
90+
/// When running subscribe from `ValkeyClient` a single connection is used for
91+
/// all subscriptions.
92+
///
93+
/// - Parameters:
94+
/// - patterns: list of channel patterns to subscribe to
95+
/// - isolation: Actor isolation
96+
/// - process: Closure that is called with subscription async sequence
97+
/// - Returns: Return value of closure
98+
@inlinable
99+
public func psubscribe<Value>(
100+
to patterns: String...,
101+
isolation: isolated (any Actor)? = #isolation,
102+
process: (ValkeySubscription) async throws -> sending Value
103+
) async throws -> sending Value {
104+
try await self.psubscribe(to: patterns, process: process)
105+
}
106+
107+
/// Subscribe to list of pattern matching channels and run closure with subscription
108+
///
109+
/// When the closure is exited the patterns are automatically unsubscribed from.
110+
///
111+
/// When running subscribe from `ValkeyClient` a single connection is used for
112+
/// all subscriptions.
113+
///
114+
/// - Parameters:
115+
/// - patterns: list of channel patterns to subscribe to
116+
/// - isolation: Actor isolation
117+
/// - process: Closure that is called with subscription async sequence
118+
/// - Returns: Return value of closure
119+
@inlinable
120+
public func psubscribe<Value>(
121+
to patterns: [String],
122+
isolation: isolated (any Actor)? = #isolation,
123+
process: (ValkeySubscription) async throws -> sending Value
124+
) async throws -> sending Value {
125+
try await self.subscribe(
126+
command: PSUBSCRIBE(patterns: patterns),
127+
filters: patterns.map { .pattern($0) },
128+
process: process
129+
)
130+
}
131+
132+
/// Subscribe to key invalidation channel required for client-side caching
133+
///
134+
/// See https://valkey.io/topics/client-side-caching/ for more details. The `process`
135+
/// closure is provided with a stream of ValkeyKeys that have been invalidated and also
136+
/// the client id of the subscription connection to redirect client tracking messages to.
137+
///
138+
/// When the closure is exited the channel is automatically unsubscribed from.
139+
///
140+
/// When running subscribe from `ValkeyClient` a single connection is used for
141+
/// all subscriptions.
142+
///
143+
/// - Parameters:
144+
/// - isolation: Actor isolation
145+
/// - process: Closure that is called with async sequence of key invalidations and the client id
146+
/// of the connection the subscription is running on.
147+
/// - Returns: Return value of closure
148+
@inlinable
149+
public func subscribeKeyInvalidations<Value>(
150+
isolation: isolated (any Actor)? = #isolation,
151+
process: (AsyncMapSequence<ValkeySubscription, ValkeyKey>, Int) async throws -> sending Value
152+
) async throws -> sending Value {
153+
try await withSubscriptionConnection { connection in
154+
let id = try await connection.clientId()
155+
return try await connection.subscribe(to: [ValkeySubscriptions.invalidateChannel]) { subscription in
156+
let keys = subscription.map { ValkeyKey($0.message) }
157+
return try await process(keys, id)
158+
}
159+
}
160+
}
161+
162+
@inlinable
163+
func subscribe<Value>(
164+
command: some ValkeyCommand,
165+
filters: [ValkeySubscriptionFilter],
166+
isolation: isolated (any Actor)? = #isolation,
167+
process: (ValkeySubscription) async throws -> sending Value
168+
) async throws -> sending Value {
169+
try await self.withSubscriptionConnection { connection in
170+
try await connection.subscribe(command: command, filters: filters, process: process)
171+
}
172+
}
173+
}

Tests/ClusterIntegrationTests/ClusterIntegrationTests.swift

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,130 @@ struct ClusterIntegrationTests {
134134
}
135135
}
136136
}
137+
@Test
138+
@available(valkeySwift 1.0, *)
139+
func testClusterClientSubscriptions() async throws {
140+
let (stream, cont) = AsyncStream.makeStream(of: Void.self)
141+
var logger = Logger(label: "Subscriptions")
142+
logger.logLevel = .trace
143+
let firstNodeHostname = clusterFirstNodeHostname!
144+
let firstNodePort = clusterFirstNodePort ?? 6379
145+
try await Self.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort, tls: false)], logger: logger) { client in
146+
try await withThrowingTaskGroup(of: Void.self) { group in
147+
group.addTask {
148+
try await client.subscribe(to: "testSubscriptions") { subscription in
149+
cont.finish()
150+
var iterator = subscription.makeAsyncIterator()
151+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "hello" }
152+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "goodbye" }
153+
}
154+
}
155+
await stream.first { _ in true }
156+
try await Task.sleep(for: .milliseconds(100))
157+
_ = try await client.publish(channel: "testSubscriptions", message: "hello")
158+
_ = try await client.publish(channel: "testSubscriptions", message: "goodbye")
159+
try await group.waitForAll()
160+
}
161+
}
162+
}
163+
164+
@Test
165+
@available(valkeySwift 1.0, *)
166+
func testClientSubscriptionsTwice() async throws {
167+
let (stream, cont) = AsyncStream.makeStream(of: Void.self)
168+
var logger = Logger(label: "Subscriptions")
169+
logger.logLevel = .trace
170+
let firstNodeHostname = clusterFirstNodeHostname!
171+
let firstNodePort = clusterFirstNodePort ?? 6379
172+
try await Self.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort, tls: false)], logger: logger) { client in
173+
try await withThrowingTaskGroup(of: Void.self) { group in
174+
group.addTask {
175+
try await client.subscribe(to: "testSubscriptions") { subscription in
176+
cont.yield()
177+
var iterator = subscription.makeAsyncIterator()
178+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "hello" }
179+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "goodbye" }
180+
}
181+
try await client.subscribe(to: "testSubscriptions") { subscription in
182+
cont.finish()
183+
var iterator = subscription.makeAsyncIterator()
184+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "hello" }
185+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "goodbye" }
186+
}
187+
}
188+
await stream.first { _ in true }
189+
try await Task.sleep(for: .milliseconds(10))
190+
_ = try await client.publish(channel: "testSubscriptions", message: "hello")
191+
_ = try await client.publish(channel: "testSubscriptions", message: "goodbye")
192+
await stream.first { _ in true }
193+
try await Task.sleep(for: .milliseconds(10))
194+
_ = try await client.publish(channel: "testSubscriptions", message: "hello")
195+
_ = try await client.publish(channel: "testSubscriptions", message: "goodbye")
196+
try await group.waitForAll()
197+
}
198+
}
199+
}
200+
201+
@Test
202+
@available(valkeySwift 1.0, *)
203+
func testClientMultipleSubscriptions() async throws {
204+
let (stream, cont) = AsyncStream.makeStream(of: Void.self)
205+
var logger = Logger(label: "Subscriptions")
206+
logger.logLevel = .trace
207+
let firstNodeHostname = clusterFirstNodeHostname!
208+
let firstNodePort = clusterFirstNodePort ?? 6379
209+
try await Self.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort, tls: false)], logger: logger) { client in
210+
try await withThrowingTaskGroup(of: Void.self) { group in
211+
let count = 50
212+
for i in 0..<count {
213+
group.addTask {
214+
try await client.subscribe(to: ["sub\(i)", "sub\(i+1)"]) { subscription in
215+
cont.yield()
216+
var iterator = subscription.makeAsyncIterator()
217+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "\(i)" }
218+
client.logger.info("Received \(i): \(i)")
219+
await #expect(throws: Never.self) { try await iterator.next().map { String(buffer: $0.message) } == "\(i+1)" }
220+
client.logger.info("Received \(i): \(i+1)")
221+
}
222+
}
223+
}
224+
var iterator = stream.makeAsyncIterator()
225+
for _ in 0..<count {
226+
await iterator.next()
227+
}
228+
229+
try await Task.sleep(for: .milliseconds(200))
230+
for i in 0..<(count + 1) {
231+
try await client.publish(channel: "sub\(i)", message: "\(i)")
232+
client.logger.info("Published \(i)")
233+
}
234+
try await group.waitForAll()
235+
}
236+
}
237+
}
238+
239+
@Test
240+
@available(valkeySwift 1.0, *)
241+
func testClientCancelSubscription() async throws {
242+
let (stream, cont) = AsyncStream.makeStream(of: Void.self)
243+
var logger = Logger(label: "Subscriptions")
244+
logger.logLevel = .trace
245+
let firstNodeHostname = clusterFirstNodeHostname!
246+
let firstNodePort = clusterFirstNodePort ?? 6379
247+
try await Self.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort, tls: false)], logger: logger) { client in
248+
await withThrowingTaskGroup(of: Void.self) { group in
249+
group.addTask {
250+
try await client.subscribe(to: "testCancelSubscriptions") { subscription in
251+
cont.finish()
252+
for try await _ in subscription {
253+
}
254+
}
255+
}
256+
await stream.first { _ in true }
257+
group.cancelAll()
258+
}
259+
}
260+
}
137261

138262
@available(valkeySwift 1.0, *)
139263
func testMigratingHashSlot(

0 commit comments

Comments
 (0)