Skip to content

Commit 6b02ba8

Browse files
Add xtrim with minid and new 8.2 stream features (#2912)
* add minId overloads * fix test. clarify doc comment i now think the XTRIM documentation is saying that an entry at exactly MINID is kept. https://redis.io/docs/latest/commands/xtrim/ * fix test. forgot to update the length check. * change method name to StreamTrimByMinIdAsync implement useApproximateMaxLength and limit as per docs * add stream delete mode to minid api add xackdel * xref new pr * add limit to StreamTrim (maxlen) * XADD KEEPREF|DELREF|ACKED * more release notes * naming is hard * XDELEX * merge shipped * Update StreamTests.cs --------- Co-authored-by: Kijana Woodard <kijana.woodard@kaizen.io>
1 parent c54c159 commit 6b02ba8

File tree

16 files changed

+771
-82
lines changed

16 files changed

+771
-82
lines changed

StackExchange.Redis.sln

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
1919
docs\ReleaseNotes.md = docs\ReleaseNotes.md
2020
Shared.ruleset = Shared.ruleset
2121
version.json = version.json
22+
tests\RedisConfigs\docker-compose.yml = tests\RedisConfigs\docker-compose.yml
2223
EndProjectSection
2324
EndProject
2425
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "RedisConfigs", "RedisConfigs", "{96E891CD-2ED7-4293-A7AB-4C6F5D8D2B05}"

docs/ReleaseNotes.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ Current package versions:
1616
- Package updates ([#2906 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2906))
1717
- Docs: added [guidance on async timeouts](https://stackexchange.github.io/StackExchange.Redis/AsyncTimeouts) ([#2910 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2910))
1818
- Fix handshake error with `CLIENT ID` ([#2909 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2909))
19+
- Add `XTRIM MINID` support ([#2842 by kijanawoodard](https://github.com/StackExchange/StackExchange.Redis/pull/2842))
20+
- Add new CE 8.2 stream support - `XDELEX`, `XACKDEL`, `{XADD|XTRIM} [KEEPREF|DELREF|ACKED]` ([#2912 by mgravell](https://github.com/StackExchange/StackExchange.Redis/pull/2912))
1921

2022
## 2.8.41
2123

src/StackExchange.Redis/Enums/RedisCommand.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,10 +206,12 @@ internal enum RedisCommand
206206
WATCH,
207207

208208
XACK,
209+
XACKDEL,
209210
XADD,
210211
XAUTOCLAIM,
211212
XCLAIM,
212213
XDEL,
214+
XDELEX,
213215
XGROUP,
214216
XINFO,
215217
XLEN,
@@ -496,9 +498,11 @@ internal static bool IsPrimaryOnly(this RedisCommand command)
496498
case RedisCommand.GEOADD:
497499
case RedisCommand.SORT:
498500
case RedisCommand.XACK:
501+
case RedisCommand.XACKDEL:
499502
case RedisCommand.XADD:
500503
case RedisCommand.XCLAIM:
501504
case RedisCommand.XDEL:
505+
case RedisCommand.XDELEX:
502506
case RedisCommand.XGROUP:
503507
case RedisCommand.XREADGROUP:
504508
case RedisCommand.XTRIM:
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
namespace StackExchange.Redis;
2+
3+
/// <summary>
4+
/// Determines how stream trimming works.
5+
/// </summary>
6+
public enum StreamTrimMode
7+
{
8+
/// <summary>
9+
/// Trims the stream according to the specified policy (MAXLEN or MINID) regardless of whether entries are referenced by any consumer groups, but preserves existing references to these entries in all consumer groups' PEL.
10+
/// </summary>
11+
KeepReferences = 0,
12+
13+
/// <summary>
14+
/// Trims the stream according to the specified policy and also removes all references to the trimmed entries from all consumer groups' PEL.
15+
/// </summary>
16+
/// <remarks>Requires server 8.2 or above.</remarks>
17+
DeleteReferences = 1,
18+
19+
/// <summary>
20+
/// With ACKED: Only trims entries that were read and acknowledged by all consumer groups.
21+
/// </summary>
22+
/// <remarks>Requires server 8.2 or above.</remarks>
23+
Acknowledged = 2,
24+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
namespace StackExchange.Redis;
2+
3+
/// <summary>
4+
/// Determines how stream trimming works.
5+
/// </summary>
6+
public enum StreamTrimResult
7+
{
8+
/// <summary>
9+
/// No such id exists in the provided stream key.
10+
/// </summary>
11+
NotFound = -1,
12+
13+
/// <summary>
14+
/// Entry was deleted from the stream.
15+
/// </summary>
16+
Deleted = 1,
17+
18+
/// <summary>
19+
/// Entry was not deleted, but there are still dangling references.
20+
/// </summary>
21+
/// <remarks>This response relates to the <see cref="StreamTrimMode.Acknowledged"/> mode.</remarks>
22+
NotDeleted = 2,
23+
}

src/StackExchange.Redis/Interfaces/IDatabase.cs

Lines changed: 111 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2440,6 +2440,34 @@ IEnumerable<SortedSetEntry> SortedSetScan(
24402440
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
24412441
long StreamAcknowledge(RedisKey key, RedisValue groupName, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
24422442

2443+
/// <summary>
2444+
/// Allow the consumer to mark a pending message as correctly processed. Returns the number of messages acknowledged.
2445+
/// </summary>
2446+
/// <param name="key">The key of the stream.</param>
2447+
/// <param name="groupName">The name of the consumer group that received the message.</param>
2448+
/// <param name="mode">The delete mode to use when acknowledging the message.</param>
2449+
/// <param name="messageId">The ID of the message to acknowledge.</param>
2450+
/// <param name="flags">The flags to use for this operation.</param>
2451+
/// <returns>The outcome of the delete operation.</returns>
2452+
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
2453+
#pragma warning disable RS0026 // similar overloads
2454+
StreamTrimResult StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue messageId, CommandFlags flags = CommandFlags.None);
2455+
#pragma warning restore RS0026
2456+
2457+
/// <summary>
2458+
/// Allow the consumer to mark a pending message as correctly processed. Returns the number of messages acknowledged.
2459+
/// </summary>
2460+
/// <param name="key">The key of the stream.</param>
2461+
/// <param name="groupName">The name of the consumer group that received the message.</param>
2462+
/// /// <param name="mode">The delete mode to use when acknowledging the message.</param>
2463+
/// <param name="messageIds">The IDs of the messages to acknowledge.</param>
2464+
/// <param name="flags">The flags to use for this operation.</param>
2465+
/// <returns>The outcome of each delete operation.</returns>
2466+
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
2467+
#pragma warning disable RS0026 // similar overloads
2468+
StreamTrimResult[] StreamAcknowledgeAndDelete(RedisKey key, RedisValue groupName, StreamTrimMode mode, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
2469+
#pragma warning restore RS0026
2470+
24432471
/// <summary>
24442472
/// Adds an entry using the specified values to the given stream key.
24452473
/// If key does not exist, a new key holding a stream is created.
@@ -2454,7 +2482,7 @@ IEnumerable<SortedSetEntry> SortedSetScan(
24542482
/// <param name="flags">The flags to use for this operation.</param>
24552483
/// <returns>The ID of the newly created message.</returns>
24562484
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
2457-
RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);
2485+
RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags);
24582486

24592487
/// <summary>
24602488
/// Adds an entry using the specified values to the given stream key.
@@ -2469,7 +2497,46 @@ IEnumerable<SortedSetEntry> SortedSetScan(
24692497
/// <param name="flags">The flags to use for this operation.</param>
24702498
/// <returns>The ID of the newly created message.</returns>
24712499
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
2472-
RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, int? maxLength = null, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);
2500+
RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId, int? maxLength, bool useApproximateMaxLength, CommandFlags flags);
2501+
2502+
/// <summary>
2503+
/// Adds an entry using the specified values to the given stream key.
2504+
/// If key does not exist, a new key holding a stream is created.
2505+
/// The command returns the ID of the newly created stream entry.
2506+
/// </summary>
2507+
/// <param name="key">The key of the stream.</param>
2508+
/// <param name="streamField">The field name for the stream entry.</param>
2509+
/// <param name="streamValue">The value to set in the stream entry.</param>
2510+
/// <param name="messageId">The ID to assign to the stream entry, defaults to an auto-generated ID ("*").</param>
2511+
/// <param name="maxLength">The maximum length of the stream.</param>
2512+
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
2513+
/// <param name="limit">Specifies the maximal count of entries that will be evicted.</param>
2514+
/// <param name="trimMode">Determines how stream trimming should be performed.</param>
2515+
/// <param name="flags">The flags to use for this operation.</param>
2516+
/// <returns>The ID of the newly created message.</returns>
2517+
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
2518+
#pragma warning disable RS0026 // different shape
2519+
RedisValue StreamAdd(RedisKey key, RedisValue streamField, RedisValue streamValue, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
2520+
#pragma warning restore RS0026
2521+
2522+
/// <summary>
2523+
/// Adds an entry using the specified values to the given stream key.
2524+
/// If key does not exist, a new key holding a stream is created.
2525+
/// The command returns the ID of the newly created stream entry.
2526+
/// </summary>
2527+
/// <param name="key">The key of the stream.</param>
2528+
/// <param name="streamPairs">The fields and their associated values to set in the stream entry.</param>
2529+
/// <param name="messageId">The ID to assign to the stream entry, defaults to an auto-generated ID ("*").</param>
2530+
/// <param name="maxLength">The maximum length of the stream.</param>
2531+
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
2532+
/// <param name="limit">Specifies the maximal count of entries that will be evicted.</param>
2533+
/// <param name="trimMode">Determines how stream trimming should be performed.</param>
2534+
/// <param name="flags">The flags to use for this operation.</param>
2535+
/// <returns>The ID of the newly created message.</returns>
2536+
/// <remarks><seealso href="https://redis.io/commands/xadd"/></remarks>
2537+
#pragma warning disable RS0026 // different shape
2538+
RedisValue StreamAdd(RedisKey key, NameValueEntry[] streamPairs, RedisValue? messageId = null, long? maxLength = null, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode trimMode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
2539+
#pragma warning restore RS0026
24732540

24742541
/// <summary>
24752542
/// Change ownership of messages consumed, but not yet acknowledged, by a different consumer.
@@ -2583,7 +2650,22 @@ IEnumerable<SortedSetEntry> SortedSetScan(
25832650
/// <param name="flags">The flags to use for this operation.</param>
25842651
/// <returns>Returns the number of messages successfully deleted from the stream.</returns>
25852652
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
2653+
#pragma warning disable RS0026 // similar overloads
25862654
long StreamDelete(RedisKey key, RedisValue[] messageIds, CommandFlags flags = CommandFlags.None);
2655+
#pragma warning restore RS0026
2656+
2657+
/// <summary>
2658+
/// Delete messages in the stream. This method does not delete the stream.
2659+
/// </summary>
2660+
/// <param name="key">The key of the stream.</param>
2661+
/// <param name="messageIds">The IDs of the messages to delete.</param>
2662+
/// <param name="mode">Determines how stream trimming should be performed.</param>
2663+
/// <param name="flags">The flags to use for this operation.</param>
2664+
/// <returns>Returns the number of messages successfully deleted from the stream.</returns>
2665+
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
2666+
#pragma warning disable RS0026 // similar overloads
2667+
StreamTrimResult[] StreamDelete(RedisKey key, RedisValue[] messageIds, StreamTrimMode mode, CommandFlags flags = CommandFlags.None);
2668+
#pragma warning restore RS0026
25872669

25882670
/// <summary>
25892671
/// Delete a consumer from a consumer group.
@@ -2773,7 +2855,33 @@ IEnumerable<SortedSetEntry> SortedSetScan(
27732855
/// <param name="flags">The flags to use for this operation.</param>
27742856
/// <returns>The number of messages removed from the stream.</returns>
27752857
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
2776-
long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);
2858+
long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength, CommandFlags flags);
2859+
2860+
/// <summary>
2861+
/// Trim the stream to a specified maximum length.
2862+
/// </summary>
2863+
/// <param name="key">The key of the stream.</param>
2864+
/// <param name="maxLength">The maximum length of the stream.</param>
2865+
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed max length by a small number. This improves performance when removing messages.</param>
2866+
/// <param name="limit">Specifies the maximal count of entries that will be evicted.</param>
2867+
/// <param name="mode">Determines how stream trimming should be performed.</param>
2868+
/// <param name="flags">The flags to use for this operation.</param>
2869+
/// <returns>The number of messages removed from the stream.</returns>
2870+
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
2871+
long StreamTrim(RedisKey key, long maxLength, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
2872+
2873+
/// <summary>
2874+
/// Trim the stream to a specified minimum timestamp.
2875+
/// </summary>
2876+
/// <param name="key">The key of the stream.</param>
2877+
/// <param name="minId">All entries with an id (timestamp) earlier minId will be removed.</param>
2878+
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed minId by a small number. This improves performance when removing messages.</param>
2879+
/// <param name="limit">The maximum number of entries to remove per call when useApproximateMaxLength = true. If 0, the limiting mechanism is disabled entirely.</param>
2880+
/// <param name="mode">Determines how stream trimming should be performed.</param>
2881+
/// <param name="flags">The flags to use for this operation.</param>
2882+
/// <returns>The number of messages removed from the stream.</returns>
2883+
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
2884+
long StreamTrimByMinId(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, long? limit = null, StreamTrimMode mode = StreamTrimMode.KeepReferences, CommandFlags flags = CommandFlags.None);
27772885

27782886
/// <summary>
27792887
/// If key already exists and is a string, this command appends the value at the end of the string.

0 commit comments

Comments
 (0)