Skip to content

Commit 452e3b1

Browse files
committed
feat: add circuit breaker for downgrade
1 parent 9d9229b commit 452e3b1

File tree

3 files changed

+140
-16
lines changed

3 files changed

+140
-16
lines changed

src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/EventBusOptions.cs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,27 @@ namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;
88
public class EventBusOptions
99
{
1010
/// <summary>
11-
/// The service collection for
11+
/// Interval for publish integration event. Defaults to 1000ms.
1212
/// </summary>
13-
public IServiceCollection? Services { get; set; }
13+
public int Interval { get; set; } = 1000;
1414

1515
/// <summary>
16-
/// Interval for publish integration event.
16+
/// Maximum number of events that can be sent in one cycle. Pass <c>null</c> to disable limit. Defaults to <c>null</c>.
1717
/// </summary>
18-
public int Interval { get; set; } = 1;
18+
public int? MaximumBatchSize { get; set; }
19+
20+
/// <summary>
21+
/// The maximum number of failure before downgrade. Defaults to 5.
22+
/// </summary>
23+
public int FailureCountBeforeDowngrade { get; set; } = 5;
24+
25+
/// <summary>
26+
/// Interval when downgraded. Defaults to 60000ms(1min).
27+
/// </summary>
28+
public int DowngradeInterval { get; set; } = 60 * 1000;
29+
30+
/// <summary>
31+
/// The maximum number of success before recover. Defaults to 1.
32+
/// </summary>
33+
public int SuccessCountBeforeRecover { get; set; } = 1;
1934
}

src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/PublishIntegrationEventHostedService.cs

Lines changed: 42 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -38,53 +38,83 @@ public PublishIntegrationEventHostedService(
3838
/// <inheritdoc />
3939
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
4040
{
41-
_logger.LogInformation("Integration event publisher running.");
41+
_logger.LogInformation("Integration event publisher running");
4242
var watch = new Stopwatch();
43-
using var timer = new PeriodicTimer(TimeSpan.FromMicroseconds(_options.Interval));
44-
while (await timer.WaitForNextTickAsync(stoppingToken))
43+
var failureCounter = 0;
44+
var successCounter = 0;
45+
using var normalTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(_options.Interval));
46+
using var failedTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(_options.DowngradeInterval));
47+
var currentTimer = normalTimer;
48+
var downgraded = false;
49+
while (await currentTimer.WaitForNextTickAsync(stoppingToken))
4550
{
4651
try
4752
{
4853
watch.Restart();
49-
var beforeCount = _eventBuffer.Count;
50-
await PublishEventAsync();
54+
var sent = await PublishEventAsync();
5155
watch.Stop();
5256
var afterCount = _eventBuffer.Count;
53-
if (afterCount - beforeCount > 0)
57+
if (sent > 0)
5458
{
59+
successCounter++;
5560
_logger.LogInformation(
5661
"Published {PublishedEventCount} events in {Duration} ms, resting count: {RestingEventCount}",
57-
beforeCount - afterCount,
62+
sent,
5863
watch.ElapsedMilliseconds,
5964
afterCount);
6065
}
6166
}
6267
catch (Exception e)
6368
{
64-
_logger.LogError(e, "Publish integration event failed, pending count: {Count}", _eventBuffer.Count);
69+
failureCounter++;
70+
_logger.LogWarning(
71+
e,
72+
"Publish integration event failed, pending count: {Count}, failure count: {FailureCount}",
73+
_eventBuffer.Count,
74+
failureCounter);
75+
}
76+
77+
if (downgraded == false && failureCounter >= _options.FailureCountBeforeDowngrade)
78+
{
79+
_logger.LogError("Integration event publisher downgraded");
80+
downgraded = true;
81+
currentTimer = failedTimer;
82+
successCounter = 0;
83+
}
84+
85+
if (downgraded && successCounter > _options.SuccessCountBeforeRecover)
86+
{
87+
downgraded = false;
88+
currentTimer = normalTimer;
89+
failureCounter = 0;
90+
_logger.LogWarning("Integration event publisher recovered from downgrade");
6591
}
6692
}
6793
}
6894

