Skip to content

Commit e1f4521

Browse files
authored
KAFKA-19485 (II) : Complete any pending acknowledgements in ShareFetch on an error response. (#20247)
*What* Currently when we received a top level error response in ShareFetch, we would log the error, update the share session epoch and proceed to the next request. But these acknowledgements(if any) are not completed and the callback would not have been processed. PR aims to address this by completing these acknowledgements with the error code from the response in this case. Reviewers: Andrew Schofield <aschofield@confluent.io>
1 parent f383593 commit e1f4521

File tree

2 files changed

+43
-0
lines changed

2 files changed

+43
-0
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -758,6 +758,16 @@ private void handleShareFetchSuccess(Node fetchTarget,
758758
if (response.error() == Errors.UNKNOWN_TOPIC_ID) {
759759
metadata.requestUpdate(false);
760760
}
761+
// Complete any inFlight acknowledgements with the error code from the response.
762+
Map<TopicIdPartition, Acknowledgements> nodeAcknowledgementsInFlight = fetchAcknowledgementsInFlight.get(fetchTarget.id());
763+
if (nodeAcknowledgementsInFlight != null) {
764+
nodeAcknowledgementsInFlight.forEach((tip, acks) -> {
765+
acks.complete(Errors.forCode(response.error().code()).exception());
766+
metricsManager.recordFailedAcknowledgements(acks.size());
767+
});
768+
maybeSendShareAcknowledgeCommitCallbackEvent(nodeAcknowledgementsInFlight);
769+
nodeAcknowledgementsInFlight.clear();
770+
}
761771
return;
762772
}
763773

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1455,6 +1455,39 @@ public void testPiggybackAcknowledgementsOnInitialShareSessionErrorSubscriptionC
14551455
assertEquals(Errors.INVALID_SHARE_SESSION_EPOCH.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
14561456
}
14571457

1458+
@Test
1459+
public void testPiggybackAcknowledgementsOnInitialShareSession_ShareSessionNotFound() {
1460+
buildRequestManager();
1461+
shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
1462+
1463+
assignFromSubscribed(singleton(tp0));
1464+
sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
1465+
1466+
fetchRecords();
1467+
1468+
// The acknowledgements for the initial fetch from tip0 are processed now and sent to the background thread.
1469+
Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
1470+
shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
1471+
1472+
// We attempt to send the acknowledgements piggybacking on the fetch.
1473+
assertEquals(1, sendFetches());
1474+
assertFalse(shareConsumeRequestManager.hasCompletedFetches());
1475+
1476+
// Simulate a broker restart, but no leader change, this resets share session epoch to 0.
1477+
client.prepareResponse(fetchResponseWithTopLevelError(tip0, Errors.SHARE_SESSION_NOT_FOUND));
1478+
networkClientDelegate.poll(time.timer(0));
1479+
1480+
// We would complete these acknowledgements with the error code from the response.
1481+
assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
1482+
assertEquals(Errors.SHARE_SESSION_NOT_FOUND.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
1483+
1484+
// Next fetch would proceed as expected and would not include any acknowledgements.
1485+
NetworkClientDelegate.PollResult pollResult = shareConsumeRequestManager.sendFetchesReturnPollResult();
1486+
assertEquals(1, pollResult.unsentRequests.size());
1487+
ShareFetchRequest.Builder builder = (ShareFetchRequest.Builder) pollResult.unsentRequests.get(0).requestBuilder();
1488+
assertEquals(0, builder.data().topics().find(topicId).partitions().find(0).acknowledgementBatches().size());
1489+
}
1490+
14581491
@Test
14591492
public void testInvalidDefaultRecordBatch() {
14601493
buildRequestManager();

0 commit comments

Comments
 (0)