Skip to content

Commit cf9f659

Browse files
authored
Implement internal cancellation for SCAN via WithCancellation (#2911)
* - implement internal cancellation for SCAN via WithCancellation * release notes
1 parent b4aaced commit cf9f659

File tree

5 files changed

+97
-5
lines changed

5 files changed

+97
-5
lines changed

docs/AsyncTimeouts.md

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,4 +62,18 @@ using var cts = CancellationTokenSource.CreateLinkedTokenSource(token); // or mu
6262
cts.CancelAfter(timeout);
6363
await database.StringSetAsync("key", "value").WaitAsync(cts.Token);
6464
var value = await database.StringGetAsync("key").WaitAsync(cts.Token);
65-
``````
65+
```
66+
67+
### Cancelling keys enumeration
68+
69+
Keys being enumerated (via `SCAN`) can *also* be cancelled, using the inbuilt `.WithCancellation(...)` method:
70+
71+
```csharp
72+
CancellationToken token = ...; // for example, from HttpContext.RequestAborted
73+
await foreach (var key in server.KeysAsync(pattern: "*foo*").WithCancellation(token))
74+
{
75+
...
76+
}
77+
```
78+
79+
To use a timeout instead, you can use the `CancellationTokenSource` approach shown above.

docs/ReleaseNotes.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,16 @@ Current package versions:
88

99
## Unreleased
1010

11-
- (none)
11+
- Support async cancellation of `SCAN` enumeration ([#2911 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2911))
12+
- Add `XTRIM MINID` support ([#2842 by kijanawoodard](https://github.com/StackExchange/StackExchange.Redis/pull/2842))
13+
- Add new CE 8.2 stream support - `XDELEX`, `XACKDEL`, `{XADD|XTRIM} [KEEPREF|DELREF|ACKED]` ([#2912 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2912))
1214

1315
## 2.8.47
1416

1517
- Add support for new `BITOP` operations in CE 8.2 ([#2900 by atakavci](https://github.com/StackExchange/StackExchange.Redis/pull/2900))
1618
- Package updates ([#2906 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2906))
1719
- Docs: added [guidance on async timeouts](https://stackexchange.github.io/StackExchange.Redis/AsyncTimeouts) ([#2910 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2910))
1820
- Fix handshake error with `CLIENT ID` ([#2909 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2909))
19-
- Add `XTRIM MINID` support ([#2842 by kijanawoodard](https://github.com/StackExchange/StackExchange.Redis/pull/2842))
20-
- Add new CE 8.2 stream support - `XDELEX`, `XACKDEL`, `{XADD|XTRIM} [KEEPREF|DELREF|ACKED]` ([#2912 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2912))
2121

2222
## 2.8.41
2323

src/StackExchange.Redis/CursorEnumerable.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ private bool SimpleNext()
141141
{
142142
if (_pageOffset + 1 < _pageCount)
143143
{
144+
cancellationToken.ThrowIfCancellationRequested();
144145
_pageOffset++;
145146
return true;
146147
}
@@ -274,7 +275,7 @@ private async ValueTask<bool> AwaitedNextAsync(bool isInitial)
274275
ScanResult scanResult;
275276
try
276277
{
277-
scanResult = await pending.ForAwait();
278+
scanResult = await pending.WaitAsync(cancellationToken).ForAwait();
278279
}
279280
catch (Exception ex)
280281
{

src/StackExchange.Redis/TaskExtensions.cs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,44 @@ internal static Task<T> ObserveErrors<T>(this Task<T> task)
2525
return task;
2626
}
2727

28+
#if !NET6_0_OR_GREATER
29+
// suboptimal polyfill version of the .NET 6+ API, but reasonable for light use
30+
internal static Task<T> WaitAsync<T>(this Task<T> task, CancellationToken cancellationToken)
31+
{
32+
if (task.IsCompleted || !cancellationToken.CanBeCanceled) return task;
33+
return Wrap(task, cancellationToken);
34+
35+
static async Task<T> Wrap(Task<T> task, CancellationToken cancellationToken)
36+
{
37+
var tcs = new TaskSourceWithToken<T>(cancellationToken);
38+
using var reg = cancellationToken.Register(
39+
static state => ((TaskSourceWithToken<T>)state!).Cancel(), tcs);
40+
_ = task.ContinueWith(
41+
static (t, state) =>
42+
{
43+
var tcs = (TaskSourceWithToken<T>)state!;
44+
if (t.IsCanceled) tcs.TrySetCanceled();
45+
else if (t.IsFaulted) tcs.TrySetException(t.Exception!);
46+
else tcs.TrySetResult(t.Result);
47+
},
48+
tcs);
49+
return await tcs.Task;
50+
}
51+
}
52+
53+
// the point of this type is to combine TCS and CT so that we can use a static
54+
// registration via Register
55+
private sealed class TaskSourceWithToken<T> : TaskCompletionSource<T>
56+
{
57+
public TaskSourceWithToken(CancellationToken cancellationToken)
58+
=> _cancellationToken = cancellationToken;
59+
60+
private readonly CancellationToken _cancellationToken;
61+
62+
public void Cancel() => TrySetCanceled(_cancellationToken);
63+
}
64+
#endif
65+
2866
[MethodImpl(MethodImplOptions.AggressiveInlining)]
2967
internal static ConfiguredTaskAwaitable ForAwait(this Task task) => task.ConfigureAwait(false);
3068
[MethodImpl(MethodImplOptions.AggressiveInlining)]

tests/StackExchange.Redis.Tests/CancellationTests.cs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ public async Task WithCancellation_ValidToken_OperationSucceeds()
4040

4141
private static void Pause(IDatabase db) => db.Execute("client", ["pause", ConnectionPauseMilliseconds], CommandFlags.FireAndForget);
4242

43+
private void Pause(IServer server)
44+
{
45+
server.Execute("client", new object[] { "pause", ConnectionPauseMilliseconds }, CommandFlags.FireAndForget);
46+
}
47+
4348
[Fact]
4449
public async Task WithTimeout_ShortTimeout_Async_ThrowsOperationCanceledException()
4550
{
@@ -147,4 +152,38 @@ public async Task CancellationDuringOperation_Async_CancelsGracefully(CancelStra
147152
Assert.Equal(cts.Token, oce.CancellationToken);
148153
}
149154
}
155+
156+
[Fact]
157+
public async Task ScanCancellable()
158+
{
159+
using var conn = Create();
160+
var db = conn.GetDatabase();
161+
var server = conn.GetServer(conn.GetEndPoints()[0]);
162+
163+
using var cts = new CancellationTokenSource();
164+
165+
var watch = Stopwatch.StartNew();
166+
Pause(server);
167+
try
168+
{
169+
db.StringSet(Me(), "value", TimeSpan.FromMinutes(5), flags: CommandFlags.FireAndForget);
170+
await using var iter = server.KeysAsync(pageSize: 1000).WithCancellation(cts.Token).GetAsyncEnumerator();
171+
var pending = iter.MoveNextAsync();
172+
Assert.False(cts.Token.IsCancellationRequested);
173+
cts.CancelAfter(ShortDelayMilliseconds); // start this *after* we've got past the initial check
174+
while (await pending)
175+
{
176+
pending = iter.MoveNextAsync();
177+
}
178+
Assert.Fail($"{ExpectedCancel}: {watch.ElapsedMilliseconds}ms");
179+
}
180+
catch (OperationCanceledException oce)
181+
{
182+
var taken = watch.ElapsedMilliseconds;
183+
// Expected if cancellation happens during operation
184+
Log($"Cancelled after {taken}ms");
185+
Assert.True(taken < ConnectionPauseMilliseconds / 2, "Should have cancelled much sooner");
186+
Assert.Equal(cts.Token, oce.CancellationToken);
187+
}
188+
}
150189
}

0 commit comments

Comments
 (0)