diff --git a/src/StackExchange.Redis/Interfaces/IDatabase.cs b/src/StackExchange.Redis/Interfaces/IDatabase.cs index 207c03326..ee50213f4 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabase.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabase.cs @@ -2775,6 +2775,18 @@ IEnumerable SortedSetScan( /// long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None); + /// + /// Trim the stream to a specified minimum timestamp. + /// + /// The key of the stream. + /// All entries with an id (timestamp) earlier minId will be removed. + /// If true, the "~" argument is used to allow the stream to exceed minId by a small number. This improves performance when removing messages. + /// The maximum number of entries to remove per call when useApproximateMaxLength = true. If 0, the limiting mechanism is disabled entirely. + /// The flags to use for this operation. + /// The number of messages removed from the stream. + /// + long StreamTrimByMinId(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, CommandFlags flags = CommandFlags.None); + /// /// If key already exists and is a string, this command appends the value at the end of the string. /// If key does not exist it is created and set as an empty string, so APPEND will be similar to SET in this special case. diff --git a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs index 9852c131c..e81f1a464 100644 --- a/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs +++ b/src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs @@ -672,6 +672,9 @@ IAsyncEnumerable SortedSetScanAsync( /// Task StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None); + /// + Task StreamTrimByMinIdAsync(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, CommandFlags flags = CommandFlags.None); + /// Task StringAppendAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs index b97bba73b..f3d5ac151 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs @@ -642,6 +642,9 @@ public Task StreamReadGroupAsync(StreamPosition[] streamPositions public Task StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) => Inner.StreamTrimAsync(ToInner(key), maxLength, useApproximateMaxLength, flags); + public Task StreamTrimByMinIdAsync(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, CommandFlags flags = CommandFlags.None) => + Inner.StreamTrimByMinIdAsync(ToInner(key), minId, useApproximateMaxLength, limit, flags); + public Task StringAppendAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None) => Inner.StringAppendAsync(ToInner(key), value, flags); diff --git a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs index 75d93d0f9..73319fadf 100644 --- a/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs +++ b/src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixedDatabase.cs @@ -624,6 +624,9 @@ public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValu public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) => Inner.StreamTrim(ToInner(key), maxLength, useApproximateMaxLength, flags); + public long StreamTrimByMinId(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, CommandFlags flags = CommandFlags.None) => + Inner.StreamTrimByMinId(ToInner(key), minId, useApproximateMaxLength, limit, flags); + public long StringAppend(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None) => Inner.StringAppend(ToInner(key), value, flags); diff --git a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt index 91b0e1a43..f7293cbe7 100644 --- a/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt +++ b/src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt @@ -1 +1,3 @@ -#nullable enable \ No newline at end of file +#nullable enable +StackExchange.Redis.IDatabase.StreamTrimByMinId(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long +StackExchange.Redis.IDatabaseAsync.StreamTrimByMinIdAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task! \ No newline at end of file diff --git a/src/StackExchange.Redis/RedisDatabase.cs b/src/StackExchange.Redis/RedisDatabase.cs index 716176662..9c1c46d29 100644 --- a/src/StackExchange.Redis/RedisDatabase.cs +++ b/src/StackExchange.Redis/RedisDatabase.cs @@ -3007,6 +3007,18 @@ public Task StreamTrimAsync(RedisKey key, int maxLength, bool useApproxima return ExecuteAsync(msg, ResultProcessor.Int64); } + public long StreamTrimByMinId(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamTrimByMinIdMessage(key, minId, useApproximateMaxLength, limit, flags); + return ExecuteSync(msg, ResultProcessor.Int64); + } + + public Task StreamTrimByMinIdAsync(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, CommandFlags flags = CommandFlags.None) + { + var msg = GetStreamTrimByMinIdMessage(key, minId, useApproximateMaxLength, limit, flags); + return ExecuteAsync(msg, ResultProcessor.Int64); + } + public long StringAppend(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None) { var msg = Message.Create(Database, flags, RedisCommand.APPEND, key, value); @@ -4494,6 +4506,40 @@ private Message GetStreamTrimMessage(RedisKey key, int maxLength, bool useApprox values); } + private Message GetStreamTrimByMinIdMessage(RedisKey key, RedisValue minId, bool useApproximateMaxLength, int? limit, CommandFlags flags) + { + if (limit.HasValue && limit.Value <= 0) + { + throw new ArgumentOutOfRangeException(nameof(limit), "limit must be greater than 0 when specified."); + } + + var values = new RedisValue[2 + (useApproximateMaxLength ? 1 : 0) + (useApproximateMaxLength && limit.HasValue ? 2 : 0)]; + + var offset = 0; + + values[offset++] = StreamConstants.MinId; + + if (useApproximateMaxLength) + { + values[offset++] = StreamConstants.ApproximateMaxLen; + } + + values[offset++] = minId; + + if (useApproximateMaxLength && limit.HasValue) + { + values[offset++] = RedisLiterals.LIMIT; + values[offset] = limit.Value; + } + + return Message.Create( + Database, + flags, + RedisCommand.XTRIM, + key, + values); + } + private Message? GetStringBitOperationMessage(Bitwise operation, RedisKey destination, RedisKey[] keys, CommandFlags flags) { if (keys == null) throw new ArgumentNullException(nameof(keys)); diff --git a/src/StackExchange.Redis/StreamConstants.cs b/src/StackExchange.Redis/StreamConstants.cs index 74650e010..3f34c2e5e 100644 --- a/src/StackExchange.Redis/StreamConstants.cs +++ b/src/StackExchange.Redis/StreamConstants.cs @@ -59,6 +59,7 @@ internal static class StreamConstants internal static readonly RedisValue SetId = "SETID"; internal static readonly RedisValue MaxLen = "MAXLEN"; + internal static readonly RedisValue MinId = "MINID"; internal static readonly RedisValue MkStream = "MKSTREAM"; diff --git a/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs b/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs index f467aca24..5ed06745c 100644 --- a/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs +++ b/tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs @@ -1202,6 +1202,27 @@ public void StreamTrim() mock.Received().StreamTrim("prefix:key", 1000, true, CommandFlags.None); } + [Fact] + public void StreamTrimByMinId() + { + prefixed.StreamTrimByMinId("key", 1111111111); + mock.Received().StreamTrimByMinId("prefix:key", 1111111111); + } + + [Fact] + public void StreamTrimByMinIdWithApproximate() + { + prefixed.StreamTrimByMinId("key", 1111111111, useApproximateMaxLength: true); + mock.Received().StreamTrimByMinId("prefix:key", 1111111111, useApproximateMaxLength: true); + } + + [Fact] + public void StreamTrimByMinIdWithApproximateAndLimit() + { + prefixed.StreamTrimByMinId("key", 1111111111, useApproximateMaxLength: true, limit: 100); + mock.Received().StreamTrimByMinId("prefix:key", 1111111111, useApproximateMaxLength: true, limit: 100); + } + [Fact] public void StringAppend() { diff --git a/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs b/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs index aebc8fc6d..045861f0a 100644 --- a/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs +++ b/tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs @@ -1118,6 +1118,27 @@ public async Task StreamTrimAsync() await mock.Received().StreamTrimAsync("prefix:key", 1000, true, CommandFlags.None); } + [Fact] + public async Task StreamTrimByMinIdAsync() + { + await prefixed.StreamTrimByMinIdAsync("key", 1111111111); + await mock.Received().StreamTrimByMinIdAsync("prefix:key", 1111111111); + } + + [Fact] + public async Task StreamTrimByMinIdAsyncWithApproximate() + { + await prefixed.StreamTrimByMinIdAsync("key", 1111111111, useApproximateMaxLength: true); + await mock.Received().StreamTrimByMinIdAsync("prefix:key", 1111111111, useApproximateMaxLength: true); + } + + [Fact] + public async Task StreamTrimByMinIdAsyncWithApproximateAndLimit() + { + await prefixed.StreamTrimByMinIdAsync("key", 1111111111, useApproximateMaxLength: true, limit: 100); + await mock.Received().StreamTrimByMinIdAsync("prefix:key", 1111111111, useApproximateMaxLength: true, limit: 100); + } + [Fact] public async Task StringAppendAsync() { diff --git a/tests/StackExchange.Redis.Tests/StreamTests.cs b/tests/StackExchange.Redis.Tests/StreamTests.cs index 0ea744848..c9f8955e7 100644 --- a/tests/StackExchange.Redis.Tests/StreamTests.cs +++ b/tests/StackExchange.Redis.Tests/StreamTests.cs @@ -1916,6 +1916,49 @@ public void StreamTrimLength() Assert.Equal(1, len); } + [Fact] + public void StreamTrimByMinId() + { + using var conn = Create(require: RedisFeatures.v5_0_0); + + var db = conn.GetDatabase(); + var key = Me(); + + // Add a couple items and check length. + db.StreamAdd(key, "field1", "value1", 1111111110); + db.StreamAdd(key, "field2", "value2", 1111111111); + db.StreamAdd(key, "field3", "value3", 1111111112); + + var numRemoved = db.StreamTrimByMinId(key, 1111111111); + var len = db.StreamLength(key); + + Assert.Equal(1, numRemoved); + Assert.Equal(2, len); + } + + [Fact] + public void StreamTrimByMinIdWithApproximateAndLimit() + { + using var conn = Create(require: RedisFeatures.v5_0_0); + + var db = conn.GetDatabase(); + var key = Me(); + + const int maxLength = 1000; + const int limit = 100; + + for (var i = 0; i < maxLength; i++) + { + db.StreamAdd(key, $"field", $"value", 1111111110 + i); + } + + var numRemoved = db.StreamTrimByMinId(key, 1111111110 + maxLength, useApproximateMaxLength: true, limit: limit); + var len = db.StreamLength(key); + + Assert.Equal(limit, numRemoved); + Assert.Equal(maxLength - limit, len); + } + [Fact] public void StreamVerifyLength() {