Skip to content

Commit c2711e2

Browse files
authored
Merge pull request #930 from aws-powertools/feature/idempotency-ReturnValuesOnConditionCheckFailure
feat: add support for ReturnValuesOnConditionCheckFailure in Idempotency
2 parents b634e4e + 0e08856 commit c2711e2

File tree

6 files changed

+263
-55
lines changed

6 files changed

+263
-55
lines changed

libraries/src/AWS.Lambda.Powertools.Idempotency/Exceptions/IdempotencyItemAlreadyExistsException.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
*/
1515

1616
using System;
17+
using AWS.Lambda.Powertools.Idempotency.Persistence;
1718

1819
namespace AWS.Lambda.Powertools.Idempotency.Exceptions;
1920

@@ -22,6 +23,11 @@ namespace AWS.Lambda.Powertools.Idempotency.Exceptions;
2223
/// </summary>
2324
public class IdempotencyItemAlreadyExistsException : Exception
2425
{
26+
/// <summary>
27+
/// The record that already exists in the persistence layer.
28+
/// </summary>
29+
public DataRecord Record { get; set; }
30+
2531
/// <summary>
2632
/// Creates a new IdempotencyItemAlreadyExistsException
2733
/// </summary>

libraries/src/AWS.Lambda.Powertools.Idempotency/Internal/IdempotencyAspectHandler.cs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,25 @@ private async Task<T> ProcessIdempotency()
106106
// already exists. If it succeeds, there's no need to call getRecord.
107107
await _persistenceStore.SaveInProgress(_data, DateTimeOffset.UtcNow, GetRemainingTimeInMillis());
108108
}
109-
catch (IdempotencyItemAlreadyExistsException)
109+
catch (IdempotencyItemAlreadyExistsException ex)
110110
{
111-
var record = await GetIdempotencyRecord();
111+
DataRecord record;
112+
113+
if(ex.Record != null)
114+
{
115+
// If the error includes the existing record, we can use it to validate
116+
// the record being processed and cache it in memory.
117+
var existingRecord = _persistenceStore.ProcessExistingRecord(ex.Record, _data);
118+
record = existingRecord;
119+
}
120+
else
121+
{
122+
// If the error doesn't include the existing record, we need to fetch
123+
// it from the persistence layer. In doing so, we also call the processExistingRecord
124+
// method to validate the record and cache it in memory.
125+
record = await GetIdempotencyRecord();
126+
}
127+
112128
return await HandleForStatus(record);
113129
}
114130
catch (IdempotencyKeyException)

libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/BasePersistenceStore.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,4 +378,20 @@ private static string GetHash(HashAlgorithm hashAlgorithm, string input)
378378

379379
/// <inheritdoc />
380380
public abstract Task DeleteRecord(string idempotencyKey);
381+
382+
/// <summary>
383+
/// Validates an existing record against the data payload being processed.
384+
/// If the payload does not match the stored record, an `IdempotencyValidationError` error is thrown.
385+
/// Whenever a record is retrieved from the persistence layer, it should be validated against the data payload
386+
/// being processed. This is to ensure that the data payload being processed is the same as the one that was
387+
/// used to create the record in the first place.
388+
///
389+
/// The record is also saved to the local cache if local caching is enabled.
390+
/// </summary>
391+
public virtual DataRecord ProcessExistingRecord(DataRecord exRecord, JsonDocument data)
392+
{
393+
ValidatePayload(data, exRecord);
394+
SaveToCache(exRecord);
395+
return exRecord;
396+
}
381397
}

libraries/src/AWS.Lambda.Powertools.Idempotency/Persistence/DynamoDBPersistenceStore.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,7 @@ public override async Task PutRecord(DataRecord record, DateTimeOffset now)
191191
Item = item,
192192
ConditionExpression = "attribute_not_exists(#id) OR #expiry < :now OR (attribute_exists(#in_progress_expiry) AND #in_progress_expiry < :now_milliseconds AND #status = :inprogress)",
193193
ExpressionAttributeNames = expressionAttributeNames,
194+
ReturnValuesOnConditionCheckFailure = ReturnValuesOnConditionCheckFailure.ALL_OLD,
194195
ExpressionAttributeValues = new Dictionary<string, AttributeValue>
195196
{
196197
{":now", new AttributeValue {N = now.ToUnixTimeSeconds().ToString()}},
@@ -202,8 +203,20 @@ public override async Task PutRecord(DataRecord record, DateTimeOffset now)
202203
}
203204
catch (ConditionalCheckFailedException e)
204205
{
205-
throw new IdempotencyItemAlreadyExistsException(
206+
var ex = new IdempotencyItemAlreadyExistsException(
206207
"Failed to put record for already existing idempotency key: " + record.IdempotencyKey, e);
208+
209+
if (e.Item != null)
210+
{
211+
ex.Record = new DataRecord(e.Item[_keyAttr].S,
212+
Enum.Parse<DataRecord.DataRecordStatus>(e.Item[_statusAttr].S),
213+
long.Parse(e.Item[_expiryAttr].N),
214+
e.Item.TryGetValue(_dataAttr, out var data) ? data?.S : null,
215+
e.Item.TryGetValue(_validationAttr, out var validation) ? validation?.S : null,
216+
e.Item.TryGetValue(_inProgressExpiryAttr, out var inProgExp) ? long.Parse(inProgExp.N) : null);
217+
}
218+
219+
throw ex;
207220
}
208221
}
209222

libraries/tests/AWS.Lambda.Powertools.Idempotency.Tests/Persistence/BasePersistenceStoreTests.cs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -524,7 +524,7 @@ public void GenerateHash_WhenInputIsDouble_ShouldGenerateMd5ofDouble()
524524
// Assert
525525
generatedHash.Should().Be(expectedHash);
526526
}
527-
527+
528528
[Fact]
529529
public async Task When_Key_Prefix_Set_Should_Create_With_Prefix()
530530
{
@@ -563,4 +563,35 @@ private static APIGatewayProxyRequest LoadApiGatewayProxyRequest()
563563
throw;
564564
}
565565
}
566+
567+
[Fact]
568+
public async Task ProcessExistingRecord_WhenValidRecord_ShouldReturnRecordAndSaveToCache()
569+
{
570+
// Arrange
571+
var persistenceStore = new InMemoryPersistenceStore();
572+
var request = LoadApiGatewayProxyRequest();
573+
LRUCache<string, DataRecord> cache = new(2);
574+
575+
persistenceStore.Configure(new IdempotencyOptionsBuilder()
576+
.WithUseLocalCache(true)
577+
.Build(), null, null, cache);
578+
579+
var now = DateTimeOffset.UtcNow;
580+
var existingRecord = new DataRecord(
581+
"testFunction#5eff007a9ed2789a9f9f6bc182fc6ae6",
582+
DataRecord.DataRecordStatus.COMPLETED,
583+
now.AddSeconds(3600).ToUnixTimeSeconds(),
584+
"existing response",
585+
null);
586+
587+
// Act
588+
var result =
589+
persistenceStore.ProcessExistingRecord(existingRecord, JsonSerializer.SerializeToDocument(request)!);
590+
591+
// Assert
592+
result.Should().Be(existingRecord);
593+
cache.Count.Should().Be(1);
594+
cache.TryGet("testFunction#5eff007a9ed2789a9f9f6bc182fc6ae6", out var cachedRecord).Should().BeTrue();
595+
cachedRecord.Should().Be(existingRecord);
596+
}
566597
}

0 commit comments

Comments
 (0)