Skip to content

Commit c651ecb

Browse files
Add stage block and commit blocklist blob hooks (#51)
1 parent 5650aac commit c651ecb

File tree

11 files changed

+212
-36
lines changed

11 files changed

+212
-36
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ jobs:
2323
outputs:
2424
version: ${{ steps.resolve-version.outputs.version }}
2525
publish-nuget-org: ${{ startsWith(github.ref, 'refs/tags/v') }}
26-
publish-github: ${{ startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main' }}
26+
publish-github: ${{ startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main' || contains(github.event.pull_request.labels.*.name, 'publish-artifacts') }}
2727
steps:
2828
- uses: actions/checkout@v4
2929
with:

docs/storage.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,8 @@ Following hooks are supported in both `Before` and `After` variants:
481481
- `Upload`
482482
- `OpenRead`
483483
- `OpenWrite`
484+
- `StageBlock`
485+
- `CommitBlockList`
484486
- All `Container` operations
485487
- `Create` / `CreateIfNotExists`
486488
- All `Table Service` operations

src/Spotflow.InMemory.Azure.Storage/Blobs/Hooks/BlobOperations.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,7 @@ public enum BlobOperations
88
Upload = 2,
99
OpenRead = 4,
1010
OpenWrite = 8,
11-
All = Download | Upload | OpenRead | OpenWrite
11+
StageBlock = 16,
12+
CommitBlockList = 32,
13+
All = Download | Upload | OpenRead | OpenWrite | StageBlock | CommitBlockList
1214
}

src/Spotflow.InMemory.Azure.Storage/Blobs/Hooks/BlobServiceHookBuilder.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,13 @@ internal BlobOperationsBuilder(BlobHookFilter filter)
8686

8787
public StorageHook<BlobUploadAfterHookContext> AfterBlobUpload(HookFunc<BlobUploadAfterHookContext> hook) => new(hook, _filter);
8888

89+
public StorageHook<BlobStageBlockBeforeHookContext> BeforeStageBlock(HookFunc<BlobStageBlockBeforeHookContext> hook) => new(hook, _filter);
90+
91+
public StorageHook<BlobStageBlockAfterHookContext> AfterStageBlock(HookFunc<BlobStageBlockAfterHookContext> hook) => new(hook, _filter);
92+
93+
public StorageHook<BlobCommitBlockListBeforeHookContext> BeforeCommitBlockList(HookFunc<BlobCommitBlockListBeforeHookContext> hook) => new(hook, _filter);
94+
95+
public StorageHook<BlobCommitBlockListAfterHookContext> AfterCommitBlockList(HookFunc<BlobCommitBlockListAfterHookContext> hook) => new(hook, _filter);
8996

9097
}
9198

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using Azure.Storage.Blobs.Models;
2+
3+
namespace Spotflow.InMemory.Azure.Storage.Blobs.Hooks.Contexts;
4+
5+
public class BlobCommitBlockListAfterHookContext : BlobAfterHookContext
6+
{
7+
internal BlobCommitBlockListAfterHookContext(BlobCommitBlockListBeforeHookContext beforeContext, BlobContentInfo blobContentInfo) : base(beforeContext)
8+
{
9+
BeforeContext = beforeContext;
10+
BlobContentInfo = blobContentInfo;
11+
}
12+
13+
public BlobCommitBlockListBeforeHookContext BeforeContext { get; }
14+
public BlobContentInfo BlobContentInfo { get; }
15+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
using Azure.Storage.Blobs.Models;
2+
3+
namespace Spotflow.InMemory.Azure.Storage.Blobs.Hooks.Contexts;
4+
5+
public class BlobCommitBlockListBeforeHookContext(BlobScope scope, InMemoryStorageProvider provider, CancellationToken cancellationToken)
6+
: BlobBeforeHookContext(scope, BlobOperations.CommitBlockList, provider, cancellationToken)
7+
{
8+
public required CommitBlockListOptions? Options { get; init; }
9+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using Azure.Storage.Blobs.Models;
2+
3+
namespace Spotflow.InMemory.Azure.Storage.Blobs.Hooks.Contexts;
4+
5+
public class BlobStageBlockAfterHookContext : BlobAfterHookContext
6+
{
7+
internal BlobStageBlockAfterHookContext(BlobStageBlockBeforeHookContext beforeContext, BlockInfo blockInfo) : base(beforeContext)
8+
{
9+
BeforeContext = beforeContext;
10+
BlockInfo = blockInfo;
11+
}
12+
13+
public BlobStageBlockBeforeHookContext BeforeContext { get; }
14+
public BlockInfo BlockInfo { get; }
15+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
using Azure.Storage.Blobs.Models;
2+
3+
namespace Spotflow.InMemory.Azure.Storage.Blobs.Hooks.Contexts;
4+
5+
public class BlobStageBlockBeforeHookContext(BlobScope scope, InMemoryStorageProvider provider, CancellationToken cancellationToken)
6+
: BlobBeforeHookContext(scope, BlobOperations.StageBlock, provider, cancellationToken)
7+
{
8+
public required BlockBlobStageBlockOptions? Options { get; init; }
9+
}

src/Spotflow.InMemory.Azure.Storage/Blobs/InMemoryBlockBlobClient.cs

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -103,30 +103,31 @@ public override async Task<Response<BlockList>> GetBlockListAsync(BlockListTypes
103103

104104
public override Response<BlockInfo> StageBlock(string base64BlockId, Stream content, BlockBlobStageBlockOptions? options = null, CancellationToken cancellationToken = default)
105105
{
106-
var blockInfo = _core.StageBlock(base64BlockId, BinaryData.FromStream(content), options, cancellationToken);
107-
return InMemoryResponse.FromValue(blockInfo, 201);
106+
return StageBlockAsync(base64BlockId, content, options, cancellationToken).EnsureCompleted();
108107
}
109108

110109
public override Response<BlockInfo> StageBlock(string base64BlockId, Stream content, byte[] transactionalContentHash, BlobRequestConditions conditions, IProgress<long> progressHandler, CancellationToken cancellationToken)
111110
{
112-
var options = new BlockBlobStageBlockOptions
113-
{
114-
Conditions = conditions
115-
};
116-
117-
return StageBlock(base64BlockId, content, options, cancellationToken);
111+
return StageBlockAsync(base64BlockId, content, transactionalContentHash, conditions, progressHandler, cancellationToken).EnsureCompleted();
118112
}
119113

120114
public override async Task<Response<BlockInfo>> StageBlockAsync(string base64BlockId, Stream content, byte[] transactionalContentHash, BlobRequestConditions conditions, IProgress<long> progressHandler, CancellationToken cancellationToken)
121115
{
122116
await Task.Yield();
123-
return StageBlock(base64BlockId, content, transactionalContentHash, conditions, progressHandler, cancellationToken);
117+
118+
var options = new BlockBlobStageBlockOptions
119+
{
120+
Conditions = conditions
121+
};
122+
123+
return await StageBlockAsync(base64BlockId, content, options, cancellationToken);
124124
}
125125

126126
public override async Task<Response<BlockInfo>> StageBlockAsync(string base64BlockId, Stream content, BlockBlobStageBlockOptions? options = null, CancellationToken cancellationToken = default)
127127
{
128128
await Task.Yield();
129-
return StageBlock(base64BlockId, content, options, cancellationToken);
129+
var blockInfo = await _core.StageBlockAsync(base64BlockId, BinaryData.FromStream(content), options, cancellationToken);
130+
return InMemoryResponse.FromValue(blockInfo, 201);
130131
}
131132

132133
#endregion
@@ -138,8 +139,7 @@ public override Response<BlobContentInfo> CommitBlockList(
138139
CommitBlockListOptions options,
139140
CancellationToken cancellationToken = default)
140141
{
141-
var contentInfo = _core.CommitBlockList(base64BlockIds, options, cancellationToken);
142-
return InMemoryResponse.FromValue(contentInfo, 201);
142+
return CommitBlockListAsync(base64BlockIds, options, cancellationToken).EnsureCompleted();
143143
}
144144

145145
public override Response<BlobContentInfo> CommitBlockList(
@@ -150,22 +150,29 @@ public override Response<BlobContentInfo> CommitBlockList(
150150
AccessTier? accessTier = null,
151151
CancellationToken cancellationToken = default)
152152
{
153-
var options = new CommitBlockListOptions { HttpHeaders = httpHeaders, Metadata = metadata, Conditions = conditions, AccessTier = accessTier };
154-
155-
return CommitBlockList(base64BlockIds, options, cancellationToken);
153+
return CommitBlockListAsync(base64BlockIds, httpHeaders, metadata, conditions, accessTier, cancellationToken).EnsureCompleted();
156154
}
157155

158156
public override async Task<Response<BlobContentInfo>> CommitBlockListAsync(IEnumerable<string> base64BlockIds, CommitBlockListOptions options, CancellationToken cancellationToken = default)
159157
{
160158
await Task.Yield();
161-
return CommitBlockList(base64BlockIds, options, cancellationToken);
162-
159+
var contentInfo = await _core.CommitBlockListAsync(base64BlockIds, options, cancellationToken);
160+
return InMemoryResponse.FromValue(contentInfo, 201);
163161
}
164162

165163
public override async Task<Response<BlobContentInfo>> CommitBlockListAsync(IEnumerable<string> base64BlockIds, BlobHttpHeaders? httpHeaders = null, IDictionary<string, string>? metadata = null, BlobRequestConditions? conditions = null, AccessTier? accessTier = null, CancellationToken cancellationToken = default)
166164
{
167165
await Task.Yield();
168-
return CommitBlockList(base64BlockIds, httpHeaders, metadata, conditions, accessTier, cancellationToken);
166+
167+
var options = new CommitBlockListOptions
168+
{
169+
HttpHeaders = httpHeaders,
170+
Metadata = metadata,
171+
Conditions = conditions,
172+
AccessTier = accessTier
173+
};
174+
175+
return await CommitBlockListAsync(base64BlockIds, options, cancellationToken);
169176
}
170177

171178
#endregion
@@ -491,31 +498,34 @@ public override Uri GenerateSasUri(BlobSasBuilder builder)
491498

492499
public override Response<BlockInfo> StageBlockFromUri(Uri sourceUri, string base64BlockId, StageBlockFromUriOptions? options = null, CancellationToken cancellationToken = default)
493500
{
494-
var properties = _core.StageBlockFromUri(sourceUri, base64BlockId, options, cancellationToken);
495-
return InMemoryResponse.FromValue(properties, 201);
501+
return StageBlockFromUriAsync(sourceUri, base64BlockId, options, cancellationToken).EnsureCompleted();
496502
}
497503

498504
public override async Task<Response<BlockInfo>> StageBlockFromUriAsync(Uri sourceUri, string base64BlockId, StageBlockFromUriOptions? options = null, CancellationToken cancellationToken = default)
499505
{
500506
await Task.Yield();
501-
return StageBlockFromUri(sourceUri, base64BlockId, options, cancellationToken);
507+
var properties = await _core.StageBlockFromUriAsync(sourceUri, base64BlockId, options, cancellationToken);
508+
return InMemoryResponse.FromValue(properties, 201);
502509
}
503510

504511
public override Response<BlockInfo> StageBlockFromUri(Uri sourceUri, string base64BlockId, HttpRange sourceRange, byte[] sourceContentHash, RequestConditions sourceConditions, BlobRequestConditions conditions, CancellationToken cancellationToken)
505512
{
506-
return StageBlockFromUri(sourceUri, base64BlockId, new StageBlockFromUriOptions()
513+
return StageBlockFromUriAsync(sourceUri, base64BlockId, sourceRange, sourceContentHash, sourceConditions, conditions, cancellationToken).EnsureCompleted();
514+
}
515+
516+
public override async Task<Response<BlockInfo>> StageBlockFromUriAsync(Uri sourceUri, string base64BlockId, HttpRange sourceRange, byte[] sourceContentHash, RequestConditions sourceConditions, BlobRequestConditions conditions, CancellationToken cancellationToken)
517+
{
518+
await Task.Yield();
519+
520+
var options = new StageBlockFromUriOptions()
507521
{
508522
SourceRange = sourceRange,
509523
SourceContentHash = sourceContentHash,
510524
SourceConditions = sourceConditions,
511525
DestinationConditions = conditions
512-
}, cancellationToken);
513-
}
526+
};
514527

515-
public override async Task<Response<BlockInfo>> StageBlockFromUriAsync(Uri sourceUri, string base64BlockId, HttpRange sourceRange, byte[] sourceContentHash, RequestConditions sourceConditions, BlobRequestConditions conditions, CancellationToken cancellationToken)
516-
{
517-
await Task.Yield();
518-
return StageBlockFromUri(sourceUri, base64BlockId, sourceRange, sourceContentHash, sourceConditions, conditions, cancellationToken);
528+
return await StageBlockFromUriAsync(sourceUri, base64BlockId, options, cancellationToken);
519529
}
520530

521531
#endregion

src/Spotflow.InMemory.Azure.Storage/Blobs/Internals/BlobClientCore.cs

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -194,18 +194,38 @@ public async Task<BlobContentInfo> UploadAsync(BinaryData content, BlobUploadOpt
194194

195195

196196

197-
public BlobContentInfo CommitBlockList(IEnumerable<string> blockIds, CommitBlockListOptions? options, CancellationToken cancellationToken)
197+
public async Task<BlobContentInfo> CommitBlockListAsync(IEnumerable<string> blockIds, CommitBlockListOptions? options, CancellationToken cancellationToken)
198198
{
199+
var beforeContext = new BlobCommitBlockListBeforeHookContext(_scope, Provider, cancellationToken)
200+
{
201+
Options = options
202+
};
203+
204+
await ExecuteBeforeHooksAsync(beforeContext);
205+
199206
RequestConditions? conditions = options?.Conditions;
200207

201208
using var blob = AcquireBlob(cancellationToken);
202209

203-
return CommitBlockListCoreUnsafe(blockIds, blob.Value, conditions, null, options?.HttpHeaders, options?.Metadata);
210+
var info = CommitBlockListCoreUnsafe(blockIds, blob.Value, conditions, null, options?.HttpHeaders, options?.Metadata);
211+
212+
var afterContext = new BlobCommitBlockListAfterHookContext(beforeContext, info);
213+
214+
await ExecuteAfterHooksAsync(afterContext);
215+
216+
return info;
204217
}
205218

206219

207-
public BlockInfo StageBlock(string blockId, BinaryData content, BlockBlobStageBlockOptions? options, CancellationToken cancellationToken)
220+
public async Task<BlockInfo> StageBlockAsync(string blockId, BinaryData content, BlockBlobStageBlockOptions? options, CancellationToken cancellationToken)
208221
{
222+
var beforeContext = new BlobStageBlockBeforeHookContext(_scope, Provider, cancellationToken)
223+
{
224+
Options = options
225+
};
226+
227+
await ExecuteBeforeHooksAsync(beforeContext);
228+
209229
RequestConditions? conditions = options?.Conditions;
210230

211231
using var blob = AcquireBlob(cancellationToken);
@@ -215,7 +235,13 @@ public BlockInfo StageBlock(string blockId, BinaryData content, BlockBlobStageBl
215235
throw stageError.GetClientException();
216236
}
217237

218-
return block.GetInfo();
238+
var info = block.GetInfo();
239+
240+
var afterContext = new BlobStageBlockAfterHookContext(beforeContext, info);
241+
242+
await ExecuteAfterHooksAsync(afterContext);
243+
244+
return info;
219245
}
220246

221247
public async Task<Stream> OpenWriteAsync(bool overwrite, BlobOpenWriteOptions? options, CancellationToken cancellationToken)
@@ -332,7 +358,7 @@ public async Task<Stream> OpenReadAsync(BlobOpenReadOptions options, Cancellatio
332358
return stream;
333359
}
334360

335-
public BlockInfo StageBlockFromUri(Uri sourceUri, string base64BlockId, StageBlockFromUriOptions? options, CancellationToken cancellationToken)
361+
public async Task<BlockInfo> StageBlockFromUriAsync(Uri sourceUri, string base64BlockId, StageBlockFromUriOptions? options, CancellationToken cancellationToken)
336362
{
337363
var sourceUriBuilder = new BlobUriBuilder(sourceUri);
338364
var sourceClient = new BlobClientCore(sourceUriBuilder, Provider);
@@ -383,7 +409,7 @@ public BlockInfo StageBlockFromUri(Uri sourceUri, string base64BlockId, StageBlo
383409
Conditions = options?.DestinationConditions ?? new BlobRequestConditions(),
384410
};
385411

386-
return StageBlock(base64BlockId, sourceContent, stageOptions, cancellationToken);
412+
return await StageBlockAsync(base64BlockId, sourceContent, stageOptions, cancellationToken);
387413
}
388414
}
389415

0 commit comments

Comments
 (0)