Skip to content

Commit dd784e7

Browse files
authored
KAFKA-16717 [3/N]: Add AdminClient.alterShareGroupOffsets (#19820)
[KAFKA-16717](https://issues.apache.org/jira/browse/KAFKA-16717) aims to finish the AlterShareGroupOffsets for ShareGroupCommand part. Reviewers: Lan Ding <isDing_L@163.com>, Chia-Ping Tsai <chia7712@gmail.com>, TaiJuWu <tjwu1217@gmail.com>, Andrew Schofield <aschofield@confluent.io>
1 parent 40b4fdb commit dd784e7

File tree

11 files changed

+660
-101
lines changed

11 files changed

+660
-101
lines changed

clients/src/main/java/org/apache/kafka/common/requests/AlterShareGroupOffsetsResponse.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,11 @@ private AlterShareGroupOffsetsResponseTopic getOrCreateTopic(String topic, Uuid
8383
return topicData;
8484
}
8585

86-
public Builder addPartition(String topic, int partition, Map<String, Uuid> topicIdsToNames, Errors error) {
86+
public Builder addPartition(String topic, int partition, Map<String, Uuid> topicIdsToNames, ApiError error) {
8787
AlterShareGroupOffsetsResponseTopic topicData = getOrCreateTopic(topic, topicIdsToNames.get(topic));
8888
topicData.partitions().add(new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
8989
.setPartitionIndex(partition)
90-
.setErrorCode(error.code())
90+
.setErrorCode(error.error().code())
9191
.setErrorMessage(error.message()));
9292
return this;
9393
}

clients/src/test/java/org/apache/kafka/clients/admin/AdminClientTestUtils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.Collection;
3232
import java.util.Collections;
3333
import java.util.Map;
34+
import java.util.Optional;
3435
import java.util.Set;
3536
import java.util.stream.Collectors;
3637

@@ -194,6 +195,22 @@ public static ListShareGroupOffsetsResult createListShareGroupOffsetsResult(Map<
194195
return new ListShareGroupOffsetsResult(coordinatorFutures);
195196
}
196197

198+
public static ListOffsetsResult createListOffsetsResult(Map<TopicPartition, OffsetAndMetadata> partitionOffsets) {
199+
Map<TopicPartition, KafkaFuture<ListOffsetsResult.ListOffsetsResultInfo>> futures =
200+
partitionOffsets.entrySet().stream()
201+
.collect(Collectors.toMap(
202+
Map.Entry::getKey,
203+
entry -> KafkaFuture.completedFuture(
204+
new ListOffsetsResult.ListOffsetsResultInfo(
205+
entry.getValue().offset(),
206+
System.currentTimeMillis(),
207+
Optional.of(1)
208+
)
209+
)
210+
));
211+
return new ListOffsetsResult(futures);
212+
}
213+
197214
/**
198215
* Helper to create a KafkaAdminClient with a custom HostResolver accessible to tests outside this package.
199216
*/

core/src/main/scala/kafka/server/KafkaApis.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3852,7 +3852,7 @@ class KafkaApis(val requestChannel: RequestChannel,
38523852
}
38533853
topicError match {
38543854
case Some(error) =>
3855-
topic.partitions.forEach(partition => responseBuilder.addPartition(topic.topicName, partition.partitionIndex, metadataCache.topicNamesToIds, error.error))
3855+
topic.partitions.forEach(partition => responseBuilder.addPartition(topic.topicName, partition.partitionIndex, metadataCache.topicNamesToIds, error))
38563856
case None =>
38573857
authorizedTopicPartitions.add(topic.duplicate)
38583858
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -709,15 +709,50 @@ CompletableFuture<AlterShareGroupOffsetsResponseData> persisterInitialize(
709709
handlePersisterInitializeResponse(request.groupTopicPartitionData().groupId(), result, new ShareGroupHeartbeatResponseData());
710710
return response;
711711
} else {
712-
//TODO build new AlterShareGroupOffsetsResponseData for error response
713-
return response;
712+
return buildErrorResponse(response, result);
714713
}
715714
} else {
716715
return buildErrorResponse(request, response, exp);
717716
}
718717

719718
});
720719
}
720+
721+
private AlterShareGroupOffsetsResponseData buildErrorResponse(AlterShareGroupOffsetsResponseData response, InitializeShareGroupStateResult result) {
722+
AlterShareGroupOffsetsResponseData data = new AlterShareGroupOffsetsResponseData();
723+
Map<Uuid, Map<Integer, PartitionErrorData>> topicPartitionErrorsMap = result.getErrors();
724+
data.setResponses(
725+
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection(response.responses().stream()
726+
.map(topic -> {
727+
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic topicData = new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic()
728+
.setTopicName(topic.topicName())
729+
.setTopicId(topic.topicId());
730+
topic.partitions().forEach(partition -> {
731+
if (partition.errorCode() != Errors.NONE.code()) {
732+
topicData.partitions().add(partition);
733+
return;
734+
}
735+
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition partitionData;
736+
Map<Integer, PartitionErrorData> partitionErrors =
737+
Optional.ofNullable(topicPartitionErrorsMap)
738+
.map(map -> map.get(topic.topicId()))
739+
.orElse(Collections.emptyMap());
740+
PartitionErrorData error = partitionErrors.get(partition.partitionIndex());
741+
if (error == null) {
742+
partitionData = partition.duplicate();
743+
} else {
744+
partitionData = new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
745+
.setPartitionIndex(partition.partitionIndex())
746+
.setErrorCode(error.errorCode())
747+
.setErrorMessage(error.errorMessage());
748+
}
749+
topicData.partitions().add(partitionData);
750+
});
751+
return topicData;
752+
})
753+
.iterator()));
754+
return data;
755+
}
721756

