Skip to content

Commit fba01c4

Browse files
authored
KAFKA-17645 Enable warmup in producer performance test (KIP-1052) (#17340)
In order to better analyze steady-state performance of Kafka, this PR enables a warmup in the Producer Performance test. The warmup duration is specified as a number of records that are a subset of the total numRecords. If warmup records is greater than 0, the warmup is represented by a second Stats object which holds warmup results. Once warmup records have been exhausted, the test switches to using the existing Stats object. At end of test, if warmup was enabled, the summary of the whole test (warump + steady state) is printed followed by the summary of the steady-state portion of the test. If no warmup is used, summary prints don't change from existing behavior. This contribution is an original work and is licensed to the Kafka project under the Apache license Testing strategy comprises new Java unit tests added to ProducerPerformanceTests.java. Reviewers: Kirk True <kirk@kirktrue.pro>, Federico Valeri <fedevaleri@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent d350f60 commit fba01c4

File tree

2 files changed

+139
-15
lines changed

2 files changed

+139
-15
lines changed

tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,12 @@ void start(String[] args) throws IOException {
7676
// not thread-safe, do not share with other threads
7777
SplittableRandom random = new SplittableRandom(0);
7878
ProducerRecord<byte[], byte[]> record;
79-
stats = new Stats(config.numRecords, 5000);
79+
80+
if (config.warmupRecords > 0) {
81+
System.out.println("Warmup first " + config.warmupRecords + " records. Steady state results will print after the complete test summary.");
82+
}
83+
boolean isSteadyState = false;
84+
stats = new Stats(config.numRecords, isSteadyState);
8085
long startMs = System.currentTimeMillis();
8186

8287
ThroughputThrottler throttler = new ThroughputThrottler(config.throughput, startMs);
@@ -95,7 +100,11 @@ void start(String[] args) throws IOException {
95100
record = new ProducerRecord<>(config.topicName, payload);
96101

97102
long sendStartMs = System.currentTimeMillis();
98-
cb = new PerfCallback(sendStartMs, payload.length, stats);
103+
if ((isSteadyState = config.warmupRecords > 0) && i == config.warmupRecords) {
104+
steadyStateStats = new Stats(config.numRecords - config.warmupRecords, isSteadyState);
105+
stats.suppressPrinting();
106+
}
107+
cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats);
99108
producer.send(record, cb);
100109

101110
currentTransactionSize++;
@@ -117,6 +126,10 @@ record = new ProducerRecord<>(config.topicName, payload);
117126

118127
/* print final results */
119128
stats.printTotal();
129+
/* print steady-state stats if relevant */
130+
if (steadyStateStats != null) {
131+
steadyStateStats.printTotal();
132+
}
120133
} else {
121134
// Make sure all messages are sent before printing out the stats and the metrics
122135
// We need to do this in a different branch for now since tests/kafkatest/sanity_checks/test_performance_services.py
@@ -125,6 +138,10 @@ record = new ProducerRecord<>(config.topicName, payload);
125138

126139
/* print final results */
127140
stats.printTotal();
141+
/* print steady-state stats if relevant */
142+
if (steadyStateStats != null) {
143+
steadyStateStats.printTotal();
144+
}
128145

129146
/* print out metrics */
130147
ToolsUtils.printMetrics(producer.metrics());
@@ -147,8 +164,8 @@ KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
147164
}
148165

149166
Callback cb;
150-
151167
Stats stats;
168+
Stats steadyStateStats;
152169

153170
static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByteList, byte[] payload,
154171
SplittableRandom random, boolean payloadMonotonic, long recordValue) {
@@ -164,7 +181,7 @@ static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByte
164181
}
165182
return payload;
166183
}
167-
184+
168185
static Properties readProps(List<String> producerProps, String producerConfig) throws IOException {
169186
Properties props = new Properties();
170187
if (producerConfig != null) {
@@ -331,6 +348,16 @@ static ArgumentParser argParser() {
331348
"--producer.config, or --transactional-id but --transaction-duration-ms is not specified, " +
332349
"the default value will be 3000.");
333350

351+
parser.addArgument("--warmup-records")
352+
.action(store())
353+
.required(false)
354+
.type(Long.class)
355+
.metavar("WARMUP-RECORDS")
356+
.dest("warmupRecords")
357+
.setDefault(0L)
358+
.help("The number of records to treat as warmup; these initial records will not be included in steady-state statistics. " +
359+
"An additional summary line will be printed describing the steady-state statistics. (default: 0).");
360+
334361
return parser;
335362
}
336363

@@ -351,8 +378,10 @@ static class Stats {
351378
private long windowTotalLatency;
352379
private long windowBytes;
353380
private long windowStart;
381+
private final boolean isSteadyState;
382+
private boolean suppressPrint;
354383

355-
public Stats(long numRecords, int reportingInterval) {
384+
public Stats(long numRecords, boolean isSteadyState) {
356385
this.start = System.currentTimeMillis();
357386
this.windowStart = System.currentTimeMillis();
358387
this.iteration = 0;
@@ -365,7 +394,9 @@ public Stats(long numRecords, int reportingInterval) {
365394
this.windowTotalLatency = 0;
366395
this.windowBytes = 0;
367396
this.totalLatency = 0;
368-
this.reportingInterval = reportingInterval;
397+
this.reportingInterval = 5000;
398+
this.isSteadyState = isSteadyState;
399+
this.suppressPrint = false;
369400
}
370401

371402
public void record(int latency, int bytes, long time) {
@@ -383,9 +414,15 @@ public void record(int latency, int bytes, long time) {
383414
}
384415
/* maybe report the recent perf */
385416
if (time - windowStart >= reportingInterval) {
386-
printWindow();
417+
if (this.isSteadyState && count == windowCount) {
418+
System.out.println("In steady state.");
419+
}
420+
if (!this.suppressPrint) {
421+
printWindow();
422+
}
387423
newWindow();
388424
}
425+
this.iteration++;
389426
}
390427

391428
public long totalCount() {
@@ -433,8 +470,9 @@ public void printTotal() {
433470
double recsPerSec = 1000.0 * count / (double) elapsed;
434471
double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0);
435472
int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
436-
System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n",
473+
System.out.printf("%d%s records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n",
437474
count,
475+
this.isSteadyState ? " steady state" : "",
438476
recsPerSec,
439477
mbPerSec,
440478
totalLatency / (double) count,
@@ -455,16 +493,22 @@ private static int[] percentiles(int[] latencies, int count, double... percentil
455493
}
456494
return values;
457495
}
496+
497+
public void suppressPrinting() {
498+
this.suppressPrint = true;
499+
}
458500
}
459501

460502
static final class PerfCallback implements Callback {
461503
private final long start;
462504
private final int bytes;
463505
private final Stats stats;
506+
private final Stats steadyStateStats;
464507

465-
public PerfCallback(long start, int bytes, Stats stats) {
508+
public PerfCallback(long start, int bytes, Stats stats, Stats steadyStateStats) {
466509
this.start = start;
467510
this.stats = stats;
511+
this.steadyStateStats = steadyStateStats;
468512
this.bytes = bytes;
469513
}
470514

@@ -475,7 +519,9 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
475519
// magically printed when the sending fails.
476520
if (exception == null) {
477521
this.stats.record(latency, bytes, now);
478-
this.stats.iteration++;
522+
if (steadyStateStats != null) {
523+
this.steadyStateStats.record(latency, bytes, now);
524+
}
479525
}
480526
if (exception != null)
481527
exception.printStackTrace();
@@ -484,7 +530,8 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
484530

485531
static final class ConfigPostProcessor {
486532
final String topicName;
487-
final Long numRecords;
533+
final long numRecords;
534+
final long warmupRecords;
488535
final Integer recordSize;
489536
final double throughput;
490537
final boolean payloadMonotonic;
@@ -498,6 +545,7 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept
498545
Namespace namespace = parser.parseArgs(args);
499546
this.topicName = namespace.getString("topic");
500547
this.numRecords = namespace.getLong("numRecords");
548+
this.warmupRecords = Math.max(namespace.getLong("warmupRecords"), 0);
501549
this.recordSize = namespace.getInt("recordSize");
502550
this.throughput = namespace.getDouble("throughput");
503551
this.payloadMonotonic = namespace.getBoolean("payloadMonotonic");
@@ -508,9 +556,12 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept
508556
String payloadFilePath = namespace.getString("payloadFile");
509557
Long transactionDurationMsArg = namespace.getLong("transactionDurationMs");
510558
String transactionIdArg = namespace.getString("transactionalId");
511-
if (numRecords != null && numRecords <= 0) {
559+
if (numRecords <= 0) {
512560
throw new ArgumentParserException("--num-records should be greater than zero", parser);
513561
}
562+
if (warmupRecords >= numRecords) {
563+
throw new ArgumentParserException("The value for --warmup-records must be strictly fewer than the number of records in the test, --num-records.", parser);
564+
}
514565
if (recordSize != null && recordSize <= 0) {
515566
throw new ArgumentParserException("--record-size should be greater than zero", parser);
516567
}

tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java

Lines changed: 76 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -381,16 +381,16 @@ public void testDefaultClientId() throws Exception {
381381
@Test
382382
public void testStatsInitializationWithLargeNumRecords() {
383383
long numRecords = Long.MAX_VALUE;
384-
assertDoesNotThrow(() -> new ProducerPerformance.Stats(numRecords, 5000));
384+
assertDoesNotThrow(() -> new ProducerPerformance.Stats(numRecords, false));
385385
}
386386

387387
@Test
388388
public void testStatsCorrectness() throws Exception {
389389
ExecutorService singleThreaded = Executors.newSingleThreadExecutor();
390390
final long numRecords = 1000000;
391-
ProducerPerformance.Stats stats = new ProducerPerformance.Stats(numRecords, 5000);
391+
ProducerPerformance.Stats stats = new ProducerPerformance.Stats(numRecords, false);
392392
for (long i = 0; i < numRecords; i++) {
393-
final Callback callback = new ProducerPerformance.PerfCallback(0, 100, stats);
393+
final Callback callback = new ProducerPerformance.PerfCallback(0, 100, stats, null);
394394
CompletableFuture.runAsync(() -> {
395395
callback.onCompletion(null, null);
396396
}, singleThreaded);
@@ -567,4 +567,77 @@ public void testEnableTransactionByTransactionDurationMs() throws IOException, A
567567
assertTrue(configs.producerProps.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG).toString()
568568
.startsWith(ProducerPerformance.DEFAULT_TRANSACTION_ID_PREFIX));
569569
}
570+
571+
@Test
572+
public void testWarmupRecordsFractionalValue() throws Exception {
573+
String[] args = new String[] {
574+
"--topic", "Hello-Kafka",
575+
"--num-records", "10",
576+
"--warmup-records", "1.5",
577+
"--throughput", "100",
578+
"--record-size", "100",
579+
"--producer-props", "bootstrap.servers=localhost:9000"};
580+
ArgumentParser parser = ProducerPerformance.argParser();
581+
ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args));
582+
thrown.printStackTrace();
583+
}
584+
585+
@Test
586+
public void testWarmupRecordsString() throws Exception {
587+
String[] args = new String[] {
588+
"--topic", "Hello-Kafka",
589+
"--num-records", "10",
590+
"--warmup-records", "foo",
591+
"--throughput", "100",
592+
"--record-size", "100",
593+
"--producer-props", "bootstrap.servers=localhost:9000"};
594+
ArgumentParser parser = ProducerPerformance.argParser();
595+
ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args));
596+
thrown.printStackTrace();
597+
}
598+
599+
@Test
600+
public void testWarmupNumberOfSuccessfulSendAndClose() throws IOException {
601+
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
602+
doAnswer(invocation -> {
603+
producerPerformanceSpy.cb.onCompletion(null, null);
604+
return null;
605+
}).when(producerMock).send(any(), any());
606+
607+
String[] args = new String[] {
608+
"--topic", "Hello-Kafka",
609+
"--num-records", "10",
610+
"--warmup-records", "2",
611+
"--throughput", "1",
612+
"--record-size", "100",
613+
"--producer-props", "bootstrap.servers=localhost:9000"};
614+
producerPerformanceSpy.start(args);
615+
616+
verify(producerMock, times(10)).send(any(), any());
617+
assertEquals(10, producerPerformanceSpy.stats.totalCount());
618+
assertEquals(10 - 2, producerPerformanceSpy.steadyStateStats.totalCount());
619+
verify(producerMock, times(1)).close();
620+
}
621+
622+
@Test
623+
public void testWarmupNegativeRecordsNormalTest() throws IOException {
624+
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
625+
doAnswer(invocation -> {
626+
producerPerformanceSpy.cb.onCompletion(null, null);
627+
return null;
628+
}).when(producerMock).send(any(), any());
629+
630+
String[] args = new String[] {
631+
"--topic", "Hello-Kafka",
632+
"--num-records", "10",
633+
"--warmup-records", "-1",
634+
"--throughput", "1",
635+
"--record-size", "100",
636+
"--producer-props", "bootstrap.servers=localhost:9000"};
637+
producerPerformanceSpy.start(args);
638+
639+
verify(producerMock, times(10)).send(any(), any());
640+
assertEquals(10, producerPerformanceSpy.stats.totalCount());
641+
verify(producerMock, times(1)).close();
642+
}
570643
}

0 commit comments

Comments
 (0)