Skip to content

Commit 40b4fdb

Browse files
authored
KAFKA-19559: Remove ShareFetchMetricsManager sensors on consumer.close() (#20255)
*What* https://issues.apache.org/jira/browse/KAFKA-19559 - There are a few sensors(created when a `ShareConsumer` initializes) which are not removed when the `ShareConsumer` closes. - To maintain consistency and prevent any leaks, we have to remove all the sensors when the consumer is closed. This is similar to this issue for regular consumers - https://issues.apache.org/jira/browse/KAFKA-19542 Reviewers: Andrew Schofield <aschofield@confluent.io>, Apoorv Mittal <apoorvmittal10@gmail.com>
1 parent 875537f commit 40b4fdb

File tree

4 files changed

+87
-1
lines changed

4 files changed

+87
-1
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1076,6 +1076,7 @@ boolean hasCompletedFetches() {
10761076

10771077
protected void closeInternal() {
10781078
Utils.closeQuietly(shareFetchBuffer, "shareFetchBuffer");
1079+
Utils.closeQuietly(metricsManager, "shareFetchMetricsManager");
10791080
}
10801081

10811082
public void close() {

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManager.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
import org.apache.kafka.common.metrics.Sensor;
2121
import org.apache.kafka.common.metrics.stats.WindowedCount;
2222

23-
public class ShareFetchMetricsManager {
23+
import java.io.IOException;
24+
import java.util.Arrays;
25+
26+
public class ShareFetchMetricsManager implements AutoCloseable {
2427
private final Metrics metrics;
2528
private final Sensor throttleTime;
2629
private final Sensor bytesFetched;
@@ -92,4 +95,16 @@ void recordAcknowledgementSent(int acknowledgements) {
9295
void recordFailedAcknowledgements(int acknowledgements) {
9396
failedAcknowledgements.record(acknowledgements);
9497
}
98+
99+
@Override
100+
public void close() throws IOException {
101+
Arrays.asList(
102+
throttleTime.name(),
103+
bytesFetched.name(),
104+
recordsFetched.name(),
105+
fetchLatency.name(),
106+
sentAcknowledgements.name(),
107+
failedAcknowledgements.name()
108+
).forEach(metrics::removeSensor);
109+
}
95110
}

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@
117117
import static org.junit.jupiter.api.Assertions.assertFalse;
118118
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
119119
import static org.junit.jupiter.api.Assertions.assertNotEquals;
120+
import static org.junit.jupiter.api.Assertions.assertNotNull;
120121
import static org.junit.jupiter.api.Assertions.assertNull;
121122
import static org.junit.jupiter.api.Assertions.assertThrows;
122123
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -2427,6 +2428,36 @@ void testWhenLeadershipChangedAfterDisconnected() {
24272428
assertEquals(1, fetchedRecords.size());
24282429
}
24292430

2431+
@Test
2432+
public void testCloseInternalClosesShareFetchMetricsManager() throws Exception {
2433+
buildRequestManager();
2434+
2435+
// Define all sensor names that should be created and removed
2436+
String[] sensorNames = {
2437+
"fetch-throttle-time",
2438+
"bytes-fetched",
2439+
"records-fetched",
2440+
"fetch-latency",
2441+
"sent-acknowledgements",
2442+
"failed-acknowledgements"
2443+
};
2444+
2445+
// Verify that sensors exist before closing
2446+
for (String sensorName : sensorNames) {
2447+
assertNotNull(metrics.getSensor(sensorName),
2448+
"Sensor " + sensorName + " should exist before closing");
2449+
}
2450+
2451+
// Close the request manager
2452+
shareConsumeRequestManager.close();
2453+
2454+
// Verify that all sensors are removed after closing
2455+
for (String sensorName : sensorNames) {
2456+
assertNull(metrics.getSensor(sensorName),
2457+
"Sensor " + sensorName + " should be removed after closing");
2458+
}
2459+
}
2460+
24302461
private ShareFetchResponse fetchResponseWithTopLevelError(TopicIdPartition tp, Errors error) {
24312462
Map<TopicIdPartition, ShareFetchResponseData.PartitionData> partitions = Map.of(tp,
24322463
new ShareFetchResponseData.PartitionData()

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchMetricsManagerTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,12 @@
3030
import org.junit.jupiter.api.BeforeEach;
3131
import org.junit.jupiter.api.Test;
3232

33+
import java.io.IOException;
34+
3335
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX;
3436
import static org.junit.jupiter.api.Assertions.assertEquals;
37+
import static org.junit.jupiter.api.Assertions.assertNotNull;
38+
import static org.junit.jupiter.api.Assertions.assertNull;
3539

3640
class ShareFetchMetricsManagerTest {
3741
private static final double EPSILON = 0.0001;
@@ -114,6 +118,41 @@ public void testRecordsFetched() {
114118
assertEquals(8, (double) getMetric(shareFetchMetricsRegistry.recordsPerRequestAvg).metricValue(), EPSILON);
115119
}
116120

121+
@Test
122+
public void testAcknowledgements() {
123+
shareFetchMetricsManager.recordAcknowledgementSent(5);
124+
shareFetchMetricsManager.recordFailedAcknowledgements(2);
125+
126+
assertEquals(5, (double) getMetric(shareFetchMetricsRegistry.acknowledgementSendTotal).metricValue());
127+
assertEquals(2, (double) getMetric(shareFetchMetricsRegistry.acknowledgementErrorTotal).metricValue());
128+
}
129+
130+
@Test
131+
public void testCloseRemovesAllSensors() throws IOException {
132+
// Define all sensor names that should be created and removed
133+
String[] sensorNames = {
134+
"fetch-throttle-time",
135+
"bytes-fetched",
136+
"records-fetched",
137+
"fetch-latency",
138+
"sent-acknowledgements",
139+
"failed-acknowledgements"
140+
};
141+
142+
// Verify that sensors exist before closing
143+
for (String sensorName : sensorNames) {
144+
assertNotNull(metrics.getSensor(sensorName), "Sensor " + sensorName + " should exist before closing");
145+
}
146+
147+
// Close the metrics manager
148+
shareFetchMetricsManager.close();
149+
150+
// Verify that all sensors are removed
151+
for (String sensorName : sensorNames) {
152+
assertNull(metrics.getSensor(sensorName), "Sensor " + sensorName + " should be removed after closing");
153+
}
154+
}
155+
117156
private KafkaMetric getMetric(MetricNameTemplate name) {
118157
return metrics.metric(metrics.metricInstance(name));
119158
}

0 commit comments

Comments
 (0)