722757
private AlterShareGroupOffsetsResponseData buildErrorResponse(InitializeShareGroupStateParameters request, AlterShareGroupOffsetsResponseData response, Throwable exp) {
723758
// build new AlterShareGroupOffsetsResponseData for error response
@@ -726,13 +761,14 @@ private AlterShareGroupOffsetsResponseData buildErrorResponse(InitializeShareGro
726761
log.error("Unable to initialize share group state for {}, {} while altering share group offsets", gtp.groupId(), gtp.topicsData(), exp);
727762
Errors error = Errors.forException(exp);
728763
data.setErrorCode(error.code())
729-
.setErrorMessage(error.message())
764+
.setErrorMessage(exp.getMessage())
730765
.setResponses(response.responses());
731766
data.setResponses(
732767
new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection(response.responses().stream()
733768
.map(topic -> {
734769
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic topicData = new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopic()
735-
.setTopicName(topic.topicName());
770+
.setTopicName(topic.topicName())
771+
.setTopicId(topic.topicId());
736772
topic.partitions().forEach(partition -> {
737773
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition partitionData = new AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition()
738774
.setPartitionIndex(partition.partitionIndex())

server-common/src/main/java/org/apache/kafka/server/share/persister/InitializeShareGroupStateResult.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.kafka.server.share.persister;
1919

20+
import org.apache.kafka.common.Uuid;
2021
import org.apache.kafka.common.message.InitializeShareGroupStateResponseData;
2122
import org.apache.kafka.common.protocol.Errors;
2223

@@ -59,6 +60,18 @@ public Map<Errors, Integer> errorCounts() {
5960
));
6061
}
6162

63+
public Map<Uuid, Map<Integer, PartitionErrorData>> getErrors() {
64+
return topicsData.stream()
65+
.collect(Collectors.toMap(
66+
TopicData::topicId,
67+
topicData -> topicData.partitions().stream()
68+
.collect(Collectors.toMap(
69+
PartitionIdData::partition,
70+
partitionErrorData -> partitionErrorData
71+
))
72+
));
73+
}
74+
6275
public static class Builder {
6376
private List<TopicData<PartitionErrorData>> topicsData;
6477

server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,11 @@ public static void checkInvalidArgsSet(OptionParser parser,
130130
}
131131
}
132132

133+
public static void printErrorAndExit(String message) {
134+
System.err.println(message);
135+
Exit.exit(1, message);
136+
}
137+
133138
public static void printUsageAndExit(OptionParser parser, String message) {
134139
System.err.println(message);
135140
try {

tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.apache.kafka.clients.admin.TopicDescription;
2626
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2727
import org.apache.kafka.common.TopicPartition;
28+
import org.apache.kafka.common.errors.LeaderNotAvailableException;
29+
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
2830
import org.apache.kafka.common.requests.ListOffsetsResponse;
2931
import org.apache.kafka.common.utils.Utils;
3032
import org.apache.kafka.server.util.CommandLineUtils;
@@ -46,6 +48,7 @@
4648
import java.util.List;
4749
import java.util.Map;
4850
import java.util.Optional;
51+
import java.util.Set;
4952
import java.util.concurrent.ExecutionException;
5053
import java.util.function.Function;
5154
import java.util.function.ToIntFunction;
@@ -68,6 +71,28 @@ public OffsetsUtils(Admin adminClient, OptionParser parser, OffsetsUtilsOptions
6871
this.parser = parser;
6972
}
7073

74+
public static void printOffsetsToReset(Map<String, Map<TopicPartition, OffsetAndMetadata>> groupAssignmentsToReset) {
75+
int maxGroupLen = Math.max(15, groupAssignmentsToReset.keySet().stream().mapToInt(String::length).max().orElse(0));
76+
int maxTopicLen = Math.max(15, groupAssignmentsToReset.values().stream()
77+
.flatMap(assignments -> assignments.keySet().stream())
78+
.mapToInt(tp -> tp.topic().length())
79+
.max()
80+
.orElse(0));
81+
82+
String format = "%n%" + (-maxGroupLen) + "s %" + (-maxTopicLen) + "s %-10s %s";
83+
if (!groupAssignmentsToReset.isEmpty())
84+
System.out.printf(format, "GROUP", "TOPIC", "PARTITION", "NEW-OFFSET");
85+
86+
groupAssignmentsToReset.forEach((groupId, assignment) ->
87+
assignment.forEach((consumerAssignment, offsetAndMetadata) ->
88+
System.out.printf(format,
89+
groupId,
90+
consumerAssignment.topic(),
91+
consumerAssignment.partition(),
92+
offsetAndMetadata.offset())));
93+
System.out.println();
94+
}
95+
7196
public Optional<Map<String, Map<TopicPartition, OffsetAndMetadata>>> resetPlanFromFile() {
7297
if (opts.resetFromFileOpt != null && !opts.resetFromFileOpt.isEmpty()) {
7398
try {
@@ -414,6 +439,55 @@ public Map<TopicPartition, OffsetAndMetadata> resetToCurrent(Collection<TopicPar
414439
return preparedOffsetsForPartitionsWithCommittedOffset;
415440
}
416441

442+
public void checkAllTopicPartitionsValid(Collection<TopicPartition> partitionsToReset) {
443+
// check the partitions exist
444+
List<TopicPartition> partitionsNotExistList = filterNonExistentPartitions(partitionsToReset);
445+
if (!partitionsNotExistList.isEmpty()) {
446+
String partitionStr = partitionsNotExistList.stream().map(TopicPartition::toString).collect(Collectors.joining(","));
447+
throw new UnknownTopicOrPartitionException("The partitions \"" + partitionStr + "\" do not exist");
448+
}
449+
450+
// check the partitions have leader
451+
List<TopicPartition> partitionsWithoutLeader = filterNoneLeaderPartitions(partitionsToReset);
452+
if (!partitionsWithoutLeader.isEmpty()) {
453+
String partitionStr = partitionsWithoutLeader.stream().map(TopicPartition::toString).collect(Collectors.joining(","));
454+
throw new LeaderNotAvailableException("The partitions \"" + partitionStr + "\" have no leader");
455+
}
456+
}
457+
458+
public List<TopicPartition> filterNoneLeaderPartitions(Collection<TopicPartition> topicPartitions) {
459+
// collect all topics
460+
Set<String> topics = topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
461+
462+
try {
463+
return adminClient.describeTopics(topics).allTopicNames().get().entrySet()
464+
.stream()
465+
.flatMap(entry -> entry.getValue().partitions().stream()
466+
.filter(partitionInfo -> partitionInfo.leader() == null)
467+
.map(partitionInfo -> new TopicPartition(entry.getKey(), partitionInfo.partition())))
468+
.filter(topicPartitions::contains)
469+
.toList();
470+
} catch (Exception e) {
471+
throw new RuntimeException(e);
472+
}
473+
}
474+
475+
public List<TopicPartition> filterNonExistentPartitions(Collection<TopicPartition> topicPartitions) {
476+
// collect all topics
477+
Set<String> topics = topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
478+
try {
479+
List<TopicPartition> existPartitions = adminClient.describeTopics(topics).allTopicNames().get().entrySet()
480+
.stream()
481+
.flatMap(entry -> entry.getValue().partitions().stream()
482+
.map(partitionInfo -> new TopicPartition(entry.getKey(), partitionInfo.partition())))
483+
.toList();
484+
485+
return topicPartitions.stream().filter(tp -> !existPartitions.contains(tp)).toList();
486+
} catch (InterruptedException | ExecutionException e) {
487+
throw new RuntimeException(e);
488+
}
489+
}
490+
417491
private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
418492
int t = (int) opts.timeoutMsOpt;
419493
return options.timeoutMs(t);
@@ -469,5 +543,15 @@ public OffsetsUtilsOptions(
469543
this.resetShiftByOpt = resetShiftByOpt;
470544
this.timeoutMsOpt = timeoutMsOpt;
471545
}
546+
547+
public OffsetsUtilsOptions(
548+
List<String> groupOpt,
549+
List<String> resetToDatetimeOpt,
550+
long timeoutMsOpt) {
551+
552+
this.groupOpt = groupOpt;
553+
this.resetToDatetimeOpt = resetToDatetimeOpt;
554+
this.timeoutMsOpt = timeoutMsOpt;
555+
}
472556
}
473557
}

0 commit comments

Comments
 (0)