Skip to content

Commit ff13721

Browse files
committed
Merge with 2.5.0 release
2 parents 750a9db + 9ec86b3 commit ff13721

File tree

14 files changed

+85
-54
lines changed

14 files changed

+85
-54
lines changed

CHANGELOG.md

+9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,15 @@
11
Release Notes
22
====
33

4+
# 05-04-2024
5+
<a href="https://www.nuget.org/packages/dotnext.net.cluster/5.5.0">DotNext.Net.Cluster 5.5.0</a>
6+
* Introduced `IRaftCluster.WaitForLeadershipAsync` method that waits for the local node to be elected as the leader of the cluster
7+
* Fixed [233](https://github.com/dotnet/dotNext/issues/233)
8+
* Fixed correctness of appending no-op entry by a leader used as a write barrier
9+
10+
<a href="https://www.nuget.org/packages/dotnext.aspnetcore.cluster/5.5.0">DotNext.AspNetCore.Cluster 5.5.0</a>
11+
* Updated dependencies
12+
413
# 04-20-2024
514
<a href="https://www.nuget.org/packages/dotnext.io/5.4.0">DotNext.IO 5.4.0</a>
615
* Added `FileWriter.WrittenBuffer` property

README.md

+6-8
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,14 @@ All these things are implemented in 100% managed code on top of existing .NET AP
4444
* [NuGet Packages](https://www.nuget.org/profiles/rvsakno)
4545

4646
# What's new
47-
Release Date: 04-20-2024
47+
Release Date: 05-04-2024
4848

49-
<a href="https://www.nuget.org/packages/dotnext.io/5.4.0">DotNext.IO 5.4.0</a>
50-
* Added `FileWriter.WrittenBuffer` property
49+
<a href="https://www.nuget.org/packages/dotnext.net.cluster/5.5.0">DotNext.Net.Cluster 5.5.0</a>
50+
* Introduced `IRaftCluster.WaitForLeadershipAsync` method that waits for the local node to be elected as the leader of the cluster
51+
* Fixed [233](https://github.com/dotnet/dotNext/issues/233)
52+
* Fixed correctness of appending no-op entry by a leader used as a write barrier
5153

52-
<a href="https://www.nuget.org/packages/dotnext.net.cluster/5.4.0">DotNext.Net.Cluster 5.4.0</a>
53-
* Changed binary file format for WAL for more efficient I/O. A new format is incompatible with all previous versions. To enable legacy format, set `PersistentState.Options.UseLegacyBinaryFormat` property to **true**
54-
* Introduced a new experimental binary format for WAL based on sparse files. Can be enabled with `PersistentState.Options.MaxLogEntrySize` property
55-
56-
<a href="https://www.nuget.org/packages/dotnext.aspnetcore.cluster/5.4.0">DotNext.AspNetCore.Cluster 5.4.0</a>
54+
<a href="https://www.nuget.org/packages/dotnext.aspnetcore.cluster/5.5.0">DotNext.AspNetCore.Cluster 5.5.0</a>
5755
* Updated dependencies
5856

5957
Changelog for previous versions is located [here](./CHANGELOG.md).

src/Directory.Packages.props

+5-5
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
</ItemGroup>
2020
<ItemGroup>
2121
<!--Microsoft packages-->
22-
<PackageVersion Include="Microsoft.AspNetCore.Connections.Abstractions" Version="8.0.2" />
23-
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.0" />
22+
<PackageVersion Include="Microsoft.AspNetCore.Connections.Abstractions" Version="8.0.4" />
23+
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.1" />
2424
<PackageVersion Include="Microsoft.Extensions.Hosting.Abstractions" Version="8.0.0" />
2525
<PackageVersion Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
2626
<PackageVersion Include="Microsoft.Extensions.Http" Version="8.0.0" />
@@ -33,8 +33,8 @@
3333
<!--Misc packages-->
3434
<PackageVersion Include="BenchmarkDotNet" Version="0.13.12" />
3535
<PackageVersion Include="FastMember.Signed" Version="1.5.0" />
36-
<PackageVersion Include="xunit" Version="2.7.0" />
37-
<PackageVersion Include="xunit.runner.visualstudio" Version="2.5.7"/>
38-
<PackageVersion Include="coverlet.collector" Version="6.0.0"/>
36+
<PackageVersion Include="xunit" Version="2.8.0" />
37+
<PackageVersion Include="xunit.runner.visualstudio" Version="2.8.0"/>
38+
<PackageVersion Include="coverlet.collector" Version="6.0.2"/>
3939
</ItemGroup>
4040
</Project>

src/DotNext.Tests/Net/Cluster/Consensus/Raft/Http/RaftHttpClusterTests.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -803,7 +803,7 @@ public static async Task ConsensusToken()
803803
True(GetLocalClusterView(host2).ConsensusToken.IsCancellationRequested);
804804
await host2.StartAsync();
805805

806-
await GetLocalClusterView(host1).WaitForLeaderAsync(DefaultTimeout);
806+
await GetLocalClusterView(host1).WaitForLeadershipAsync(DefaultTimeout);
807807
Equal(GetLocalClusterView(host1).LeadershipToken, GetLocalClusterView(host1).ConsensusToken);
808808
True(await GetLocalClusterView(host1).AddMemberAsync(GetLocalClusterView(host2).LocalMemberAddress));
809809

src/cluster/DotNext.AspNetCore.Cluster/DotNext.AspNetCore.Cluster.csproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<ImplicitUsings>true</ImplicitUsings>
99
<IsTrimmable>true</IsTrimmable>
1010
<Features>nullablePublicOnly</Features>
11-
<VersionPrefix>5.4.0</VersionPrefix>
11+
<VersionPrefix>5.5.0</VersionPrefix>
1212
<VersionSuffix></VersionSuffix>
1313
<Authors>.NET Foundation and Contributors</Authors>
1414
<Product>.NEXT Family of Libraries</Product>

src/cluster/DotNext.Net.Cluster/DotNext.Net.Cluster.csproj

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<Nullable>enable</Nullable>
99
<IsTrimmable>true</IsTrimmable>
1010
<Features>nullablePublicOnly</Features>
11-
<VersionPrefix>5.4.0</VersionPrefix>
11+
<VersionPrefix>5.5.0</VersionPrefix>
1212
<VersionSuffix></VersionSuffix>
1313
<Authors>.NET Foundation and Contributors</Authors>
1414
<Product>.NEXT Family of Libraries</Product>

src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/CustomTransport/GenericClient.cs

+1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ private protected override async ValueTask<IConnectionContext> ConnectAsync(Canc
8181
{
8282
// connection has a separate timeout
8383
using var connectDurationTracker = CancellationTokenSource.CreateLinkedTokenSource(token);
84+
connectDurationTracker.CancelAfter(ConnectTimeout);
8485
return new GenericConnectionContext(await factory.ConnectAsync(EndPoint, connectDurationTracker.Token).ConfigureAwait(false), defaultAllocator);
8586
}
8687
}

src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/CustomTransport/GenericServer.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,12 @@ private async void HandleConnection(ConnectionContext connection, int transmissi
7474
// reset by client
7575
logger.ConnectionWasResetByClient(clientAddress);
7676
}
77-
catch (OperationCanceledException) when (tokenSource is not null && tokenSource.CancellationOrigin == connection.ConnectionClosed)
77+
catch (OperationCanceledException) when (tokenSource?.CancellationOrigin == connection.ConnectionClosed)
7878
{
7979
// closed by client
8080
logger.ConnectionWasResetByClient(clientAddress);
8181
}
82-
catch (OperationCanceledException) when (tokenSource is not null && tokenSource.CancellationOrigin == lifecycleToken)
82+
catch (OperationCanceledException) when (tokenSource?.CancellationOrigin == lifecycleToken)
8383
{
8484
// server stopped, suppress exception
8585
}

src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/IRaftCluster.cs

+13-1
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public interface IRaftCluster : IReplicationCluster<IRaftLogEntry>, IPeerMesh<IR
5353
/// has communication with the leader.
5454
/// </summary>
5555
/// <remarks>
56-
/// The token moves to canceled state if the current node upgrades to the candidate state or looses connection with the leader.
56+
/// The token moves to canceled state if the current node upgrades to the candidate state or loses connection with the leader.
5757
/// </remarks>
5858
CancellationToken ConsensusToken { get; }
5959

@@ -72,4 +72,16 @@ public interface IRaftCluster : IReplicationCluster<IRaftLogEntry>, IPeerMesh<IR
7272
/// <returns>The task representing asynchronous result.</returns>
7373
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
7474
ValueTask ApplyReadBarrierAsync(CancellationToken token = default);
75+
76+
/// <summary>
77+
/// Waits until the local node is elected as the leader.
78+
/// </summary>
79+
/// <param name="timeout">The time to wait; or <see cref="Timeout.InfiniteTimeSpan"/>.</param>
80+
/// <param name="token">The token that can be used to cancel the operation.</param>
81+
/// <returns>The leadership token.</returns>
82+
/// <exception cref="TimeoutException">The operation has timed out.</exception>
83+
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
84+
/// <exception cref="ObjectDisposedException">The local node is disposed.</exception>
85+
/// <seealso cref="LeadershipToken"/>
86+
ValueTask<CancellationToken> WaitForLeadershipAsync(TimeSpan timeout, CancellationToken token = default);
7587
}

src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.Replication.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ internal ValueTask<Result<bool>> ReplicateAsync(IAuditTrail<IRaftLogEntry> audit
111111
var startIndex = replicationIndex + 1L;
112112
Debug.Assert(startIndex == Member.State.NextIndex);
113113

114-
logger.ReplicationStarted(Member.EndPoint, startIndex);
114+
logger.ReplicationStarted(Member.EndPoint, startIndex, currentIndex);
115115
return auditTrail.ReadAsync(this, startIndex, currentIndex, token);
116116
}
117117

@@ -168,7 +168,7 @@ private ConfiguredTaskAwaitable<Result<HeartbeatResult>>.ConfiguredTaskAwaiter R
168168
{
169169
Debug.Assert(snapshot.IsSnapshot);
170170

171-
logger.InstallingSnapshot(replicationIndex = snapshotIndex);
171+
logger.InstallingSnapshot(Member.EndPoint, replicationIndex = snapshotIndex);
172172
var result = Member.InstallSnapshotAsync(term, snapshot, snapshotIndex, token).ConfigureAwait(false).GetAwaiter();
173173
fingerprint = Member.State.ConfigurationFingerprint; // keep local version unchanged
174174
return result;

src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LeaderState.cs

+19-16
Original file line numberDiff line numberDiff line change
@@ -163,30 +163,33 @@ private async Task DoHeartbeats(TimeSpan period, IAuditTrail<IRaftLogEntry> audi
163163
goto case MemberResponse.Canceled;
164164
case MemberResponse.Canceled:
165165
return;
166-
}
167-
168-
if (++quorum == majority)
169-
{
170-
RenewLease(startTime.Elapsed);
171-
UpdateLeaderStickiness();
172-
await configurationStorage.ApplyAsync(Token).ConfigureAwait(false);
173-
}
174-
175-
if (result.Value && ++commitQuorum == majority)
176-
{
177-
// majority of nodes accept entries with at least one entry from the current term
178-
var count = await auditTrail.CommitAsync(currentIndex, Token).ConfigureAwait(false); // commit all entries starting from the first uncommitted index to the end
179-
Logger.CommitSuccessful(currentIndex, count);
166+
case MemberResponse.Successful when ++quorum == majority:
167+
RenewLease(startTime.Elapsed);
168+
UpdateLeaderStickiness();
169+
goto default;
170+
default:
171+
commitQuorum += Unsafe.BitCast<bool, byte>(result.Value);
172+
continue;
180173
}
181174
}
182175
}
183176

184-
if (commitQuorum < majority)
177+
if (commitQuorum >= majority)
178+
{
179+
// majority of nodes accept entries with at least one entry from the current term
180+
var count = await auditTrail.CommitAsync(currentIndex, Token).ConfigureAwait(false); // commit all entries starting from the first uncommitted index to the end
181+
Logger.CommitSuccessful(currentIndex, count);
182+
}
183+
else
185184
{
186185
Logger.CommitFailed(quorum, commitIndex);
187186
}
188187

189-
if (quorum < majority)
188+
if (quorum >= majority)
189+
{
190+
await configurationStorage.ApplyAsync(Token).ConfigureAwait(false);
191+
}
192+
else
190193
{
191194
MoveToFollowerState(randomizeTimeout: false);
192195
return;

src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/LogMessages.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -110,10 +110,10 @@ internal static partial class LogMessages
110110
[LoggerMessage(
111111
EventIdOffset + 12,
112112
LogLevel.Debug,
113-
"Replication of member {Member} started at log index {EntryIndex}",
113+
"Replication of member {Member} started from {StartIndex} log index to {EndIndex}",
114114
EventName = $"{EventIdPrefix}.{nameof(ReplicationStarted)}"
115115
)]
116-
public static partial void ReplicationStarted(this ILogger logger, EndPoint member, long entryIndex);
116+
public static partial void ReplicationStarted(this ILogger logger, EndPoint member, long startIndex, long endIndex);
117117

118118
[LoggerMessage(
119119
EventIdOffset + 13,
@@ -158,10 +158,10 @@ internal static partial class LogMessages
158158
[LoggerMessage(
159159
EventIdOffset + 18,
160160
LogLevel.Information,
161-
"Installing snapshot at index {EntryIndex}",
161+
"Installing snapshot at index {EntryIndex} for member {Member}",
162162
EventName = $"{EventIdPrefix}.{nameof(InstallingSnapshot)}"
163163
)]
164-
public static partial void InstallingSnapshot(this ILogger logger, long entryIndex);
164+
public static partial void InstallingSnapshot(this ILogger logger, EndPoint member, long entryIndex);
165165

166166
public const string SnapshotInstallationFailed = "Critical error detected while installing snapshot of audit trail";
167167

src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/RaftCluster.cs

+20-12
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public abstract partial class RaftCluster<TMember> : Disposable, IUnresponsiveCl
3535

3636
private volatile RaftState<TMember> state;
3737
private volatile TaskCompletionSource<TMember> electionEvent;
38+
private volatile TaskCompletionSource<CancellationToken> leadershipEvent;
3839
private InvocationList<Action<RaftCluster<TMember>, TMember?>> leaderChangedHandlers;
3940
private InvocationList<Action<RaftCluster<TMember>, TMember>> replicationHandlers;
4041
private volatile int electionTimeout;
@@ -75,6 +76,7 @@ protected RaftCluster(IClusterMemberConfiguration config, in TagList measurement
7576
readinessProbe = new(TaskCreationOptions.RunContinuationsAsynchronously);
7677
aggressiveStickiness = config.AggressiveLeaderStickiness;
7778
electionEvent = new(TaskCreationOptions.RunContinuationsAsynchronously);
79+
leadershipEvent = new(TaskCreationOptions.RunContinuationsAsynchronously);
7880
state = new StandbyState<TMember>(this, TimeSpan.FromMilliseconds(electionTimeout));
7981
EndPointComparer = config.EndPointComparer;
8082
this.measurementTags = measurementTags;
@@ -256,22 +258,18 @@ private set
256258
}
257259
}
258260

259-
/// <summary>
260-
/// Waits for the leader election asynchronously.
261-
/// </summary>
262-
/// <param name="timeout">The time to wait; or <see cref="System.Threading.Timeout.InfiniteTimeSpan"/>.</param>
263-
/// <param name="token">The token that can be used to cancel the operation.</param>
264-
/// <returns>The elected leader.</returns>
265-
/// <exception cref="TimeoutException">The operation is timed out.</exception>
266-
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
267-
/// <exception cref="ObjectDisposedException">The local node is disposed.</exception>
261+
/// <inheritdoc cref="ICluster.WaitForLeaderAsync(TimeSpan, CancellationToken)"/>
268262
public Task<TMember> WaitForLeaderAsync(TimeSpan timeout, CancellationToken token = default)
269263
=> electionEvent.Task.WaitAsync(timeout, token);
270264

271265
/// <inheritdoc />
272266
ValueTask<IClusterMember> ICluster.WaitForLeaderAsync(TimeSpan timeout, CancellationToken token)
273267
=> new(WaitForLeaderAsync(timeout, token).Convert<TMember, IClusterMember>());
274268

269+
/// <inheritdoc cref="IRaftCluster.WaitForLeadershipAsync(TimeSpan, CancellationToken)"/>
270+
public ValueTask<CancellationToken> WaitForLeadershipAsync(TimeSpan timeout, CancellationToken token = default)
271+
=> new(leadershipEvent.Task.WaitAsync(timeout, token));
272+
275273
private ValueTask UnfreezeAsync()
276274
{
277275
ValueTask result;
@@ -441,7 +439,14 @@ private async Task CancelPendingRequestsAsync()
441439
}
442440

443441
private ValueTask UpdateStateAsync(RaftState<TMember> newState)
444-
=> Interlocked.Exchange(ref state, newState).DisposeAsync();
442+
{
443+
if (leadershipEvent is { Task.IsCompletedSuccessfully: true } leadershipEventCopy)
444+
{
445+
Interlocked.CompareExchange(ref leadershipEvent, new(TaskCreationOptions.RunContinuationsAsynchronously), leadershipEventCopy);
446+
}
447+
448+
return Interlocked.Exchange(ref state, newState).DisposeAsync();
449+
}
445450

446451
/// <summary>
447452
/// Stops serving local member.
@@ -1118,11 +1123,13 @@ async void IRaftStateMachine<TMember>.MoveToLeaderState(IRaftStateMachine.IWeakC
11181123
};
11191124

11201125
await UpdateStateAsync(newState).ConfigureAwait(false);
1126+
await auditTrail.AppendNoOpEntry(LifecycleToken).ConfigureAwait(false);
11211127

1128+
// ensure that the leader is visible to the consumers after the no-op entry is added to the log (which acts as a write barrier)
11221129
Leader = newLeader;
1123-
await auditTrail.AppendNoOpEntry(LifecycleToken).ConfigureAwait(false);
1124-
newState.StartLeading(HeartbeatTimeout, auditTrail, ConfigurationStorage);
1130+
leadershipEvent.TrySetResult(newState.Token);
11251131

1132+
newState.StartLeading(HeartbeatTimeout, auditTrail, ConfigurationStorage);
11261133
Logger.TransitionToLeaderStateCompleted(currentTerm);
11271134
}
11281135
}
@@ -1293,6 +1300,7 @@ protected override void Dispose(bool disposing)
12931300
memberAddedHandlers = memberRemovedHandlers = default;
12941301
leaderChangedHandlers = default;
12951302
TrySetDisposedException(electionEvent);
1303+
TrySetDisposedException(leadershipEvent);
12961304
}
12971305

12981306
base.Dispose(disposing);

src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/Tcp/TcpClient.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ private protected override async ValueTask<IConnectionContext> ConnectAsync(Canc
121121
try
122122
{
123123
connectDurationTracker.CancelAfter(ConnectTimeout);
124-
await socket.ConnectAsync(EndPoint, token).ConfigureAwait(false);
124+
await socket.ConnectAsync(EndPoint, connectDurationTracker.Token).ConfigureAwait(false);
125125
}
126126
catch
127127
{

0 commit comments

Comments
 (0)