Skip to content

Add fail action async #428

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions RabbitMQ.Stream.Client/ICrc32.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,20 @@

namespace RabbitMQ.Stream.Client
{
public struct ChunkInfo
{
/// <summary>
/// The stream name of the chunk.
/// </summary>
public string StreamName { get; init; }

public ulong Id { get; init; }

public uint ServerHash { get; init; }

public uint LocalHash { get; init; }
}

public enum ChunkAction
{
/// <summary>
Expand Down Expand Up @@ -37,5 +51,7 @@ public interface ICrc32
/// The code here should be safe
/// </summary>
Func<IConsumer, ChunkAction> FailAction { get; init; }

Func<IConsumer, ChunkInfo, ChunkAction> AsyncFailAction { get; init; }
}
}
14 changes: 14 additions & 0 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
RabbitMQ.Stream.Client.ChunkAction
RabbitMQ.Stream.Client.ChunkAction.Skip = 1 -> RabbitMQ.Stream.Client.ChunkAction
RabbitMQ.Stream.Client.ChunkAction.TryToProcess = 0 -> RabbitMQ.Stream.Client.ChunkAction
RabbitMQ.Stream.Client.ChunkInfo
RabbitMQ.Stream.Client.ChunkInfo.ChunkInfo() -> void
RabbitMQ.Stream.Client.ChunkInfo.Id.get -> ulong
RabbitMQ.Stream.Client.ChunkInfo.Id.init -> void
RabbitMQ.Stream.Client.ChunkInfo.LocalHash.get -> uint
RabbitMQ.Stream.Client.ChunkInfo.LocalHash.init -> void
RabbitMQ.Stream.Client.ChunkInfo.ServerHash.get -> uint
RabbitMQ.Stream.Client.ChunkInfo.ServerHash.init -> void
RabbitMQ.Stream.Client.ChunkInfo.StreamName.get -> string
RabbitMQ.Stream.Client.ChunkInfo.StreamName.init -> void
RabbitMQ.Stream.Client.Client.ClientId.get -> string
RabbitMQ.Stream.Client.Client.ClientId.init -> void
RabbitMQ.Stream.Client.Client.Consumers.get -> System.Collections.Generic.IDictionary<byte, (string, RabbitMQ.Stream.Client.ConsumerEvents)>
Expand Down Expand Up @@ -186,6 +196,8 @@ RabbitMQ.Stream.Client.IConsumerConfig.FlowControl.set -> void
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.ICrc32.AsyncFailAction.get -> System.Func<RabbitMQ.Stream.Client.IConsumer, RabbitMQ.Stream.Client.ChunkInfo, RabbitMQ.Stream.Client.ChunkAction>
RabbitMQ.Stream.Client.ICrc32.AsyncFailAction.init -> void
RabbitMQ.Stream.Client.ICrc32.FailAction.get -> System.Func<RabbitMQ.Stream.Client.IConsumer, RabbitMQ.Stream.Client.ChunkAction>
RabbitMQ.Stream.Client.ICrc32.FailAction.init -> void
RabbitMQ.Stream.Client.ICrc32.Hash(byte[] data) -> byte[]
Expand Down Expand Up @@ -339,6 +351,8 @@ RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.RoutingStrategyType.Hash = 0 -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.RoutingStrategyType.Key = 1 -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.StreamCrc32
RabbitMQ.Stream.Client.StreamCrc32.AsyncFailAction.get -> System.Func<RabbitMQ.Stream.Client.IConsumer, RabbitMQ.Stream.Client.ChunkInfo, RabbitMQ.Stream.Client.ChunkAction>
RabbitMQ.Stream.Client.StreamCrc32.AsyncFailAction.init -> void
RabbitMQ.Stream.Client.StreamCrc32.FailAction.get -> System.Func<RabbitMQ.Stream.Client.IConsumer, RabbitMQ.Stream.Client.ChunkAction>
RabbitMQ.Stream.Client.StreamCrc32.FailAction.init -> void
RabbitMQ.Stream.Client.StreamCrc32.Hash(byte[] data) -> byte[]
Expand Down
10 changes: 8 additions & 2 deletions RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ internal void Validate()
}

FlowControl ??= new FlowControl();

}

internal bool IsFiltering => ConsumerFilter is { Values.Count: > 0 };
Expand Down Expand Up @@ -642,7 +641,14 @@ private async Task Init()
// if the user has set the FailAction, we call it
// to allow the user to handle the chunk action
// if the FailAction is not set, we skip the chunk
chunkAction = _config.Crc32.FailAction?.Invoke(this) ?? ChunkAction.Skip;
chunkAction = _config.Crc32.AsyncFailAction?.Invoke(this,
new ChunkInfo()
{
Id = deliver.Chunk.ChunkId,
ServerHash = deliver.Chunk.Crc,
LocalHash = crcCalculated,
StreamName = _config.Stream
}) ?? ChunkAction.Skip;
}
}

Expand Down
1 change: 1 addition & 0 deletions RabbitMQ.Stream.Client/StreamCrc32.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ public byte[] Hash(byte[] data)
}

public Func<IConsumer, ChunkAction> FailAction { get; init; } = null;
public Func<IConsumer, ChunkInfo, ChunkAction> AsyncFailAction { get; init; }
}
1 change: 1 addition & 0 deletions Tests/Crc32Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public byte[] Hash(byte[] data)
}

public Func<IConsumer, ChunkAction> FailAction { get; init; }
public Func<IConsumer, ChunkInfo, ChunkAction> AsyncFailAction { get; init; }
}

public class Crc32Tests(ITestOutputHelper testOutputHelper)
Expand Down