Skip to content

add minId overloads #2842

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/StackExchange.Redis/Interfaces/IDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2775,6 +2775,18 @@ IEnumerable<SortedSetEntry> SortedSetScan(
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);

/// <summary>
/// Trim the stream to a specified minimum timestamp.
/// </summary>
/// <param name="key">The key of the stream.</param>
/// <param name="minId">All entries with an id (timestamp) earlier minId will be removed.</param>
/// <param name="useApproximateMaxLength">If true, the "~" argument is used to allow the stream to exceed minId by a small number. This improves performance when removing messages.</param>
/// <param name="limit">The maximum number of entries to remove per call when useApproximateMaxLength = true. If 0, the limiting mechanism is disabled entirely.</param>
/// <param name="flags">The flags to use for this operation.</param>
/// <returns>The number of messages removed from the stream.</returns>
/// <remarks><seealso href="https://redis.io/topics/streams-intro"/></remarks>
long StreamTrimByMinId(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, CommandFlags flags = CommandFlags.None);

/// <summary>
/// If key already exists and is a string, this command appends the value at the end of the string.
/// If key does not exist it is created and set as an empty string, so APPEND will be similar to SET in this special case.
Expand Down
3 changes: 3 additions & 0 deletions src/StackExchange.Redis/Interfaces/IDatabaseAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,9 @@ IAsyncEnumerable<SortedSetEntry> SortedSetScanAsync(
/// <inheritdoc cref="IDatabase.StreamTrim(RedisKey, int, bool, CommandFlags)"/>
Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None);

/// <inheritdoc cref="IDatabase.StreamTrimByMinId(RedisKey, RedisValue, bool, int?, CommandFlags)"/>
Task<long> StreamTrimByMinIdAsync(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, CommandFlags flags = CommandFlags.None);

/// <inheritdoc cref="IDatabase.StringAppend(RedisKey, RedisValue, CommandFlags)"/>
Task<long> StringAppendAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None);

Expand Down
3 changes: 3 additions & 0 deletions src/StackExchange.Redis/KeyspaceIsolation/KeyPrefixed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,9 @@ public Task<RedisStream[]> StreamReadGroupAsync(StreamPosition[] streamPositions
public Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) =>
Inner.StreamTrimAsync(ToInner(key), maxLength, useApproximateMaxLength, flags);

