Skip to content

Commit bc5d7a1

Browse files
committed
Partial support of sparse files backup
1 parent b3adac3 commit bc5d7a1

File tree

3 files changed

+98
-1
lines changed

3 files changed

+98
-1
lines changed

src/DotNext.Tests/Net/Cluster/Consensus/Raft/MemoryBasedStateMachineTests.cs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -839,6 +839,37 @@ public static async Task RestoreBackup()
839839
}
840840
}
841841

842+
[Fact]
843+
public static async Task CreateSparseBackup()
844+
{
845+
var entry1 = new TestLogEntry("SET X = 0") { Term = 42L };
846+
var entry2 = new TestLogEntry("SET Y = 1") { Term = 43L };
847+
var entry3 = new TestLogEntry("SET Z = 2") { Term = 44L };
848+
var entry4 = new TestLogEntry("SET U = 3") { Term = 45L };
849+
var entry5 = new TestLogEntry("SET V = 4") { Term = 46L };
850+
var dir = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
851+
var backupFile = Path.GetTempFileName();
852+
IPersistentState state = new PersistentStateWithoutSnapshot(dir, RecordsPerPartition, new() { MaxLogEntrySize = 1024 * 1024, BackupFormat = System.Formats.Tar.TarEntryFormat.Gnu });
853+
var member = ClusterMemberId.FromEndPoint(new IPEndPoint(IPAddress.IPv6Loopback, 3232));
854+
try
855+
{
856+
//define node state
857+
Equal(1, await state.IncrementTermAsync(member));
858+
True(state.IsVotedFor(member));
859+
//define log entries
860+
Equal(1L, await state.AppendAsync(new LogEntryList(entry1, entry2, entry3, entry4, entry5)));
861+
//commit some of them
862+
Equal(2L, await state.CommitAsync(2L));
863+
//save backup
864+
await using var backupStream = new FileStream(backupFile, FileMode.Truncate, FileAccess.Write, FileShare.None, 1024, true);
865+
await state.CreateBackupAsync(backupStream);
866+
}
867+
finally
868+
{
869+
(state as IDisposable)?.Dispose();
870+
}
871+
}
872+
842873
[Fact]
843874
public static async Task Reconstruction()
844875
{

src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Backup.cs

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ private static void ImportAttributes(SafeFileHandle handle, TarEntry entry)
4646

4747
File.SetUnixFileMode(handle, entry.Mode);
4848
}
49+
50+
var attributes = FileAttributes.NotContentIndexed;
51+
if (entry.EntryType is TarEntryType.SparseFile)
52+
attributes |= FileAttributes.SparseFile;
53+
54+
File.SetAttributes(handle, attributes);
4955
}
5056

5157
/// <summary>
@@ -55,7 +61,64 @@ private static void ImportAttributes(SafeFileHandle handle, TarEntry entry)
5561
/// <param name="token">The token that can be used to cancel the operation.</param>
5662
/// <returns>A task representing state of asynchronous execution.</returns>
5763
/// <exception cref="OperationCanceledException">The operation has been canceled.</exception>
58-
public async Task CreateBackupAsync(Stream output, CancellationToken token = default)
64+
public Task CreateBackupAsync(Stream output, CancellationToken token = default)
65+
=> maxLogEntrySize.HasValue ? CreateSparseBackupAsync(output, token) : CreateRegularBackupAsync(output, token);
66+
67+
private async Task CreateSparseBackupAsync(Stream output, CancellationToken token)
68+
{
69+
var tarProcess = new Process
70+
{
71+
StartInfo = new()
72+
{
73+
FileName = "tar",
74+
WorkingDirectory = Location.FullName,
75+
},
76+
};
77+
78+
var outputArchive = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
79+
tarProcess.StartInfo.ArgumentList.Add("cfS");
80+
tarProcess.StartInfo.ArgumentList.Add(outputArchive);
81+
82+
FileStream? archiveStream = null;
83+
await syncRoot.AcquireAsync(LockType.StrongReadLock, token).ConfigureAwait(false);
84+
try
85+
{
86+
foreach (var file in Location.EnumerateFiles())
87+
{
88+
tarProcess.StartInfo.ArgumentList.Add(file.Name);
89+
}
90+
91+
tarProcess.StartInfo.ArgumentList.Add($"--format={GetArchiveFormat(backupFormat)}");
92+
tarProcess.Start();
93+
await tarProcess.WaitForExitAsync(token).ConfigureAwait(false);
94+
95+
if (tarProcess.ExitCode is not 0)
96+
throw new InvalidOperationException() { HResult = tarProcess.ExitCode };
97+
98+
archiveStream = new(outputArchive, FileMode.Open, FileAccess.Read, FileShare.Read, 4096, FileOptions.SequentialScan | FileOptions.Asynchronous | FileOptions.DeleteOnClose);
99+
await archiveStream.CopyToAsync(output, token).ConfigureAwait(false);
100+
await output.FlushAsync(token).ConfigureAwait(false);
101+
}
102+
finally
103+
{
104+
syncRoot.Release(LockType.StrongReadLock);
105+
tarProcess.Dispose();
106+
107+
if (archiveStream is not null)
108+
await archiveStream.DisposeAsync().ConfigureAwait(false);
109+
}
110+
111+
static string GetArchiveFormat(TarEntryFormat format) => format switch
112+
{
113+
TarEntryFormat.Gnu => "gnu",
114+
TarEntryFormat.Pax => "pax",
115+
TarEntryFormat.Ustar => "ustar",
116+
TarEntryFormat.V7 => "v7",
117+
_ => "gnu",
118+
};
119+
}
120+
121+
private async Task CreateRegularBackupAsync(Stream output, CancellationToken token)
59122
{
60123
TarWriter? archive = null;
61124
await syncRoot.AcquireAsync(LockType.StrongReadLock, token).ConfigureAwait(false);

src/cluster/DotNext.Net.Cluster/Net/Cluster/Consensus/Raft/PersistentState.Options.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,9 @@ public int MaxConcurrentReads
138138
/// </summary>
139139
/// <remarks>
140140
/// If enabled, WAL uses sparse files to optimize performance.
141+
/// <see cref="CreateBackupAsync(Stream, CancellationToken)"/> method supports backup of sparse
142+
/// files on Linux only. <see cref="RestoreFromBackupAsync(Stream, DirectoryInfo, CancellationToken)"/>
143+
/// method cannot restore the backup, you need to use <c>tar</c> utility to extract files.
141144
/// </remarks>
142145
public long? MaxLogEntrySize
143146
{

0 commit comments

Comments
 (0)