43
43
import java .nio .file .Path ;
44
44
import java .time .Duration ;
45
45
import java .util .ArrayList ;
46
- import java .util .Arrays ;
47
46
import java .util .Iterator ;
47
+ import java .util .LinkedHashSet ;
48
48
import java .util .List ;
49
49
import java .util .Map ;
50
50
import java .util .Properties ;
61
61
import joptsimple .OptionSet ;
62
62
import joptsimple .OptionSpec ;
63
63
64
+ import static java .util .stream .Collectors .toCollection ;
65
+
64
66
65
67
/**
66
68
* This is a torture test that runs against an existing broker
@@ -246,9 +248,9 @@ public static void main(String[] args) throws Exception {
246
248
int sleepSecs = optionSet .valueOf (options .sleepSecsOpt );
247
249
248
250
long testId = RANDOM .nextLong ();
249
- String [] topics = IntStream .range (0 , topicCount )
251
+ Set < String > topics = IntStream .range (0 , topicCount )
250
252
.mapToObj (i -> "log-cleaner-test-" + testId + "-" + i )
251
- .toArray ( String [] ::new );
253
+ .collect ( toCollection ( LinkedHashSet ::new ) );
252
254
createTopics (brokerUrl , topics );
253
255
254
256
System .out .println ("Producing " + messages + " messages..to topics " + String .join ("," , topics ));
@@ -278,15 +280,15 @@ public static void main(String[] args) throws Exception {
278
280
}
279
281
280
282
281
- private static void createTopics (String brokerUrl , String [] topics ) throws Exception {
283
+ private static void createTopics (String brokerUrl , Set < String > topics ) throws Exception {
282
284
Properties adminConfig = new Properties ();
283
285
adminConfig .put (CommonClientConfigs .BOOTSTRAP_SERVERS_CONFIG , brokerUrl );
284
286
285
287
try (Admin adminClient = Admin .create (adminConfig )) {
286
288
Map <String , String > topicConfigs = Map .of (
287
289
TopicConfig .CLEANUP_POLICY_CONFIG , TopicConfig .CLEANUP_POLICY_COMPACT
288
290
);
289
- List <NewTopic > newTopics = Arrays .stream (topics )
291
+ List <NewTopic > newTopics = topics .stream ()
290
292
.map (name -> new NewTopic (name , 1 , (short ) 1 ).configs (topicConfigs )).toList ();
291
293
adminClient .createTopics (newTopics ).all ().get ();
292
294
@@ -296,7 +298,7 @@ private static void createTopics(String brokerUrl, String[] topics) throws Excep
296
298
Set <String > allTopics = adminClient .listTopics ().names ().get ();
297
299
pendingTopics .clear ();
298
300
pendingTopics .addAll (
299
- Arrays .stream (topics )
301
+ topics .stream ()
300
302
.filter (topicName -> !allTopics .contains (topicName ))
301
303
.toList ()
302
304
);
@@ -392,7 +394,7 @@ private static void require(boolean requirement, String message) {
392
394
}
393
395
}
394
396
395
- private static Path produceMessages (String brokerUrl , String [] topics , long messages ,
397
+ private static Path produceMessages (String brokerUrl , Set < String > topics , long messages ,
396
398
String compressionType , int dups , int percentDeletes ) throws IOException {
397
399
Map <String , Object > producerProps = Map .of (
398
400
ProducerConfig .MAX_BLOCK_MS_CONFIG , String .valueOf (Long .MAX_VALUE ),
@@ -408,8 +410,10 @@ producerProps, new ByteArraySerializer(), new ByteArraySerializer())) {
408
410
409
411
try (BufferedWriter producedWriter = Files .newBufferedWriter (
410
412
producedFilePath , StandardCharsets .UTF_8 )) {
411
- for (long i = 0 ; i < messages * topics .length ; i ++) {
412
- String topic = topics [(int ) (i % topics .length )];
413
+ List <String > topicsList = List .copyOf (topics );
414
+ int size = topicsList .size ();
415
+ for (long i = 0 ; i < messages * size ; i ++) {
416
+ String topic = topicsList .get ((int ) (i % size ));
413
417
int key = RANDOM .nextInt (keyCount );
414
418
boolean delete = (i % 100 ) < percentDeletes ;
415
419
ProducerRecord <byte [], byte []> record ;
@@ -430,14 +434,14 @@ record = new ProducerRecord<>(topic,
430
434
}
431
435
}
432
436
433
- private static Path consumeMessages (String brokerUrl , String [] topics ) throws IOException {
437
+ private static Path consumeMessages (String brokerUrl , Set < String > topics ) throws IOException {
434
438
435
439
Path consumedFilePath = Files .createTempFile ("kafka-log-cleaner-consumed-" , ".txt" );
436
440
System .out .println ("Logging consumed messages to " + consumedFilePath );
437
441
438
442
try (Consumer <String , String > consumer = createConsumer (brokerUrl );
439
443
BufferedWriter consumedWriter = Files .newBufferedWriter (consumedFilePath , StandardCharsets .UTF_8 )) {
440
- consumer .subscribe (Arrays . asList ( topics ) );
444
+ consumer .subscribe (topics );
441
445
while (true ) {
442
446
ConsumerRecords <String , String > consumerRecords = consumer .poll (Duration .ofSeconds (20 ));
443
447
if (consumerRecords .isEmpty ()) return consumedFilePath ;
0 commit comments