diff --git a/api/src/main/java/io/kafbat/ui/service/TopicsService.java b/api/src/main/java/io/kafbat/ui/service/TopicsService.java index 0cdae7891..04dd2d593 100644 --- a/api/src/main/java/io/kafbat/ui/service/TopicsService.java +++ b/api/src/main/java/io/kafbat/ui/service/TopicsService.java @@ -26,6 +26,7 @@ import io.kafbat.ui.model.TopicCreationDTO; import io.kafbat.ui.model.TopicUpdateDTO; import java.time.Duration; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -288,6 +289,18 @@ private Map> getPartitionsRea Map brokersUsage = getBrokersMap(cluster, currentAssignment); int currentReplicationFactor = topic.getReplicationFactor(); + // Get online nodes + List onlineNodes = statisticsCache.get(cluster).getClusterDescription().getNodes() + .stream().map(Node::id).toList(); + + // keep only online nodes + for (Map.Entry> parition : currentAssignment.entrySet()) { + parition.getValue().retainAll(onlineNodes); + } + + brokersUsage.keySet().retainAll(onlineNodes); + + // If we should to increase Replication factor if (replicationFactorChange.getTotalReplicationFactor() > currentReplicationFactor) { // For each partition @@ -320,28 +333,35 @@ private Map> getPartitionsRea var partition = assignmentEntry.getKey(); var brokers = assignmentEntry.getValue(); + // Copy from online nodes if all nodes are offline + if (brokers.isEmpty()) { + brokers = new ArrayList<>(onlineNodes); + } + // Get brokers list sorted by usage in reverse order var brokersUsageList = brokersUsage.entrySet().stream() .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder())) .map(Map.Entry::getKey) .toList(); + Integer leader = topic.getPartitions().get(partition).getLeader(); + // Iterate brokers and try to remove them from assignment // while partition replicas count != requested replication factor for (Integer broker : brokersUsageList) { + if (brokers.size() == replicationFactorChange.getTotalReplicationFactor()) { + break; + } // Check is the broker the leader of partition - if (!topic.getPartitions().get(partition).getLeader() - .equals(broker)) { + if (leader == null || !leader.equals(broker)) { brokers.remove(broker); brokersUsage.merge(broker, -1, Integer::sum); } - if (brokers.size() == replicationFactorChange.getTotalReplicationFactor()) { - break; - } } if (brokers.size() != replicationFactorChange.getTotalReplicationFactor()) { throw new ValidationException("Something went wrong during removing replicas"); } + currentAssignment.put(partition, brokers); } } else { throw new ValidationException("Replication factor already equals requested"); @@ -374,7 +394,7 @@ private Map getBrokersMap(KafkaCluster cluster, c -> 0 )); currentAssignment.values().forEach(brokers -> brokers - .forEach(broker -> result.put(broker, result.get(broker) + 1))); + .forEach(broker -> result.put(broker, result.getOrDefault(broker, 0) + 1))); return result; }