From fea81de6eae674432d35f719ce90b83c634424f8 Mon Sep 17 00:00:00 2001
From: Yunchi Pang
Date: Mon, 18 Aug 2025 16:50:02 -0700
Subject: [PATCH 1/4] change topics type

---
 .../kafka/tools/LogCompactionTester.java      | 24 ++++++++++---------
 1 file changed, 13 insertions(+), 11 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java b/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java
index accf5241a3538..252b65637d9b2 100644
--- a/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java
+++ b/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java
@@ -43,7 +43,6 @@
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -57,6 +56,8 @@
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
+import static java.util.stream.Collectors.toSet;
+
 import joptsimple.OptionParser;
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
@@ -246,9 +247,9 @@ public static void main(String[] args) throws Exception {
         int sleepSecs = optionSet.valueOf(options.sleepSecsOpt);
 
         long testId = RANDOM.nextLong();
-        String[] topics = IntStream.range(0, topicCount)
+        Set<String> topics = IntStream.range(0, topicCount)
                 .mapToObj(i -> "log-cleaner-test-" + testId + "-" + i)
-                .toArray(String[]::new);
+                .collect(toSet());
         createTopics(brokerUrl, topics);
 
         System.out.println("Producing " + messages + " messages..to topics " + String.join(",", topics));
@@ -278,7 +279,7 @@ public static void main(String[] args) throws Exception {
 
     }
 
-    private static void createTopics(String brokerUrl, String[] topics) throws Exception {
+    private static void createTopics(String brokerUrl, Set<String> topics) throws Exception {
         Properties adminConfig = new Properties();
         adminConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
 
@@ -286,7 +287,7 @@ private static void createTopics(String brokerUrl, String[] topics) throws Excep
         Map<String, String> topicConfigs = Map.of(
                 TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT
         );
-        List<NewTopic> newTopics = Arrays.stream(topics)
+        List<NewTopic> newTopics = topics.stream()
                 .map(name -> new NewTopic(name, 1, (short) 1).configs(topicConfigs)).toList();
         adminClient.createTopics(newTopics).all().get();
 
@@ -296,7 +297,7 @@ private static void createTopics(String brokerUrl, String[] topics) throws Excep
             Set<String> allTopics = adminClient.listTopics().names().get();
             pendingTopics.clear();
             pendingTopics.addAll(
-                    Arrays.stream(topics)
+                    topics.stream()
                             .filter(topicName -> !allTopics.contains(topicName))
                             .toList()
             );
@@ -392,7 +393,7 @@ private static void require(boolean requirement, String message) {
         }
     }
 
-    private static Path produceMessages(String brokerUrl, String[] topics, long messages,
+    private static Path produceMessages(String brokerUrl, Set<String> topics, long messages,
                                         String compressionType, int dups, int percentDeletes) throws IOException {
         Map<String, Object> producerProps = Map.of(
                 ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(Long.MAX_VALUE),
@@ -408,8 +409,9 @@ producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {
 
             try (BufferedWriter producedWriter = Files.newBufferedWriter(
                     producedFilePath, StandardCharsets.UTF_8)) {
-                for (long i = 0; i < messages * topics.length; i++) {
-                    String topic = topics[(int) (i % topics.length)];
+                String[] topicsArray = topics.toArray(new String[0]);
+                for (long i = 0; i < messages * topics.size(); i++) {
+                    String topic = topicsArray[(int) (i % topics.size())];
                     int key = RANDOM.nextInt(keyCount);
                     boolean delete = (i % 100) < percentDeletes;
                     ProducerRecord<byte[], byte[]> record;
@@ -430,14 +432,14 @@ record = new ProducerRecord<>(topic,
         }
     }
 
-    private static Path consumeMessages(String brokerUrl, String[] topics) throws IOException {
+    private static Path consumeMessages(String brokerUrl, Set<String> topics) throws IOException {
         Path consumedFilePath = Files.createTempFile("kafka-log-cleaner-consumed-", ".txt");
         System.out.println("Logging consumed messages to " + consumedFilePath);
 
         try (Consumer<String, String> consumer = createConsumer(brokerUrl);
                 BufferedWriter consumedWriter = Files.newBufferedWriter(consumedFilePath, StandardCharsets.UTF_8)) {
-            consumer.subscribe(Arrays.asList(topics));
+            consumer.subscribe(new ArrayList<>(topics));
             while (true) {
                 ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(20));
                 if (consumerRecords.isEmpty()) return consumedFilePath;

From 331515b0a9ac06b015b07a49364f0cad669e3853 Mon Sep 17 00:00:00 2001
From: Yunchi Pang
Date: Mon, 18 Aug 2025 17:05:03 -0700
Subject: [PATCH 2/4] run formatting

---
 .../main/java/org/apache/kafka/tools/LogCompactionTester.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java b/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java
index 252b65637d9b2..9fcd18d3c2c47 100644
--- a/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java
+++ b/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java
@@ -56,12 +56,12 @@
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
-import static java.util.stream.Collectors.toSet;
-
 import joptsimple.OptionParser;
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
 
+import static java.util.stream.Collectors.toSet;
+
 
 /**
  * This is a torture test that runs against an existing broker

From 8bd5f6b46eefddc7ae1106612267de5cdcc1d927 Mon Sep 17 00:00:00 2001
From: Yunchi Pang
Date: Mon, 18 Aug 2025 17:20:16 -0700
Subject: [PATCH 3/4] use LinkedHashSet

---
 .../java/org/apache/kafka/tools/LogCompactionTester.java    | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java b/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java
index 9fcd18d3c2c47..d9a1d6b2ac565 100644
--- a/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java
+++ b/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java
@@ -44,6 +44,7 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -60,7 +61,7 @@
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
 
-import static java.util.stream.Collectors.toSet;
+import static java.util.stream.Collectors.toCollection;
 
 
 /**
@@ -249,7 +250,7 @@ public static void main(String[] args) throws Exception {
         long testId = RANDOM.nextLong();
         Set<String> topics = IntStream.range(0, topicCount)
                 .mapToObj(i -> "log-cleaner-test-" + testId + "-" + i)
-                .collect(toSet());
+                .collect(toCollection(LinkedHashSet::new));
         createTopics(brokerUrl, topics);
 
         System.out.println("Producing " + messages + " messages..to topics " + String.join(",", topics));

From e4a3c6befd5fd79de0d451e3551a1c39159c1b91 Mon Sep 17 00:00:00 2001
From: Yunchi Pang
Date: Thu, 21 Aug 2025 15:20:04 -0700
Subject: [PATCH 4/4] avoid using array, stick with List

---
 .../java/org/apache/kafka/tools/LogCompactionTester.java    | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java b/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java
index d9a1d6b2ac565..497b8b92b272e 100644
--- a/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java
+++ b/tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java
@@ -410,9 +410,10 @@ producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {
 
             try (BufferedWriter producedWriter = Files.newBufferedWriter(
                     producedFilePath, StandardCharsets.UTF_8)) {
-                String[] topicsArray = topics.toArray(new String[0]);
-                for (long i = 0; i < messages * topics.size(); i++) {
-                    String topic = topicsArray[(int) (i % topics.size())];
+                List<String> topicsList = List.copyOf(topics);
+                int size = topicsList.size();
+                for (long i = 0; i < messages * size; i++) {
+                    String topic = topicsList.get((int) (i % size));
                     int key = RANDOM.nextInt(keyCount);
                     boolean delete = (i % 100) < percentDeletes;
                     ProducerRecord<byte[], byte[]> record;
@@ -440,7 +441,7 @@ private static Path consumeMessages(String brokerUrl, Set<String> topics) throws
 
         try (Consumer<String, String> consumer = createConsumer(brokerUrl);
                 BufferedWriter consumedWriter = Files.newBufferedWriter(consumedFilePath, StandardCharsets.UTF_8)) {
-            consumer.subscribe(new ArrayList<>(topics));
+            consumer.subscribe(topics);
            while (true) {
                 ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(20));
                 if (consumerRecords.isEmpty()) return consumedFilePath;
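
Background on the behavior patches 3 and 4 rely on: Collectors.toSet() gives no iteration-order guarantee, so topic order could differ from creation order between runs, while a LinkedHashSet preserves insertion order; List.copyOf then provides an immutable, index-addressable view for the round-robin producing loop. The following is a minimal standalone sketch of that combination, not part of the patch itself; the class name and "topic-" names are hypothetical.

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;

import static java.util.stream.Collectors.toCollection;

public class TopicOrderSketch {
    public static void main(String[] args) {
        // Build topic names the way the tester does; LinkedHashSet keeps them
        // in creation order (topic-0, topic-1, ...), whereas Collectors.toSet()
        // leaves the iteration order unspecified.
        Set<String> topics = IntStream.range(0, 3)
                .mapToObj(i -> "topic-" + i)
                .collect(toCollection(LinkedHashSet::new));

        // List.copyOf gives an immutable list over the set, enabling the
        // index-based round-robin that a Set alone cannot support.
        List<String> topicsList = List.copyOf(topics);
        int size = topicsList.size();
        for (long i = 0; i < 6; i++) {
            // Cycles topic-0, topic-1, topic-2, topic-0, ...
            System.out.println(topicsList.get((int) (i % size)));
        }
    }
}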