Skip to content

Commit 47bb46c

Browse files
authored
KAFKA-19582 the current assignments shown by ReassignPartitionsCommand should include the log directories (#20319)
The ReassignPartitionsCommand shows the topic replicas on each broker. When using the --generate command, it returns the current partition replica assignment. However, the log directory for each current replica is always shown as any. This makes it impossible for users to determine which specific log directory is being used by each replica. Therefore, we should fix this behavior. ``` Current partition replica assignment { "version": 1, "partitions": [ { "topic": "test1", "partition": 0, "replicas": [ 4, 2 ], "log_dirs": [ "any", "any" ] } ] } ``` This PR ``` Current partition replica assignment { "version": 1, "partitions": [ { "topic": "test1", "partition": 0, "replicas": [ 4, 2 ], "log_dirs": [ "/tmp/kraft-broker-logs234", "/tmp/kraft-broker-logs" ] } ] } ``` Reviewers: PoAn Yang <payang@apache.org>, Jhen-Yung Hsu <jhenyunghsu@gmail.com>, TaiJuWu <tjwu1217@gmail.com>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent c565ba1 commit 47bb46c

File tree

2 files changed

+109
-27
lines changed

2 files changed

+109
-27
lines changed

tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -566,10 +566,11 @@ public static Entry<Map<TopicPartition, List<Integer>>, Map<TopicPartition, List
566566
List<String> topicsToReassign = t0.getValue();
567567

568568
Map<TopicPartition, List<Integer>> currentAssignments = getReplicaAssignmentForTopics(adminClient, topicsToReassign);
569+
Map<TopicPartitionReplica, String> currentReplicaLogDirs = getReplicaToLogDir(adminClient, currentAssignments);
569570
List<UsableBroker> usableBrokers = getBrokerMetadata(adminClient, brokersToReassign, enableRackAwareness);
570571
Map<TopicPartition, List<Integer>> proposedAssignments = calculateAssignment(currentAssignments, usableBrokers);
571572
System.out.printf("Current partition replica assignment%n%s%n%n",
572-
formatAsReassignmentJson(currentAssignments, Map.of()));
573+
formatAsReassignmentJson(currentAssignments, currentReplicaLogDirs));
573574
System.out.printf("Proposed partition reassignment configuration%n%s%n",
574575
formatAsReassignmentJson(proposedAssignments, Map.of()));
575576
return Map.entry(proposedAssignments, currentAssignments);
@@ -775,7 +776,7 @@ public static void executeAssignment(Admin adminClient,
775776

776777
verifyBrokerIds(adminClient, brokers);
777778
Map<TopicPartition, List<Integer>> currentParts = getReplicaAssignmentForPartitions(adminClient, proposedParts.keySet());
778-
System.out.println(currentPartitionReplicaAssignmentToString(proposedParts, currentParts));
779+
System.out.println(currentPartitionReplicaAssignmentToString(adminClient, proposedParts, currentParts));
779780

780781
if (interBrokerThrottle >= 0 || logDirThrottle >= 0) {
781782
System.out.println(YOU_MUST_RUN_VERIFY_PERIODICALLY_MESSAGE);
@@ -916,20 +917,23 @@ private static void verifyBrokerIds(Admin adminClient, Set<Integer> brokers) thr
916917
/**
917918
* Return the string which we want to print to describe the current partition assignment.
918919
*
920+
* @param adminClient The admin client object to use.
919921
* @param proposedParts The proposed partition assignment.
920922
* @param currentParts The current partition assignment.
921923
*
922924
* @return The string to print. We will only print information about
923925
* partitions that appear in the proposed partition assignment.
924926
*/
925-
static String currentPartitionReplicaAssignmentToString(Map<TopicPartition, List<Integer>> proposedParts,
926-
Map<TopicPartition, List<Integer>> currentParts) throws JsonProcessingException {
927+
static String currentPartitionReplicaAssignmentToString(Admin adminClient,
928+
Map<TopicPartition, List<Integer>> proposedParts,
929+
Map<TopicPartition, List<Integer>> currentParts) throws JsonProcessingException, ExecutionException, InterruptedException {
927930
Map<TopicPartition, List<Integer>> partitionsToBeReassigned = currentParts.entrySet().stream()
928931
.filter(e -> proposedParts.containsKey(e.getKey()))
929932
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
933+
Map<TopicPartitionReplica, String> currentReplicaLogDirs = getReplicaToLogDir(adminClient, partitionsToBeReassigned);
930934

931935
return String.format("Current partition replica assignment%n%n%s%n%nSave this to use as the %s",
932-
formatAsReassignmentJson(partitionsToBeReassigned, Map.of()),
936+
formatAsReassignmentJson(partitionsToBeReassigned, currentReplicaLogDirs),
933937
"--reassignment-json-file option during rollback");
934938
}
935939

@@ -1514,4 +1518,26 @@ static Set<TopicPartitionReplica> alterReplicaLogDirs(Admin adminClient,
15141518
}
15151519
return results;
15161520
}
1521+
1522+
static Map<TopicPartitionReplica, String> getReplicaToLogDir(
1523+
Admin adminClient,
1524+
Map<TopicPartition, List<Integer>> topicPartitionToReplicas
1525+
) throws InterruptedException, ExecutionException {
1526+
var replicaLogDirs = topicPartitionToReplicas
1527+
.entrySet()
1528+
.stream()
1529+
.flatMap(entry -> entry.getValue()
1530+
.stream()
1531+
.map(id -> new TopicPartitionReplica(entry.getKey().topic(), entry.getKey().partition(), id)))
1532+
.collect(Collectors.toUnmodifiableSet());
1533+
1534+
return adminClient.describeReplicaLogDirs(replicaLogDirs).all().get()
1535+
.entrySet()
1536+
.stream()
1537+
.filter(entry -> entry.getValue().getCurrentReplicaLogDir() != null)
1538+
.collect(Collectors.toMap(
1539+
Entry::getKey,
1540+
entry -> entry.getValue().getCurrentReplicaLogDir()
1541+
));
1542+
}
15171543
}

tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java

Lines changed: 78 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@
7171
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getBrokerMetadata;
7272
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaAssignmentForPartitions;
7373
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaAssignmentForTopics;
74+
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaToLogDir;
7475
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyInterBrokerThrottle;
7576
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyLogDirThrottle;
7677
import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyTopicThrottles;
@@ -436,29 +437,50 @@ public void testGenerateAssignmentWithFewerBrokers() throws Exception {
436437

437438
@Test
438439
public void testCurrentPartitionReplicaAssignmentToString() throws Exception {
439-
Map<TopicPartition, List<Integer>> proposedParts = new HashMap<>();
440-
441-
proposedParts.put(new TopicPartition("foo", 1), List.of(1, 2, 3));
442-
proposedParts.put(new TopicPartition("bar", 0), List.of(7, 8, 9));
443-
444-
Map<TopicPartition, List<Integer>> currentParts = new HashMap<>();
445-
446-
currentParts.put(new TopicPartition("foo", 0), List.of(1, 2, 3));
447-
currentParts.put(new TopicPartition("foo", 1), List.of(4, 5, 6));
448-
currentParts.put(new TopicPartition("bar", 0), List.of(7, 8));
449-
currentParts.put(new TopicPartition("baz", 0), List.of(10, 11, 12));
440+
try (MockAdminClient adminClient = new MockAdminClient.Builder()
441+
.numBrokers(6)
442+
.brokerLogDirs(List.of(
443+
List.of("/tmp/broker0/logs"),
444+
List.of("/tmp/broker1/logs"),
445+
List.of("/tmp/broker2/logs"),
446+
List.of("/tmp/broker3/logs"),
447+
List.of("/tmp/broker4/logs"),
448+
List.of("/tmp/broker5/logs")
449+
))
450+
.build()
451+
) {
452+
453+
List<Node> brokers = adminClient.brokers();
454+
adminClient.addTopic(false, "foo", List.of(
455+
new TopicPartitionInfo(1, brokers.get(1),
456+
List.of(brokers.get(1), brokers.get(2), brokers.get(3)),
457+
List.of(brokers.get(1), brokers.get(2), brokers.get(3)))
458+
), Map.of());
459+
460+
adminClient.addTopic(false, "bar", List.of(
461+
new TopicPartitionInfo(0, brokers.get(4),
462+
List.of(brokers.get(4), brokers.get(5)),
463+
List.of(brokers.get(4), brokers.get(5)))
464+
), Map.of());
465+
466+
Map<TopicPartition, List<Integer>> proposedParts = new HashMap<>();
467+
proposedParts.put(new TopicPartition("foo", 1), List.of(0, 1, 2));
468+
proposedParts.put(new TopicPartition("bar", 0), List.of(3, 4, 5));
469+
470+
Map<TopicPartition, List<Integer>> currentParts = new HashMap<>();
471+
currentParts.put(new TopicPartition("foo", 1), List.of(1, 2, 3));
472+
currentParts.put(new TopicPartition("bar", 0), List.of(4, 5));
450473

451-
assertEquals(String.join(System.lineSeparator(),
452-
"Current partition replica assignment",
453-
"",
454-
"{\"version\":1,\"partitions\":" +
455-
"[{\"topic\":\"bar\",\"partition\":0,\"replicas\":[7,8],\"log_dirs\":[\"any\",\"any\"]}," +
456-
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[4,5,6],\"log_dirs\":[\"any\",\"any\",\"any\"]}]" +
457-
"}",
458-
"",
459-
"Save this to use as the --reassignment-json-file option during rollback"),
460-
currentPartitionReplicaAssignmentToString(proposedParts, currentParts)
461-
);
474+
assertEquals(String.join(System.lineSeparator(),
475+
"Current partition replica assignment",
476+
"",
477+
"{\"version\":1,\"partitions\":[{\"topic\":\"bar\",\"partition\":0,\"replicas\":[4,5],\"log_dirs\":[\"/tmp/broker4/logs\",\"/tmp/broker4/logs\"]}," +
478+
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[1,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}]}",
479+
"",
480+
"Save this to use as the --reassignment-json-file option during rollback"),
481+
currentPartitionReplicaAssignmentToString(adminClient, proposedParts, currentParts)
482+
);
483+
}
462484
}
463485

