Skip to content

Commit 52bb794

Browse files
authored
Cluster pipeline (#226)
* Add pipeline functions that take an array of existential ValkeyCommands Add makeConnectionPipelineArrayExistentialsBenchmark Signed-off-by: Adam Fowler <adamfowler71@gmail.com> * ValkeyClusterClient can be called with Collection<ValkeyCommand> Signed-off-by: Adam Fowler <adamfowler71@gmail.com> * Initial version of full cluster pipeline Signed-off-by: Adam Fowler <adamfowler71@gmail.com> * Comment Signed-off-by: Adam Fowler <adamfowler71@gmail.com> * Cleanup after rebase Signed-off-by: Adam Fowler <adamfowler71@gmail.com> * Clean up code, add parameter pack version of pipeline Signed-off-by: Adam Fowler <adamfowler71@gmail.com> * Add more tests Signed-off-by: Adam Fowler <adamfowler71@gmail.com> * Renamed NodeResult to NodePipelineResult Signed-off-by: Adam Fowler <adamfowler71@gmail.com> * Renamed SparseCollection to IndexedSubCollection Signed-off-by: Adam Fowler <adamfowler71@gmail.com> * Changed execute so all errors are stored in Results Signed-off-by: Adam Fowler <adamfowler71@gmail.com> --------- Signed-off-by: Adam Fowler <adamfowler71@gmail.com>
1 parent 0f42792 commit 52bb794

File tree

6 files changed

+398
-16
lines changed

6 files changed

+398
-16
lines changed

Sources/Valkey/Cluster/ValkeyClusterClient.swift

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,117 @@ public final class ValkeyClusterClient: Sendable {
185185
throw ValkeyClusterError.clientRequestCancelled
186186
}
187187

188+
/// Pipeline a series of commands to nodes in the Valkey cluster
189+
///
190+
/// This function splits up the array of commands into smaller arrays containing
191+
/// the commands that should be run on each node in the cluster. It then runs a
192+
/// pipelined execute using these smaller arrays on each node concurrently.
193+
///
194+
/// Once all the responses for the commands have been received the function converys
195+
/// them to their expected Response type.
196+
///
197+
/// Because the commands are split across nodes it is not possible to guarantee
198+
/// the order that commands will run in. The only way to guarantee the order is to
199+
/// only pipeline commands that use keys from the same HashSlot. If a key has a
200+
/// substring between brackets `{}` then that substring is used to calculate the
201+
/// HashSlot. That substring is called the hash tag. Using this you can ensure two
202+
/// keys are in the same hash slot, by giving them the same hash tag eg `user:{123}`
203+
/// and `profile:{123}`.
204+
///
205+
/// - Parameter commands: Parameter pack of ValkeyCommands
206+
/// - Returns: Parameter pack holding the responses of all the commands
207+
@inlinable
208+
public func execute<each Command: ValkeyCommand>(
209+
_ commands: repeat each Command
210+
) async throws -> sending (repeat Result<(each Command).Response, Error>) {
211+
func convert<Response: RESPTokenDecodable>(_ result: Result<RESPToken, Error>, to: Response.Type) -> Result<Response, Error> {
212+
result.flatMap {
213+
do {
214+
return try .success(Response(fromRESP: $0))
215+
} catch {
216+
return .failure(error)
217+
}
218+
}
219+
}
220+
let results = try await self.execute([any ValkeyCommand](commands: repeat each commands))
221+
var index = AutoIncrementingInteger()
222+
return (repeat convert(results[index.next()], to: (each Command).Response.self))
223+
}
224+
225+
/// Results from pipeline and index for each result
226+
@usableFromInline
227+
struct NodePipelineResult: Sendable {
228+
@usableFromInline
229+
let indices: [[any ValkeyCommand].Index]
230+
@usableFromInline
231+
let results: [Result<RESPToken, Error>]
232+
233+
@inlinable
234+
init(indices: [[any ValkeyCommand].Index], results: [Result<RESPToken, Error>]) {
235+
self.indices = indices
236+
self.results = results
237+
}
238+
}
239+
240+
/// Pipeline a series of commands to nodes in the Valkey cluster
241+
///
242+
/// This function splits up the array of commands into smaller arrays containing
243+
/// the commands that should be run on each node in the cluster. It then runs a
244+
/// pipelined execute using these smaller arrays on each node concurrently.
245+
///
246+
/// Once all the responses for the commands have been received the function returns
247+
/// an array of RESPToken Results, one for each command.
248+
///
249+
/// Because the commands are split across nodes it is not possible to guarantee
250+
/// the order that commands will run in. The only way to guarantee the order is to
251+
/// only pipeline commands that use keys from the same HashSlot. If a key has a
252+
/// substring between brackets `{}` then that substring is used to calculate the
253+
/// HashSlot. That substring is called the hash tag. Using this you can ensure two
254+
/// keys are in the same hash slot, by giving them the same hash tag eg `user:{123}`
255+
/// and `profile:{123}`.
256+
///
257+
/// - Parameter commands: Parameter pack of ValkeyCommands
258+
/// - Returns: Array holding the RESPToken responses of all the commands
259+
@inlinable
260+
public func execute(
261+
_ commands: [any ValkeyCommand]
262+
) async throws -> sending [Result<RESPToken, Error>] {
263+
guard commands.count > 0 else { return [] }
264+
// get a list of nodes and the commands that should be run on them
265+
let nodes = try await self.splitCommandsAcrossNodes(commands: commands)
266+
// if this list has one element, then just run the pipeline on that single node
267+
if nodes.count == 1 {
268+
do {
269+
return try await self.execute(node: nodes[nodes.startIndex].node, commands: commands)
270+
} catch {
271+
return .init(repeating: .failure(error), count: commands.count)
272+
}
273+
}
274+
return await withTaskGroup(of: NodePipelineResult.self) { group in
275+
// run generated pipelines concurrently
276+
for node in nodes {
277+
let indices = node.commandIndices
278+
group.addTask {
279+
do {
280+
let results = try await self.execute(node: node.node, commands: IndexedSubCollection(commands, indices: indices))
281+
return .init(indices: indices, results: results)
282+
} catch {
283+
return NodePipelineResult(indices: indices, results: .init(repeating: .failure(error), count: indices.count))
284+
}
285+
}
286+
}
287+
var results = [Result<RESPToken, Error>](repeating: .failure(ValkeyClusterError.pipelinedResultNotReturned), count: commands.count)
288+
// get results for each node
289+
while let taskResult = await group.next() {
290+
precondition(taskResult.indices.count == taskResult.results.count)
291+
for index in 0..<taskResult.indices.count {
292+
results[taskResult.indices[index]] = taskResult.results[index]
293+
}
294+
}
295+
return results
296+
}
297+
}
298+
188299
struct Redirection {
189300
let node: ValkeyNodeClient
190301
let ask: Bool
@@ -347,6 +458,74 @@ public final class ValkeyClusterClient: Sendable {
347458
return hashSlot
348459
}
349460

461+
/// Node and list of indices into command array
462+
@usableFromInline
463+
struct NodeAndCommands: Sendable {
464+
@usableFromInline
465+
let node: ValkeyNodeClient
466+
@usableFromInline
467+
var commandIndices: [Int]
468+
469+
@usableFromInline
470+
internal init(node: ValkeyNodeClient, commandIndices: [Int]) {
471+
self.node = node
472+
self.commandIndices = commandIndices
473+
}
474+
}
475+
476+
/// Split command array into multiple arrays of indices into the original array.
477+
///
478+
/// These array of indices are then used to create collections of commands to
479+
/// run on each node
480+
@usableFromInline
481+
func splitCommandsAcrossNodes(commands: [any ValkeyCommand]) async throws -> some Collection<NodeAndCommands> {
482+
var nodeMap: [ValkeyServerAddress: NodeAndCommands] = [:]
483+
var index = commands.startIndex
484+
var prevAddress: ValkeyServerAddress? = nil
485+
// iterate through commands until you reach one that affects a key
486+
while index < commands.endIndex {
487+
let command = commands[index]
488+
index += 1
489+
let keysAffected = command.keysAffected
490+
if keysAffected.count > 0 {
491+
// Get hash slot for key and add all the commands you have iterated through so far to the
492+
// node associated with that key and break out of loop
493+
let hashSlot = try self.hashSlot(for: keysAffected)
494+
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [])
495+
let address = node.serverAddress
496+
let nodeAndCommands = NodeAndCommands(node: node, commandIndices: .init(commands.startIndex..<index))
497+
nodeMap[address] = nodeAndCommands
498+
prevAddress = address
499+
break
500+
}
501+
}
502+
// If we found a key while iterating through the commands iterate through the remaining commands
503+
if var prevAddress {
504+
while index < commands.endIndex {
505+
let command = commands[index]
506+
let keysAffected = command.keysAffected
507+
if keysAffected.count > 0 {
508+
// If command affects a key get hash slot for key and add command to the node associated with that key
509+
let hashSlot = try self.hashSlot(for: keysAffected)
510+
let node = try await self.nodeClient(for: hashSlot.map { [$0] } ?? [])
511+
prevAddress = node.serverAddress
512+
nodeMap[prevAddress, default: .init(node: node, commandIndices: [])].commandIndices.append(index)
513+
} else {
514+
// if command doesn't affect a key then use the node the previous command used
515+
nodeMap[prevAddress]!.commandIndices.append(index)
516+
}
517+
index += 1
518+
}
519+
} else {
520+
// if none of the commands affect any keys then choose a random node
521+
let node = try await self.nodeClient(for: [])
522+
let address = node.serverAddress
523+
let nodeAndCommands = NodeAndCommands(node: node, commandIndices: .init(commands.startIndex..<index))
524+
nodeMap[address] = nodeAndCommands
525+
}
526+
return nodeMap.values
527+
}
528+
350529
@usableFromInline
351530
enum RetryAction {
352531
case redirect(ValkeyClusterRedirectionError)
@@ -791,3 +970,18 @@ public final class ValkeyClusterClient: Sendable {
791970
/// This allows the cluster client to be used anywhere a `ValkeyClientProtocol` is expected.
792971
@available(valkeySwift 1.0, *)
793972
extension ValkeyClusterClient: ValkeyClientProtocol {}
973+
974+
extension Array where Element == any ValkeyCommand {
975+
/// Initializer used internally in cluster client and tests for constructing an array
976+
/// of commands from a parameter pack of commands
977+
@inlinable
978+
init<each Command: ValkeyCommand>(
979+
commands: repeat each Command
980+
) {
981+
var commandArray: [any ValkeyCommand] = []
982+
for command in repeat each commands {
983+
commandArray.append(command)
984+
}
985+
self = commandArray
986+
}
987+
}

Sources/Valkey/Cluster/ValkeyClusterError.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public struct ValkeyClusterError: Error, Equatable {
2323
case clusterClientIsShutDown
2424
case clientRequestCancelled
2525
case waitedForDiscoveryAfterMovedErrorThreeTimes
26+
case pipelinedResultNotReturned
2627
}
2728
private let value: Internal
2829
private init(_ value: Internal) {
@@ -57,5 +58,7 @@ public struct ValkeyClusterError: Error, Equatable {
5758
static public var clientRequestCancelled: Self { .init(.clientRequestCancelled) }
5859
/// Wait for discovery failed three times after receiving a MOVED error
5960
static public var waitedForDiscoveryAfterMovedErrorThreeTimes: Self { .init(.waitedForDiscoveryAfterMovedErrorThreeTimes) }
61+
/// Pipelined result not returned. If you receive this, it is an internal error and should be reported as a bug
62+
static public var pipelinedResultNotReturned: Self { .init(.waitedForDiscoveryAfterMovedErrorThreeTimes) }
6063

6164
}

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
319319
///
320320
/// - Parameter commands: Collection of ValkeyCommands
321321
/// - Returns: Array holding the RESPToken responses of all the commands
322-
@inlinable
322+
@usableFromInline
323323
func executeWithAsk(
324324
_ commands: some Collection<any ValkeyCommand>
325325
) async -> sending [Result<RESPToken, Error>] {

Sources/Valkey/Connection/ValkeyServerAddress.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
import NIOCore
99

1010
/// A Valkey server address to connect to.
11-
public struct ValkeyServerAddress: Sendable, Equatable {
12-
enum _Internal: Equatable {
11+
public struct ValkeyServerAddress: Sendable, Equatable, Hashable {
12+
enum _Internal: Equatable, Hashable {
1313
case hostname(_ host: String, port: Int)
1414
case unixDomainSocket(path: String)
1515
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
//
2+
// This source file is part of the valkey-swift project
3+
// Copyright (c) 2025 the valkey-swift project authors
4+
//
5+
// See LICENSE.txt for license information
6+
// SPDX-License-Identifier: Apache-2.0
7+
//
8+
@usableFromInline
9+
struct IndexedSubCollection<Base: Collection>: Collection {
10+
@usableFromInline
11+
typealias Element = Base.Element
12+
@usableFromInline
13+
typealias Index = [Base.Index].Index
14+
15+
@usableFromInline
16+
let base: Base
17+
@usableFromInline
18+
let baseIndices: [Base.Index]
19+
20+
@inlinable
21+
var startIndex: Index { self.baseIndices.startIndex }
22+
@inlinable
23+
var endIndex: Index { self.baseIndices.endIndex }
24+
25+
@inlinable
26+
init(_ base: Base, indices: [Base.Index]) {
27+
self.base = base
28+
self.baseIndices = indices
29+
}
30+
31+
@inlinable
32+
func index(after i: Array<Base.Index>.Index) -> Array<Base.Index>.Index {
33+
self.baseIndices.index(after: i)
34+
}
35+
36+
@inlinable
37+
subscript(position: Index) -> Element {
38+
get {
39+
let index = self.baseIndices[position]
40+
return self.base[index]
41+
}
42+
}
43+
44+
@inlinable
45+
subscript(index: Index) -> Base.Index {
46+
get {
47+
self.baseIndices[index]
48+
}
49+
}
50+
51+
@usableFromInline
52+
struct Iterator: IteratorProtocol {
53+
@usableFromInline
54+
let base: Base
55+
@usableFromInline
56+
var iterator: [Base.Index].Iterator
57+
58+
@inlinable
59+
init(base: Base, iterator: [Base.Index].Iterator) {
60+
self.base = base
61+
self.iterator = iterator
62+
}
63+
64+
@inlinable
65+
mutating func next() -> Base.Element? {
66+
if let index = self.iterator.next() {
67+
return base[index]
68+
}
69+
return nil
70+
}
71+
}
72+
73+
@inlinable
74+
func makeIterator() -> Iterator {
75+
.init(base: self.base, iterator: self.baseIndices.makeIterator())
76+
}
77+
}
78+
79+
extension IndexedSubCollection: Sendable where Base: Sendable, Base.Index: Sendable {}

0 commit comments

Comments
 (0)