Skip to content

Commit d2e8fb8

Browse files
committed
Changed WAL binary format for better I/O
1 parent e9131e2 commit d2e8fb8

File tree

2 files changed

+186
-84
lines changed

2 files changed

+186
-84
lines changed

src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Internal.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ internal static long GetTerm(ReadOnlySpan<byte> input)
151151
/// <summary>
/// Reads the Offset field directly from a serialized log entry metadata record.
/// </summary>
/// <param name="input">The serialized metadata record. It must contain at least four 64-bit fields.</param>
/// <returns>The little-endian 64-bit value stored right after the Term, Timestamp and Length fields.</returns>
internal static long GetOffset(ReadOnlySpan<byte> input)
    => BinaryPrimitives.ReadInt64LittleEndian(input.Slice(sizeof(long) + sizeof(long) + sizeof(long))); // skip Term, Timestamp, Length
153153

154-
private long End => Length + Offset;
154+
internal long End => Length + Offset;
155155

156156
internal static long GetEndOfLogEntry(ReadOnlySpan<byte> input)
157157
{

src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Partition.cs

Lines changed: 185 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using System.Diagnostics;
1+
using System.Collections;
2+
using System.Diagnostics;
23
using System.Diagnostics.CodeAnalysis;
34
using System.Runtime.CompilerServices;
45
using System.Runtime.InteropServices;
@@ -242,12 +243,12 @@ protected override void Dispose(bool disposing)
242243
}
243244
}
244245