69-
private async Task PublishEventAsync()
95+
private async Task<int> PublishEventAsync()
7096
{
7197
if (_eventBuffer.Count == 0)
7298
{
73-
return;
99+
return 0;
74100
}
75101

76102
using var scope = _serviceProvider.CreateScope();
77103
var provider = scope.ServiceProvider.GetRequiredService<IEventBusProvider>();
78-
while (_eventBuffer.Count > 0)
104+
var publishedEventCount = 0;
105+
while (_eventBuffer.Count > 0 && publishedEventCount != _options.MaximumBatchSize)
79106
{
80107
var buffered = _eventBuffer.Peek();
81108
if (buffered is null)
82109
{
83-
return;
110+
break;
84111
}
85112

86113
await provider.PublishAsync(buffered.Name, buffered.Event);
87114
_eventBuffer.Pop();
115+
publishedEventCount++;
88116
}
117+
118+
return publishedEventCount;
89119
}
90120
}

test/Cnblogs.Architecture.IntegrationTests/IntegrationEventPublishTests.cs

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,83 @@ public async Task EventBus_PublishEvent_SuccessAsync()
4242
x => x.PublishAsync(It.IsAny<string>(), It.Is<TestIntegrationEvent>(t => t.Message == data)),
4343
Times.Once);
4444
}
45+
46+
[Fact]
47+
public async Task EventBus_Downgrading_DowngradeAsync()
48+
{
49+
// Arrange
50+
const string data = "hello";
51+
var builder = new WebApplicationFactory<Program>();
52+
var eventBusMock = new Mock<IEventBusProvider>();
53+
builder = builder.WithWebHostBuilder(
54+
b => b.ConfigureServices(
55+
services =>
56+
{
57+
services.RemoveAll<IEventBusProvider>();
58+
services.AddScoped<IEventBusProvider>(_ => eventBusMock.Object);
59+
services.Configure<EventBusOptions>(
60+
o =>
61+
{
62+
o.FailureCountBeforeDowngrade = 1;
63+
o.DowngradeInterval = 3000;
64+
});
65+
}));
66+
eventBusMock.Setup(x => x.PublishAsync(It.IsAny<string>(), It.IsAny<IntegrationEvent>()))
67+
.ThrowsAsync(new InvalidOperationException());
68+
69+
// Act
70+
var response = await builder.CreateClient().PostAsJsonAsync(
71+
"/api/v1/strings",
72+
new CreatePayload(false, data));
73+
var content = await response.Content.ReadAsStringAsync();
74+
await Task.Delay(3000); // hit at 1000ms and 3000ms
75+
76+
// Assert
77+
response.Should().BeSuccessful();
78+
content.Should().BeNullOrEmpty();
79+
eventBusMock.Verify(
80+
x => x.PublishAsync(It.IsAny<string>(), It.Is<TestIntegrationEvent>(t => t.Message == data)),
81+
Times.Exactly(2));
82+
}
83+
84+
[Fact]
85+
public async Task EventBus_DowngradeThenRecover_RecoverAsync()
86+
{
87+
// Arrange
88+
const string data = "hello";
89+
var builder = new WebApplicationFactory<Program>();
90+
var eventBusMock = new Mock<IEventBusProvider>();
91+
builder = builder.WithWebHostBuilder(
92+
b => b.ConfigureServices(
93+
services =>
94+
{
95+
services.RemoveAll<IEventBusProvider>();
96+
services.AddScoped<IEventBusProvider>(_ => eventBusMock.Object);
97+
services.Configure<EventBusOptions>(
98+
o =>
99+
{
100+
o.FailureCountBeforeDowngrade = 1;
101+
o.DowngradeInterval = 4000;
102+
});
103+
}));
104+
eventBusMock.Setup(x => x.PublishAsync(It.IsAny<string>(), It.IsAny<IntegrationEvent>()))
105+
.ThrowsAsync(new InvalidOperationException());
106+
await builder.CreateClient().PostAsJsonAsync(
107+
"/api/v1/strings",
108+
new CreatePayload(false, data));
109+
await Task.Delay(1000); // failed, now it is downgraded
110+
111+
// Act
112+
eventBusMock.Reset();
113+
await Task.Delay(2000); // recover
114+
await builder.CreateClient().PostAsJsonAsync(
115+
"/api/v1/strings",
116+
new CreatePayload(false, data));
117+
await Task.Delay(1000);
118+
119+
// Assert
120+
eventBusMock.Verify(
121+
x => x.PublishAsync(It.IsAny<string>(), It.Is<TestIntegrationEvent>(t => t.Message == data)),
122+
Times.Exactly(2));
123+
}
45124
}

0 commit comments

Comments
 (0)