public Task<long> StreamTrimByMinIdAsync(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamTrimByMinIdAsync(ToInner(key), minId, useApproximateMaxLength, limit, flags);

public Task<long> StringAppendAsync(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None) =>
Inner.StringAppendAsync(ToInner(key), value, flags);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,9 @@ public RedisStream[] StreamReadGroup(StreamPosition[] streamPositions, RedisValu
public long StreamTrim(RedisKey key, int maxLength, bool useApproximateMaxLength = false, CommandFlags flags = CommandFlags.None) =>
Inner.StreamTrim(ToInner(key), maxLength, useApproximateMaxLength, flags);

public long StreamTrimByMinId(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, CommandFlags flags = CommandFlags.None) =>
Inner.StreamTrimByMinId(ToInner(key), minId, useApproximateMaxLength, limit, flags);

public long StringAppend(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None) =>
Inner.StringAppend(ToInner(key), value, flags);

Expand Down
4 changes: 3 additions & 1 deletion src/StackExchange.Redis/PublicAPI/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
#nullable enable
#nullable enable
StackExchange.Redis.IDatabase.StreamTrimByMinId(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> long
StackExchange.Redis.IDatabaseAsync.StreamTrimByMinIdAsync(StackExchange.Redis.RedisKey key, StackExchange.Redis.RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, StackExchange.Redis.CommandFlags flags = StackExchange.Redis.CommandFlags.None) -> System.Threading.Tasks.Task<long>!
46 changes: 46 additions & 0 deletions src/StackExchange.Redis/RedisDatabase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3007,6 +3007,18 @@ public Task<long> StreamTrimAsync(RedisKey key, int maxLength, bool useApproxima
return ExecuteAsync(msg, ResultProcessor.Int64);
}

public long StreamTrimByMinId(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamTrimByMinIdMessage(key, minId, useApproximateMaxLength, limit, flags);
return ExecuteSync(msg, ResultProcessor.Int64);
}

public Task<long> StreamTrimByMinIdAsync(RedisKey key, RedisValue minId, bool useApproximateMaxLength = false, int? limit = null, CommandFlags flags = CommandFlags.None)
{
var msg = GetStreamTrimByMinIdMessage(key, minId, useApproximateMaxLength, limit, flags);
return ExecuteAsync(msg, ResultProcessor.Int64);
}

public long StringAppend(RedisKey key, RedisValue value, CommandFlags flags = CommandFlags.None)
{
var msg = Message.Create(Database, flags, RedisCommand.APPEND, key, value);
Expand Down Expand Up @@ -4494,6 +4506,40 @@ private Message GetStreamTrimMessage(RedisKey key, int maxLength, bool useApprox
values);
}

private Message GetStreamTrimByMinIdMessage(RedisKey key, RedisValue minId, bool useApproximateMaxLength, int? limit, CommandFlags flags)
{
if (limit.HasValue && limit.Value <= 0)
{
throw new ArgumentOutOfRangeException(nameof(limit), "limit must be greater than 0 when specified.");
}

var values = new RedisValue[2 + (useApproximateMaxLength ? 1 : 0) + (useApproximateMaxLength && limit.HasValue ? 2 : 0)];

var offset = 0;

values[offset++] = StreamConstants.MinId;

if (useApproximateMaxLength)
{
values[offset++] = StreamConstants.ApproximateMaxLen;
}

values[offset++] = minId;

if (useApproximateMaxLength && limit.HasValue)
{
values[offset++] = RedisLiterals.LIMIT;
values[offset] = limit.Value;
}

return Message.Create(
Database,
flags,
RedisCommand.XTRIM,
key,
values);
}

private Message? GetStringBitOperationMessage(Bitwise operation, RedisKey destination, RedisKey[] keys, CommandFlags flags)
{
if (keys == null) throw new ArgumentNullException(nameof(keys));
Expand Down
1 change: 1 addition & 0 deletions src/StackExchange.Redis/StreamConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ internal static class StreamConstants
internal static readonly RedisValue SetId = "SETID";

internal static readonly RedisValue MaxLen = "MAXLEN";
internal static readonly RedisValue MinId = "MINID";

internal static readonly RedisValue MkStream = "MKSTREAM";

Expand Down
21 changes: 21 additions & 0 deletions tests/StackExchange.Redis.Tests/KeyPrefixedDatabaseTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1202,6 +1202,27 @@ public void StreamTrim()
mock.Received().StreamTrim("prefix:key", 1000, true, CommandFlags.None);
}

[Fact]
public void StreamTrimByMinId()
{
prefixed.StreamTrimByMinId("key", 1111111111);
mock.Received().StreamTrimByMinId("prefix:key", 1111111111);
}

[Fact]
public void StreamTrimByMinIdWithApproximate()
{
prefixed.StreamTrimByMinId("key", 1111111111, useApproximateMaxLength: true);
mock.Received().StreamTrimByMinId("prefix:key", 1111111111, useApproximateMaxLength: true);
}

[Fact]
public void StreamTrimByMinIdWithApproximateAndLimit()
{
prefixed.StreamTrimByMinId("key", 1111111111, useApproximateMaxLength: true, limit: 100);
mock.Received().StreamTrimByMinId("prefix:key", 1111111111, useApproximateMaxLength: true, limit: 100);
}

[Fact]
public void StringAppend()
{
Expand Down
21 changes: 21 additions & 0 deletions tests/StackExchange.Redis.Tests/KeyPrefixedTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1118,6 +1118,27 @@ public async Task StreamTrimAsync()
await mock.Received().StreamTrimAsync("prefix:key", 1000, true, CommandFlags.None);
}

[Fact]
public async Task StreamTrimByMinIdAsync()
{
await prefixed.StreamTrimByMinIdAsync("key", 1111111111);
await mock.Received().StreamTrimByMinIdAsync("prefix:key", 1111111111);
}

[Fact]
public async Task StreamTrimByMinIdAsyncWithApproximate()
{
await prefixed.StreamTrimByMinIdAsync("key", 1111111111, useApproximateMaxLength: true);
await mock.Received().StreamTrimByMinIdAsync("prefix:key", 1111111111, useApproximateMaxLength: true);
}

[Fact]
public async Task StreamTrimByMinIdAsyncWithApproximateAndLimit()
{
await prefixed.StreamTrimByMinIdAsync("key", 1111111111, useApproximateMaxLength: true, limit: 100);
await mock.Received().StreamTrimByMinIdAsync("prefix:key", 1111111111, useApproximateMaxLength: true, limit: 100);
}

[Fact]
public async Task StringAppendAsync()
{
Expand Down
43 changes: 43 additions & 0 deletions tests/StackExchange.Redis.Tests/StreamTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1916,6 +1916,49 @@ public void StreamTrimLength()
Assert.Equal(1, len);
}

[Fact]
public void StreamTrimByMinId()
{
using var conn = Create(require: RedisFeatures.v5_0_0);

var db = conn.GetDatabase();
var key = Me();

// Add a couple items and check length.
db.StreamAdd(key, "field1", "value1", 1111111110);
db.StreamAdd(key, "field2", "value2", 1111111111);
db.StreamAdd(key, "field3", "value3", 1111111112);

var numRemoved = db.StreamTrimByMinId(key, 1111111111);
var len = db.StreamLength(key);

Assert.Equal(1, numRemoved);
Assert.Equal(2, len);
}

[Fact]
public void StreamTrimByMinIdWithApproximateAndLimit()
{
using var conn = Create(require: RedisFeatures.v5_0_0);

var db = conn.GetDatabase();
var key = Me();

const int maxLength = 1000;
const int limit = 100;

for (var i = 0; i < maxLength; i++)
{
db.StreamAdd(key, $"field", $"value", 1111111110 + i);
}

var numRemoved = db.StreamTrimByMinId(key, 1111111110 + maxLength, useApproximateMaxLength: true, limit: limit);
var len = db.StreamLength(key);

Assert.Equal(limit, numRemoved);
Assert.Equal(maxLength - limit, len);
}

[Fact]
public void StreamVerifyLength()
{
Expand Down
Loading