464486
@Test
@@ -765,4 +787,38 @@ public void testPropagateInvalidJsonError() {
765787
assertThrows(AdminOperationException.class, () -> executeAssignment(adminClient, false, "{invalid_json", -1L, -1L, 10000L, Time.SYSTEM, false)).getMessage());
766788
}
767789
}
790+
791+
@Test
792+
public void testGetReplicaToLogDir() throws Exception {
793+
try (MockAdminClient adminClient = new MockAdminClient.Builder()
794+
.numBrokers(4)
795+
.brokerLogDirs(List.of(
796+
List.of("/tmp/broker0/logs0"),
797+
List.of("/tmp/broker1/logs0"),
798+
List.of("/tmp/broker2/logs0"),
799+
List.of("/tmp/broker3/logs0")
800+
)).build()
801+
) {
802+
addTopics(adminClient);
803+
804+
Map<TopicPartition, List<Integer>> topicPartitionToReplicas = Map.of(
805+
new TopicPartition("foo", 0), List.of(0, 1, 2),
806+
new TopicPartition("foo", 1), List.of(1, 2, 3),
807+
new TopicPartition("bar", 0), List.of(2, 3, 0)
808+
);
809+
810+
Map<TopicPartitionReplica, String> result = getReplicaToLogDir(adminClient, topicPartitionToReplicas);
811+
812+
assertFalse(result.isEmpty());
813+
assertEquals("/tmp/broker0/logs0", result.get(new TopicPartitionReplica("foo", 0, 0)));
814+
assertEquals("/tmp/broker0/logs0", result.get(new TopicPartitionReplica("foo", 0, 1)));
815+
assertEquals("/tmp/broker0/logs0", result.get(new TopicPartitionReplica("foo", 0, 2)));
816+
assertEquals("/tmp/broker1/logs0", result.get(new TopicPartitionReplica("foo", 1, 1)));
817+
assertEquals("/tmp/broker1/logs0", result.get(new TopicPartitionReplica("foo", 1, 2)));
818+
assertEquals("/tmp/broker1/logs0", result.get(new TopicPartitionReplica("foo", 1, 3)));
819+
assertEquals("/tmp/broker2/logs0", result.get(new TopicPartitionReplica("bar", 0, 0)));
820+
assertEquals("/tmp/broker2/logs0", result.get(new TopicPartitionReplica("bar", 0, 2)));
821+
assertEquals("/tmp/broker2/logs0", result.get(new TopicPartitionReplica("bar", 0, 3)));
822+
}
823+
}
768824
}

0 commit comments

Comments
 (0)