Skip to content

Commit 5474bc9

Browse files
authored
Merge pull request #137 from cnblogs/add-maximum-buffer-size
feat: add maximum buffer size option
2 parents 0b392ca + ec823b1 commit 5474bc9

File tree

5 files changed

+108
-0
lines changed

5 files changed

+108
-0
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;
2+
3+
/// <summary>
4+
/// The exception that is thrown when <see cref="IEventBuffer"/> reaches its maximum capacity configured in <see cref="EventBusOptions.MaximumBufferSize"/>.
5+
/// </summary>
6+
public sealed class EventBufferOverflowException : Exception
7+
{
8+
/// <summary>
9+
/// Creates an <see cref="EventBufferOverflowException"/>.
10+
/// </summary>
11+
public EventBufferOverflowException()
12+
: base("Event buffer reached its maximum capacity")
13+
{
14+
}
15+
}

src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/EventBusOptions.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ public class EventBusOptions
1717
/// </summary>
1818
public int? MaximumBatchSize { get; set; }
1919

20+
/// <summary>
21+
/// Maximum number of events that can be stored in buffer. An <see cref="EventBufferOverflowException"/> would be thrown when the number of events in buffer exceeds this limit. Pass <c>null</c> to disable limit. Defaults to <c>null</c>.
22+
/// </summary>
23+
public int? MaximumBufferSize { get; set; }
24+
2025
/// <summary>
2126
/// The maximum number of failure before downgrade. Defaults to 5.
2227
/// </summary>

src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/IEventBuffer.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ public interface IEventBuffer
1616
/// <param name="name">The name of integration event.</param>
1717
/// <param name="event">The event.</param>
1818
/// <typeparam name="TEvent">The type of integration event.</typeparam>
19+
/// <exception cref="EventBufferOverflowException">Throws when the number of events in buffer exceeds <see cref="EventBusOptions.MaximumBufferSize"/>.</exception>
1920
void Add<TEvent>(string name, TEvent @event)
2021
where TEvent : IntegrationEvent;
2122

src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/InMemoryEventBuffer.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System.Collections.Concurrent;
2+
using Microsoft.Extensions.Options;
23

34
namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;
45

@@ -8,6 +9,16 @@ namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;
89
public class InMemoryEventBuffer : IEventBuffer
910
{
1011
private readonly ConcurrentQueue<BufferedIntegrationEvent> _queue = new();
12+
private readonly EventBusOptions _options;
13+
14+
/// <summary>
15+
/// Creates an <see cref="InMemoryEventBuffer"/>.
16+
/// </summary>
17+
/// <param name="options">The Eventbus options.</param>
18+
public InMemoryEventBuffer(IOptions<EventBusOptions> options)
19+
{
20+
_options = options.Value;
21+
}
1122

1223
/// <inheritdoc />
1324
public int Count => _queue.Count;
@@ -16,6 +27,11 @@ public class InMemoryEventBuffer : IEventBuffer
1627
public void Add<TEvent>(string name, TEvent @event)
1728
where TEvent : IntegrationEvent
1829
{
30+
if (_queue.Count >= _options.MaximumBufferSize)
31+
{
32+
throw new EventBufferOverflowException();
33+
}
34+
1935
_queue.Enqueue(new BufferedIntegrationEvent(name, @event));
2036
}
2137

test/Cnblogs.Architecture.IntegrationTests/IntegrationEventPublishTests.cs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
using System.Net;
12
using System.Net.Http.Json;
23
using Cnblogs.Architecture.Ddd.EventBus.Abstractions;
34
using Cnblogs.Architecture.IntegrationTestProject;
@@ -81,6 +82,76 @@ public async Task EventBus_Downgrading_DowngradeAsync()
8182
Times.Exactly(2));
8283
}
8384

85+
[Fact]
86+
public async Task EventBus_MaximumBufferSizeReached_ThrowAsync()
87+
{
88+
// Arrange
89+
const string data = "hello";
90+
var builder = new WebApplicationFactory<Program>();
91+
var eventBusMock = new Mock<IEventBusProvider>();
92+
builder = builder.WithWebHostBuilder(
93+
b => b.ConfigureServices(
94+
services =>
95+
{
96+
services.RemoveAll<IEventBusProvider>();
97+
services.AddScoped<IEventBusProvider>(_ => eventBusMock.Object);
98+
services.Configure<EventBusOptions>(
99+
o =>
100+
{
101+
o.MaximumBufferSize = 1;
102+
o.FailureCountBeforeDowngrade = 1;
103+
o.DowngradeInterval = 3000;
104+
});
105+
}));
106+
eventBusMock.Setup(x => x.PublishAsync(It.IsAny<string>(), It.IsAny<IntegrationEvent>()))
107+
.ThrowsAsync(new InvalidOperationException());
108+
var client = builder.CreateClient();
109+
await client.PostAsJsonAsync(
110+
"/api/v1/strings",
111+
new CreatePayload(false, data));
112+
113+
// Act
114+
var response = await client.PostAsJsonAsync("/api/v1/strings", new CreatePayload(false, data));
115+
116+
// Assert
117+
response.Should().HaveStatusCode(HttpStatusCode.InternalServerError);
118+
}
119+
120+
[Fact]
121+
public async Task EventBus_MaximumBatchSize_OneBatchAsync()
122+
{
123+
// Arrange
124+
const string data = "hello";
125+
var builder = new WebApplicationFactory<Program>();
126+
var eventBusMock = new Mock<IEventBusProvider>();
127+
builder = builder.WithWebHostBuilder(
128+
b => b.ConfigureServices(
129+
services =>
130+
{
131+
services.RemoveAll<IEventBusProvider>();
132+
services.AddScoped<IEventBusProvider>(_ => eventBusMock.Object);
133+
services.Configure<EventBusOptions>(
134+
o =>
135+
{
136+
o.MaximumBatchSize = 1;
137+
o.FailureCountBeforeDowngrade = 1;
138+
o.DowngradeInterval = 3000;
139+
});
140+
}));
141+
var client = builder.CreateClient();
142+
for (var i = 0; i < 3; i++)
143+
{
144+
// put 3 events
145+
await client.PostAsJsonAsync("/api/v1/strings", new CreatePayload(false, data));
146+
}
147+
148+
// Act
149+
await Task.Delay(1000);
150+
151+
// Assert
152+
eventBusMock.Verify(x => x.PublishAsync(It.IsAny<string>(), It.IsAny<IntegrationEvent>()), Times.Once);
153+
}
154+
84155
[Fact]
85156
public async Task EventBus_DowngradeThenRecover_RecoverAsync()
86157
{

0 commit comments

Comments
 (0)