245-
private sealed class SparsePartition : Partition
246+
private sealed class SparsePartition : Partition, IReadOnlyList<ReadOnlyMemory<byte>>
246247
{
247248
private readonly long maxLogEntrySize;
248249
private MemoryOwner<CachedLogEntryMetadata> metadataTable;
249250
private MemoryOwner<byte> metadataBuffer;
250-
private ReadOnlyMemory<byte>[]? bufferList;
251+
private ReadOnlyMemory<byte> payloadBuffer;
251252

252253
internal SparsePartition(DirectoryInfo location, int bufferSize, int recordsPerPartition, long partitionNumber, in BufferManager manager, int readersCount, WriteMode writeMode, long initialSize, long maxLogEntrySize)
253254
: base(location, offset: 0, bufferSize, recordsPerPartition, partitionNumber, in manager, readersCount, writeMode, initialSize, FileAttributes.NotContentIndexed | FileAttributes.SparseFile)
@@ -284,6 +285,7 @@ protected override LogEntryMetadata GetMetadata(int index)
284285
protected override void OnCached(in CachedLogEntry cachedEntry, int index)
285286
=> metadataTable[index].Metadata = LogEntryMetadata.Create(cachedEntry, GetMetadataOffset(index) + LogEntryMetadata.Size);
286287

288+
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
287289
protected override async ValueTask PersistAsync<TEntry>(TEntry entry, int index, CancellationToken token)
288290
{
289291
var metadataOffset = GetMetadataOffset(index);
@@ -326,6 +328,7 @@ protected override async ValueTask PersistAsync<TEntry>(TEntry entry, int index,
326328
throw new InvalidOperationException(ExceptionMessages.LogEntryPayloadTooLarge);
327329
}
328330

331+
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
329332
protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int index, CancellationToken token)
330333
{
331334
if (entry.Length > maxLogEntrySize)
@@ -334,17 +337,32 @@ protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int i
334337
var metadata = LogEntryMetadata.Create(entry, GetMetadataOffset(index) + LogEntryMetadata.Size);
335338
metadata.Format(metadataBuffer.Span);
336339

337-
if (bufferList is null)
338-
{
339-
bufferList = new ReadOnlyMemory<byte>[2];
340-
bufferList[0] = metadataBuffer.Memory;
341-
}
342-
343-
bufferList[1] = entry.Content.Memory;
344-
await RandomAccess.WriteAsync(Handle, bufferList, GetMetadataOffset(index), token).ConfigureAwait(false);
340+
payloadBuffer = entry.Content.Memory;
341+
await RandomAccess.WriteAsync(Handle, this, GetMetadataOffset(index), token).ConfigureAwait(false);
342+
payloadBuffer = default;
345343
metadataTable[index].Metadata = metadata;
346344
}
347345

346+
// Two-segment list view of this partition passed to RandomAccess.WriteAsync:
// element 0 is the serialized metadata, element 1 is the payload of the entry being written.
ReadOnlyMemory<byte> IReadOnlyList<ReadOnlyMemory<byte>>.this[int index]
{
    get
    {
        switch (index)
        {
            case 0:
                return metadataBuffer.Memory;
            case 1:
                return payloadBuffer;
            default:
                throw new ArgumentOutOfRangeException(nameof(index));
        }
    }
}

// The view always consists of exactly two segments.
int IReadOnlyCollection<ReadOnlyMemory<byte>>.Count
{
    get { return 2; }
}

// Yields the metadata segment first, then the payload segment.
private IEnumerator<ReadOnlyMemory<byte>> GetEnumerator()
{
    ReadOnlyMemory<byte> metadataSegment = metadataBuffer.Memory;
    yield return metadataSegment;

    var payloadSegment = payloadBuffer;
    yield return payloadSegment;
}

IEnumerator<ReadOnlyMemory<byte>> IEnumerable<ReadOnlyMemory<byte>>.GetEnumerator()
{
    return GetEnumerator();
}

IEnumerator IEnumerable.GetEnumerator()
{
    return GetEnumerator();
}
365+
348366
protected override void Dispose(bool disposing)
349367
{
350368
metadataTable.Dispose();
@@ -376,124 +394,208 @@ internal LogEntryMetadata Metadata
376394
Partition file format:
377395
FileName - number of partition
378396
Payload:
379-
[struct LogEntryMetadata] X Capacity - prologue with metadata
380-
[octet string] X number of entries
397+
[512 bytes] - header:
398+
[1 byte] - sealed flag: true if the partition is completed
399+
[struct LogEntryMetadata] [octet string] X Capacity - log entries prefixed with metadata
400+
[struct LogEntryMetadata] X Capacity - footer: metadata table of all log entries in the file, written only when the partition is completed
381401
*/
382-
private sealed class Table : Partition
402+
private sealed class Table : Partition, IReadOnlyList<ReadOnlyMemory<byte>>
383403
{
384-
// metadata management
385-
private MemoryOwner<byte> metadata;
386-
private int metadataFlushStartAddress;
387-
private int metadataFlushEndAddress;
404+
private const int HeaderSize = 512;
388405

389-
// represents offset within the file from which a newly added log entry payload can be recorded
390-
private long writeAddress;
406+
// metadata management
407+
private MemoryOwner<byte> header, footer, metadataBuffer;
408+
private ReadOnlyMemory<byte> payloadBuffer;
391409

392410
internal Table(DirectoryInfo location, int bufferSize, int recordsPerPartition, long partitionNumber, in BufferManager manager, int readersCount, WriteMode writeMode, long initialSize)
393-
: base(location, checked(LogEntryMetadata.Size * recordsPerPartition), bufferSize, recordsPerPartition, partitionNumber, in manager, readersCount, writeMode, initialSize)
411+
: base(location, HeaderSize, bufferSize, recordsPerPartition, partitionNumber, in manager, readersCount, writeMode, initialSize)
394412
{
395-
// allocate metadata segment
396-
metadata = manager.BufferAllocator.AllocateExactly(fileOffset);
397-
metadataFlushStartAddress = int.MaxValue;
413+
footer = manager.BufferAllocator.AllocateExactly(recordsPerPartition * LogEntryMetadata.Size);
414+
415+
header = manager.BufferAllocator.AllocateExactly(HeaderSize);
416+
header.Span.Clear();
398417

399-
writeAddress = fileOffset;
418+
metadataBuffer = manager.BufferAllocator.AllocateExactly(LogEntryMetadata.Size);
419+
420+
// init ephemeral 0 entry
421+
if (PartitionNumber is 0L)
422+
{
423+
var metadata = LogEntryMetadata.Create(LogEntry.Initial, HeaderSize + LogEntryMetadata.Size, length: 0L);
424+
metadata.Format(footer.Span);
425+
}
426+
}
427+
428+
/// <summary>
/// Gets or sets the flag kept in the first byte of the in-memory header buffer.
/// </summary>
/// <remarks>
/// The byte is reinterpreted as <see cref="bool"/> bit-for-bit, without normalization.
/// The setter modifies the in-memory header only; the header buffer is written to the file separately.
/// </remarks>
private bool IsSealed
429+
{
430+
get => Unsafe.BitCast<byte, bool>(MemoryMarshal.GetReference(header.Span));
431+
set => MemoryMarshal.GetReference(header.Span) = Unsafe.BitCast<bool, byte>(value);
400432
}
401433

402434
/// <summary>
/// Restores the in-memory metadata table (footer) of this partition from the partition file.
/// </summary>
/// <remarks>
/// The file begins with a fixed-size header whose first byte is the sealed flag.
/// For a sealed partition the whole metadata table is loaded from the tail of the file.
/// Otherwise the entries are discovered sequentially: every entry is prefixed with its metadata,
/// and the end position of one entry is the position of the next metadata record.
/// </remarks>
internal override void Initialize()
{
    // Dedicated read-only handle opened with the sequential-scan hint; every read below goes through it.
    using var handle = File.OpenHandle(FileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite, FileOptions.SequentialScan);

    // read header
    if (RandomAccess.Read(handle, header.Span, fileOffset: 0L) < HeaderSize)
    {
        // the header is missing or incomplete: start with a blank, not sealed header
        header.Span.Clear();
        return;
    }

    if (IsSealed)
    {
        // partition is completed, the table occupies the last bytes of the file
        var tableStart = RandomAccess.GetLength(handle) - footer.Length;
        RandomAccess.Read(handle, footer.Span, tableStart);
        return;
    }

    // read sequentially every log entry
    var record = metadataBuffer.Span;
    var table = footer.Span;
    long position = HeaderSize;

    // The loop is bounded by the table size, so a file that chains more records than
    // the partition can hold cannot push the slice below out of range.
    for (var slot = 0; slot < table.Length; slot += LogEntryMetadata.Size)
    {
        if (RandomAccess.Read(handle, record, position) < LogEntryMetadata.Size)
            break;

        var end = LogEntryMetadata.GetEndOfLogEntry(record);
        if (end > 0L)
        {
            record.CopyTo(table.Slice(slot, LogEntryMetadata.Size));
        }
        else if (slot is 0 && PartitionNumber is 0L)
        {
            // NOTE(review): the ephemeral entry 0 is placed into the table by the constructor and
            // appears not to be persisted, so its on-disk slot may be blank. Keep the constructor's
            // metadata and continue the scan from the end position it describes - confirm.
            end = LogEntryMetadata.GetEndOfLogEntry(table.Slice(0, LogEntryMetadata.Size));
            if (end <= 0L)
                break;
        }
        else
        {
            // blank or corrupted record marks the end of the recorded entries
            break;
        }

        position = end;
    }
}
427469

428-
private async ValueTask FlushAsync(ReadOnlyMemory<byte> metadata, CancellationToken token)
429-
{
430-
await RandomAccess.WriteAsync(Handle, metadata, metadataFlushStartAddress, token).ConfigureAwait(false);
431-
metadataFlushStartAddress = int.MaxValue;
432-
metadataFlushEndAddress = 0;
470+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
471+
private Span<byte> GetMetadataSpan(int index)
472+
=> footer.Span.Slice(index * LogEntryMetadata.Size, LogEntryMetadata.Size);
433473

434-
await base.FlushAsync(token).ConfigureAwait(false);
435-
}
474+
protected override LogEntryMetadata GetMetadata(int index)
475+
=> new(GetMetadataSpan(index));
436476

437-
public override ValueTask FlushAsync(CancellationToken token = default)
438-
{
439-
var size = metadataFlushEndAddress - metadataFlushStartAddress;
440-
return size > 0
441-
? FlushAsync(metadata.Memory.Slice(metadataFlushStartAddress, size), token)
442-
: base.FlushAsync(token);
443-
}
477+
private long GetWriteAddress(int index)
478+
=> index is 0 ? fileOffset : LogEntryMetadata.GetEndOfLogEntry(GetMetadataSpan(index - 1));
444479

445-
[MethodImpl(MethodImplOptions.AggressiveInlining)]
446-
private Span<byte> GetMetadata(int index, out int offset)
480+
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
481+
protected override async ValueTask PersistAsync<TEntry>(TEntry entry, int index, CancellationToken token)
447482
{
448-
Debug.Assert(metadata.Length == fileOffset);
483+
var writeAddress = GetWriteAddress(index);
449484

450-
return metadata.Span.Slice(offset = index * LogEntryMetadata.Size);
451-
}
485+
LogEntryMetadata metadata;
486+
var startPos = writeAddress + LogEntryMetadata.Size;
487+
if (entry.Length is { } length)
488+
{
489+
// fast path - write metadata and entry sequentially
490+
metadata = LogEntryMetadata.Create(entry, startPos, length);
452491

453-
protected override LogEntryMetadata GetMetadata(int index)
454-
=> new(GetMetadata(index, out _));
492+
await SetWritePositionAsync(writeAddress, token).ConfigureAwait(false);
493+
await writer.WriteAsync(metadata, token).ConfigureAwait(false);
494+
await entry.WriteToAsync(writer, token).ConfigureAwait(false);
495+
}
496+
else
497+
{
498+
// slow path - write entry first
499+
await SetWritePositionAsync(startPos, token).ConfigureAwait(false);
455500

456-
private void WriteMetadata(int index, in LogEntryMetadata metadata)
457-
{
458-
metadata.Format(GetMetadata(index, out var offset));
501+
await entry.WriteToAsync(writer, token).ConfigureAwait(false);
502+
length = writer.WritePosition - startPos;
459503

460-
metadataFlushStartAddress = Math.Min(metadataFlushStartAddress, offset);
461-
metadataFlushEndAddress = Math.Max(metadataFlushEndAddress, offset + LogEntryMetadata.Size);
462-
}
504+
metadata = LogEntryMetadata.Create(entry, startPos, length);
505+
metadata.Format(metadataBuffer.Span);
506+
await RandomAccess.WriteAsync(Handle, metadataBuffer.Memory, writeAddress, token).ConfigureAwait(false);
507+
}
463508

464-
protected override async ValueTask PersistAsync<TEntry>(TEntry entry, int index, CancellationToken token)
465-
{
466-
// slow path - persist log entry
467-
await SetWritePositionAsync(writeAddress, token).ConfigureAwait(false);
468-
await entry.WriteToAsync(writer, token).ConfigureAwait(false);
509+
metadata.Format(GetMetadataSpan(index));
510+
511+
if (index == LastIndex)
512+
{
513+
// write footer with metadata table
514+
await RandomAccess.WriteAsync(Handle, footer.Memory, metadata.End, token).ConfigureAwait(false);
515+
516+
// seal the partition
517+
IsSealed = true;
518+
}
519+
else if (IsSealed)
520+
{
521+
// unseal
522+
IsSealed = false;
523+
}
524+
else
525+
{
526+
return;
527+
}
469528

470-
// save new log entry to the allocation table
471-
var length = writer.WritePosition - writeAddress;
472-
WriteMetadata(index, LogEntryMetadata.Create(entry, writeAddress, length));
473-
writeAddress += length;
529+
await RandomAccess.WriteAsync(Handle, header.Memory, fileOffset: 0L, token).ConfigureAwait(false);
474530
}
475531

532+
/// <summary>
/// Writes a cached log entry straight to the partition file using the file handle.
/// </summary>
/// <remarks>
/// The metadata and the payload are submitted as one gather write: this object is passed as
/// a two-element list whose first element is the metadata buffer and whose second element is
/// the payload. When the last entry of the partition is written, the metadata table is appended
/// and the partition is marked as sealed; overwriting an entry of a sealed partition unseals it.
/// The header is rewritten only when the sealed flag changes or the last entry is written.
/// </remarks>
/// <param name="entry">The cached log entry to write.</param>
/// <param name="index">The index of the entry within this partition.</param>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <returns>The task representing the asynchronous write.</returns>
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
protected override async ValueTask WriteThroughAsync(CachedLogEntry entry, int index, CancellationToken token)
{
    var writeAddress = GetWriteAddress(index);
    var startPos = writeAddress + LogEntryMetadata.Size;
    var metadata = LogEntryMetadata.Create(entry, startPos, entry.Length);
    metadata.Format(metadataBuffer.Span);

    payloadBuffer = entry.Content.Memory;
    try
    {
        await RandomAccess.WriteAsync(Handle, this, writeAddress, token).ConfigureAwait(false);
    }
    finally
    {
        // never keep a reference to the caller's buffer, even if the write fails or is canceled
        payloadBuffer = default;
    }

    // the entry is on disk, publish its metadata in the in-memory table
    metadata.Format(GetMetadataSpan(index));

    if (index == LastIndex)
    {
        // write footer with metadata table
        await RandomAccess.WriteAsync(Handle, footer.Memory, metadata.End, token).ConfigureAwait(false);

        // seal the partition
        IsSealed = true;
    }
    else if (IsSealed)
    {
        // unseal
        IsSealed = false;
    }
    else
    {
        // the sealed flag is unchanged, no need to rewrite the header
        return;
    }

    await RandomAccess.WriteAsync(Handle, header.Memory, fileOffset: 0L, token).ConfigureAwait(false);
}
487566

488567
protected override void OnCached(in CachedLogEntry cachedEntry, int index)
489568
{
490-
WriteMetadata(index, LogEntryMetadata.Create(cachedEntry, writeAddress, cachedEntry.Length));
491-
writeAddress += cachedEntry.Length;
569+
var startPos = GetWriteAddress(index) + LogEntryMetadata.Size;
570+
var metadata = LogEntryMetadata.Create(in cachedEntry, startPos);
571+
metadata.Format(GetMetadataSpan(index));
572+
}
573+
574+
// Two-segment list view of this partition passed to RandomAccess.WriteAsync:
// element 0 is the serialized metadata, element 1 is the payload of the entry being written.
ReadOnlyMemory<byte> IReadOnlyList<ReadOnlyMemory<byte>>.this[int index]
{
    get
    {
        switch (index)
        {
            case 0:
                return metadataBuffer.Memory;
            case 1:
                return payloadBuffer;
            default:
                throw new ArgumentOutOfRangeException(nameof(index));
        }
    }
}

// The view always consists of exactly two segments.
int IReadOnlyCollection<ReadOnlyMemory<byte>>.Count
{
    get { return 2; }
}

// Yields the metadata segment first, then the payload segment.
private IEnumerator<ReadOnlyMemory<byte>> GetEnumerator()
{
    ReadOnlyMemory<byte> metadataSegment = metadataBuffer.Memory;
    yield return metadataSegment;

    var payloadSegment = payloadBuffer;
    yield return payloadSegment;
}

IEnumerator<ReadOnlyMemory<byte>> IEnumerable<ReadOnlyMemory<byte>>.GetEnumerator()
{
    return GetEnumerator();
}

IEnumerator IEnumerable.GetEnumerator()
{
    return GetEnumerator();
}
593+
494594
protected override void Dispose(bool disposing)
495595
{
496-
metadata.Dispose();
596+
header.Dispose();
597+
footer.Dispose();
598+
metadataBuffer.Dispose();
497599
base.Dispose(disposing);
498600
}
499601
}

0 commit comments

Comments
 (0)