26
26
import io .kafbat .ui .model .TopicCreationDTO ;
27
27
import io .kafbat .ui .model .TopicUpdateDTO ;
28
28
import java .time .Duration ;
29
+ import java .util .ArrayList ;
29
30
import java .util .Collection ;
30
31
import java .util .Collections ;
31
32
import java .util .Comparator ;
@@ -288,6 +289,18 @@ private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsRea
288
289
Map <Integer , Integer > brokersUsage = getBrokersMap (cluster , currentAssignment );
289
290
int currentReplicationFactor = topic .getReplicationFactor ();
290
291
292
+ // Get online nodes
293
+ List <Integer > onlineNodes = statisticsCache .get (cluster ).getClusterDescription ().getNodes ()
294
+ .stream ().map (Node ::id ).toList ();
295
+
296
+ // keep only online nodes
297
+ for (Map .Entry <Integer , List <Integer >> parition : currentAssignment .entrySet ()) {
298
+ parition .getValue ().retainAll (onlineNodes );
299
+ }
300
+
301
+ brokersUsage .keySet ().retainAll (onlineNodes );
302
+
303
+
291
304
// If we should to increase Replication factor
292
305
if (replicationFactorChange .getTotalReplicationFactor () > currentReplicationFactor ) {
293
306
// For each partition
@@ -320,6 +333,11 @@ private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsRea
320
333
var partition = assignmentEntry .getKey ();
321
334
var brokers = assignmentEntry .getValue ();
322
335
336
+ // Copy frpm online nodes if all nodes are offline
337
+ if (brokers .isEmpty ()) {
338
+ brokers = new ArrayList <>(onlineNodes );
339
+ }
340
+
323
341
// Get brokers list sorted by usage in reverse order
324
342
var brokersUsageList = brokersUsage .entrySet ().stream ()
325
343
.sorted (Map .Entry .comparingByValue (Comparator .reverseOrder ()))
@@ -329,19 +347,20 @@ private Map<TopicPartition, Optional<NewPartitionReassignment>> getPartitionsRea
329
347
// Iterate brokers and try to remove them from assignment
330
348
// while partition replicas count != requested replication factor
331
349
for (Integer broker : brokersUsageList ) {
350
+ if (brokers .size () == replicationFactorChange .getTotalReplicationFactor ()) {
351
+ break ;
352
+ }
332
353
// Check is the broker the leader of partition
333
- if (! topic .getPartitions ().get (partition ).getLeader ()
334
- .equals (broker )) {
354
+ Integer leader = topic .getPartitions ().get (partition ).getLeader ();
355
+ if ( leader != null && ! leader .equals (broker )) {
335
356
brokers .remove (broker );
336
357
brokersUsage .merge (broker , -1 , Integer ::sum );
337
358
}
338
- if (brokers .size () == replicationFactorChange .getTotalReplicationFactor ()) {
339
- break ;
340
- }
341
359
}
342
360
if (brokers .size () != replicationFactorChange .getTotalReplicationFactor ()) {
343
361
throw new ValidationException ("Something went wrong during removing replicas" );
344
362
}
363
+ currentAssignment .put (partition , brokers );
345
364
}
346
365
} else {
347
366
throw new ValidationException ("Replication factor already equals requested" );
@@ -374,7 +393,7 @@ private Map<Integer, Integer> getBrokersMap(KafkaCluster cluster,
374
393
c -> 0
375
394
));
376
395
currentAssignment .values ().forEach (brokers -> brokers
377
- .forEach (broker -> result .put (broker , result .get (broker ) + 1 )));
396
+ .forEach (broker -> result .put (broker , result .getOrDefault (broker , 0 ) + 1 )));
378
397
379
398
return result ;
380
399
}
0 commit comments