tools/src/main/java/org/apache/kafka/tools/LogCompactionTester.java (26 changes: 15 additions & 11 deletions)
@@ -43,8 +43,8 @@
 import java.nio.file.Path;
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -61,6 +61,8 @@
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
 
+import static java.util.stream.Collectors.toCollection;
+
 
 /**
  * This is a torture test that runs against an existing broker
@@ -246,9 +248,9 @@ public static void main(String[] args) throws Exception {
         int sleepSecs = optionSet.valueOf(options.sleepSecsOpt);
 
         long testId = RANDOM.nextLong();
-        String[] topics = IntStream.range(0, topicCount)
+        Set<String> topics = IntStream.range(0, topicCount)
                 .mapToObj(i -> "log-cleaner-test-" + testId + "-" + i)
-                .toArray(String[]::new);
+                .collect(toCollection(LinkedHashSet::new));

Review comment on lines +251 to +253:

@m1a2st (Collaborator), Aug 19, 2025:
Why don't we use a List? I think a List is more convenient than a Set when we need to retrieve elements.

PR author (Contributor) reply:
@m1a2st thanks for the review! I totally agree that a List would be more convenient for indexed access, but a Set is preferred here because:
  1. Semantic clarity: topics represents a collection of unique topic names.
  2. Duplicate prevention: a Set protects against accidentally creating duplicate topics.
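
As a side note (not part of the PR), here is a small standalone sketch of the two points above: a LinkedHashSet silently drops duplicate topic names while keeping insertion order, and List.copyOf gives stable indexed access when round-robin assignment is needed, as in produceMessages below. The class name, the fixed testId, and the topic count are made up for the demo.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;

import static java.util.stream.Collectors.toCollection;

public class TopicSetDemo {
    public static void main(String[] args) {
        long testId = 42L; // fixed id, purely for the demo
        // Same construction pattern as the PR: one generated name per index, collected into a LinkedHashSet.
        Set<String> topics = IntStream.range(0, 3)
                .mapToObj(i -> "log-cleaner-test-" + testId + "-" + i)
                .collect(toCollection(LinkedHashSet::new));

        // Duplicate prevention: re-adding an existing name is a no-op.
        boolean added = topics.add("log-cleaner-test-" + testId + "-0");
        System.out.println(added);          // false
        System.out.println(topics.size());  // still 3

        // Insertion order is preserved, so a List copy gives stable indexed access.
        List<String> topicsList = List.copyOf(topics);
        System.out.println(topicsList.get(1)); // log-cleaner-test-42-1
    }
}
```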

         createTopics(brokerUrl, topics);
 
         System.out.println("Producing " + messages + " messages..to topics " + String.join(",", topics));
@@ -278,15 +280,15 @@ public static void main(String[] args) throws Exception {
     }
 
 
-    private static void createTopics(String brokerUrl, String[] topics) throws Exception {
+    private static void createTopics(String brokerUrl, Set<String> topics) throws Exception {
         Properties adminConfig = new Properties();
         adminConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
 
         try (Admin adminClient = Admin.create(adminConfig)) {
             Map<String, String> topicConfigs = Map.of(
                 TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT
             );
-            List<NewTopic> newTopics = Arrays.stream(topics)
+            List<NewTopic> newTopics = topics.stream()
                 .map(name -> new NewTopic(name, 1, (short) 1).configs(topicConfigs)).toList();
             adminClient.createTopics(newTopics).all().get();
 
@@ -296,7 +298,7 @@ private static void createTopics(String brokerUrl, String[] topics) throws Excep
             Set<String> allTopics = adminClient.listTopics().names().get();
             pendingTopics.clear();
             pendingTopics.addAll(
-                Arrays.stream(topics)
+                topics.stream()
                     .filter(topicName -> !allTopics.contains(topicName))
                     .toList()
             );
@@ -392,7 +394,7 @@ private static void require(boolean requirement, String message) {
         }
     }
 
-    private static Path produceMessages(String brokerUrl, String[] topics, long messages,
+    private static Path produceMessages(String brokerUrl, Set<String> topics, long messages,
             String compressionType, int dups, int percentDeletes) throws IOException {
         Map<String, Object> producerProps = Map.of(
             ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(Long.MAX_VALUE),
@@ -408,8 +410,10 @@ producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {

         try (BufferedWriter producedWriter = Files.newBufferedWriter(
                 producedFilePath, StandardCharsets.UTF_8)) {
-            for (long i = 0; i < messages * topics.length; i++) {
-                String topic = topics[(int) (i % topics.length)];
+            List<String> topicsList = List.copyOf(topics);
+            int size = topicsList.size();
+            for (long i = 0; i < messages * size; i++) {
+                String topic = topicsList.get((int) (i % size));
                 int key = RANDOM.nextInt(keyCount);
                 boolean delete = (i % 100) < percentDeletes;
                 ProducerRecord<byte[], byte[]> record;
@@ -430,14 +434,14 @@ record = new ProducerRecord<>(topic,
         }
     }
 
-    private static Path consumeMessages(String brokerUrl, String[] topics) throws IOException {
+    private static Path consumeMessages(String brokerUrl, Set<String> topics) throws IOException {
 
         Path consumedFilePath = Files.createTempFile("kafka-log-cleaner-consumed-", ".txt");
         System.out.println("Logging consumed messages to " + consumedFilePath);
 
         try (Consumer<String, String> consumer = createConsumer(brokerUrl);
                 BufferedWriter consumedWriter = Files.newBufferedWriter(consumedFilePath, StandardCharsets.UTF_8)) {
-            consumer.subscribe(Arrays.asList(topics));
+            consumer.subscribe(topics);
             while (true) {
                 ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(20));
                 if (consumerRecords.isEmpty()) return consumedFilePath;
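
A closing note on the consumeMessages hunk: Consumer.subscribe takes a Collection<String>, which is why the Set can be passed directly and the Arrays.asList wrapper is no longer needed. Below is a minimal, hedged sketch of that usage under assumed settings; the broker address, group id, topic names, and class name are placeholders, not values from the PR.

```java
import java.time.Duration;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SubscribeWithSetDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "log-cleaner-demo");        // placeholder group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Duplicate-free, insertion-ordered topic names, mirroring the PR's data structure.
        Set<String> topics = new LinkedHashSet<>(List.of("log-cleaner-test-0", "log-cleaner-test-1"));

        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe(Collection<String>) accepts the Set directly; no Arrays.asList needed.
            consumer.subscribe(topics);
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            System.out.println("fetched " + records.count() + " records");
        }
    }
}
```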