Skip to content

Commit 7dba91d

Browse files
authored
KAFKA-19484: Fix bug with tiered storage throttle metrics (#20129)
Fixes a bug with tiered storage quota metrics introduced in [KIP-956](https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas). The metrics tracking how much time have been spent in a throttled state can stop reporting if a cluster stops stops doing remote copy/fetch and the sensors go inactive. This change delegates the job of refreshing inactive sensors to SensorAccess. There's pretty similar logic in RLMQuotaManager which is actually responsible for tracking and enforcing quotas and also uses a Sensor object. ``` remote-fetch-throttle-time-avg remote-copy-throttle-time-avg remote-fetch-throttle-time-max remote-copy-throttle-time-max ``` Reviewers: Kamal Chandraprakash <kamal.chandraprakash@gmail.com>
1 parent 4ff851a commit 7dba91d

File tree

4 files changed

+86
-17
lines changed

4 files changed

+86
-17
lines changed

storage/src/main/java/org/apache/kafka/server/log/remote/quota/RLMQuotaMetrics.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,29 @@
2626

2727
public class RLMQuotaMetrics {
2828

29-
private final Sensor sensor;
29+
private final SensorAccess sensorAccess;
30+
private final Metrics metrics;
31+
private final String name;
32+
private final String descriptionFormat;
33+
private final String group;
34+
private final long expirationTime;
3035

3136
public RLMQuotaMetrics(Metrics metrics, String name, String group, String descriptionFormat, long expirationTime) {
3237
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
33-
SensorAccess sensorAccess = new SensorAccess(lock, metrics);
34-
this.sensor = sensorAccess.getOrCreate(name, expirationTime, s -> {
35-
s.add(metrics.metricName(name + "-avg", group, String.format(descriptionFormat, "average")), new Avg());
36-
s.add(metrics.metricName(name + "-max", group, String.format(descriptionFormat, "maximum")), new Max());
37-
});
38+
this.sensorAccess = new SensorAccess(lock, metrics);
39+
this.metrics = metrics;
40+
this.name = name;
41+
this.group = group;
42+
this.expirationTime = expirationTime;
43+
this.descriptionFormat = descriptionFormat;
3844
}
3945

4046
public Sensor sensor() {
41-
return sensor;
47+
return sensorAccess.getOrCreate(name, expirationTime, s -> {
48+
s.add(metrics.metricName(name + "-avg", group,
49+
String.format(descriptionFormat, "average")), new Avg());
50+
s.add(metrics.metricName(name + "-max", group,
51+
String.format(descriptionFormat, "maximum")), new Max());
52+
});
4253
}
4354
}

storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,8 @@ public class RemoteLogManager implements Closeable, AsyncOffsetReader {
165165
private final Condition copyQuotaManagerLockCondition = copyQuotaManagerLock.newCondition();
166166
private final RLMQuotaManager rlmCopyQuotaManager;
167167
private final RLMQuotaManager rlmFetchQuotaManager;
168-
private final Sensor fetchThrottleTimeSensor;
169-
private final Sensor copyThrottleTimeSensor;
168+
private final RLMQuotaMetrics fetchQuotaMetrics;
169+
private final RLMQuotaMetrics copyQuotaMetrics;
170170

171171
private final RemoteIndexCache indexCache;
172172
private final RemoteStorageThreadPool remoteStorageReaderThreadPool;
@@ -235,10 +235,10 @@ public RemoteLogManager(RemoteLogManagerConfig rlmConfig,
235235
rlmCopyQuotaManager = createRLMCopyQuotaManager();
236236
rlmFetchQuotaManager = createRLMFetchQuotaManager();
237237

238-
fetchThrottleTimeSensor = new RLMQuotaMetrics(metrics, "remote-fetch-throttle-time", RemoteLogManager.class.getSimpleName(),
239-
"The %s time in millis remote fetches was throttled by a broker", INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS).sensor();
240-
copyThrottleTimeSensor = new RLMQuotaMetrics(metrics, "remote-copy-throttle-time", RemoteLogManager.class.getSimpleName(),
241-
"The %s time in millis remote copies was throttled by a broker", INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS).sensor();
238+
fetchQuotaMetrics = new RLMQuotaMetrics(metrics, "remote-fetch-throttle-time", RemoteLogManager.class.getSimpleName(),
239+
"The %s time in millis remote fetches was throttled by a broker", INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS);
240+
copyQuotaMetrics = new RLMQuotaMetrics(metrics, "remote-copy-throttle-time", RemoteLogManager.class.getSimpleName(),
241+
"The %s time in millis remote copies was throttled by a broker", INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS);
242242

243243
indexCache = new RemoteIndexCache(rlmConfig.remoteLogIndexFileCacheTotalSizeBytes(), remoteStorageManagerPlugin.get(), logDir);
244244
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
@@ -347,7 +347,7 @@ public long getFetchThrottleTimeMs() {
347347
}
348348

349349
public Sensor fetchThrottleTimeSensor() {
350-
return fetchThrottleTimeSensor;
350+
return fetchQuotaMetrics.sensor();
351351
}
352352

353353
static RLMQuotaManagerConfig copyQuotaManagerConfig(RemoteLogManagerConfig rlmConfig) {
@@ -961,7 +961,7 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
961961
try {
962962
long throttleTimeMs = rlmCopyQuotaManager.getThrottleTimeMs();
963963
while (throttleTimeMs > 0) {
964-
copyThrottleTimeSensor.record(throttleTimeMs, time.milliseconds());
964+
copyQuotaMetrics.sensor().record(throttleTimeMs, time.milliseconds());
965965
logger.debug("Quota exceeded for copying log segments, waiting for the quota to be available.");
966966
// If the thread gets interrupted while waiting, the InterruptedException is thrown
967967
// back to the caller. It's important to note that the task being executed is already
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.server.log.remote.quota;
19+
20+
import org.apache.kafka.common.metrics.MetricConfig;
21+
import org.apache.kafka.common.metrics.Metrics;
22+
import org.apache.kafka.common.metrics.Sensor;
23+
import org.apache.kafka.common.utils.MockTime;
24+
25+
import org.junit.jupiter.api.Test;
26+
27+
import java.util.List;
28+
29+
import static org.junit.jupiter.api.Assertions.assertEquals;
30+
import static org.junit.jupiter.api.Assertions.assertNotEquals;
31+
32+
public class RLMQuotaMetricsTest {
33+
private final MockTime time = new MockTime();
34+
private final Metrics metrics = new Metrics(new MetricConfig(), List.of(), time);
35+
36+
@Test
37+
public void testNewSensorWhenExpired() {
38+
RLMQuotaMetrics rlmQuotaMetrics = new RLMQuotaMetrics(metrics, "metric", "group", "format", 5);
39+
Sensor sensor = rlmQuotaMetrics.sensor();
40+
Sensor sensorRepeat = rlmQuotaMetrics.sensor();
41+
42+
// If the sensor has not expired we should reuse it.
43+
assertEquals(sensorRepeat, sensor);
44+
45+
// The ExpireSensorTask calls removeSensor to remove expired sensors.
46+
metrics.removeSensor(sensor.name());
47+
48+
// If the sensor has been removed, we should get a new one.
49+
Sensor newSensor = rlmQuotaMetrics.sensor();
50+
assertNotEquals(sensor, newSensor);
51+
}
52+
}

storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3425,8 +3425,14 @@ public void testCopyQuota(boolean quotaExceeded) throws Exception {
34253425
Map<org.apache.kafka.common.MetricName, KafkaMetric> allMetrics = metrics.metrics();
34263426
KafkaMetric avgMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-avg", "RemoteLogManager"));
34273427
KafkaMetric maxMetric = allMetrics.get(metrics.metricName("remote-copy-throttle-time-max", "RemoteLogManager"));
3428-
assertEquals(Double.NaN, avgMetric.metricValue());
3429-
assertEquals(Double.NaN, maxMetric.metricValue());
3428+
if (quotaExceeded) {
3429+
assertEquals(Double.NaN, avgMetric.metricValue());
3430+
assertEquals(Double.NaN, maxMetric.metricValue());
3431+
} else {
3432+
// Metrics are not created until they actually get recorded (e.g. if the quota is exceeded).
3433+
assertNull(avgMetric);
3434+
assertNull(maxMetric);
3435+
}
34303436

34313437
// Verify the highest offset in remote storage is updated
34323438
ArgumentCaptor<Long> capture = ArgumentCaptor.forClass(Long.class);

0 commit comments

Comments
 (0)