Skip to content

Commit 183317c

Browse files
iliaxiliaxHaarolean
committed
BE: Fix loading freezes in case one of the brokers is down (#3618)
Co-authored-by: iliax <ikuramshin@provectus.com> Co-authored-by: Roman Zabaluev <rzabaluev@provectus.com> (cherry picked from commit dbdced5)
1 parent fcd8824 commit 183317c

File tree

3 files changed

+93
-24
lines changed

3 files changed

+93
-24
lines changed

kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import static java.util.stream.Collectors.toMap;
55
import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
66

7+
import com.google.common.annotations.VisibleForTesting;
78
import com.google.common.collect.ImmutableTable;
89
import com.google.common.collect.Iterables;
910
import com.google.common.collect.Table;
@@ -498,6 +499,14 @@ public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicPartition> pa
498499
.flatMap(parts -> listOffsetsUnsafe(parts, offsetSpec));
499500
}
500501

502+
/**
503+
* List offset for the specified topics, skipping no-leader partitions.
504+
*/
505+
public Mono<Map<TopicPartition, Long>> listOffsets(Collection<TopicDescription> topicDescriptions,
506+
OffsetSpec offsetSpec) {
507+
return listOffsetsUnsafe(filterPartitionsWithLeaderCheck(topicDescriptions, p -> true, false), offsetSpec);
508+
}
509+
501510
private Mono<Collection<TopicPartition>> filterPartitionsWithLeaderCheck(Collection<TopicPartition> partitions,
502511
boolean failOnUnknownLeader) {
503512
var targetTopics = partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
@@ -507,34 +516,44 @@ private Mono<Collection<TopicPartition>> filterPartitionsWithLeaderCheck(Collect
507516
descriptions.values(), partitions::contains, failOnUnknownLeader));
508517
}
509518

510-
private Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
519+
@VisibleForTesting
520+
static Set<TopicPartition> filterPartitionsWithLeaderCheck(Collection<TopicDescription> topicDescriptions,
511521
Predicate<TopicPartition> partitionPredicate,
512522
boolean failOnUnknownLeader) {
513523
var goodPartitions = new HashSet<TopicPartition>();
514524
for (TopicDescription description : topicDescriptions) {
525+
var goodTopicPartitions = new ArrayList<TopicPartition>();
515526
for (TopicPartitionInfo partitionInfo : description.partitions()) {
516527
TopicPartition topicPartition = new TopicPartition(description.name(), partitionInfo.partition());
517-
if (!partitionPredicate.test(topicPartition)) {
518-
continue;
528+
if (partitionInfo.leader() == null) {
529+
if (failOnUnknownLeader) {
530+
throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
531+
} else {
532+
// if ANY of topic partitions has no leader - we have to skip all topic partitions
533+
goodTopicPartitions.clear();
534+
break;
535+
}
519536
}
520-
if (partitionInfo.leader() != null) {
521-
goodPartitions.add(topicPartition);
522-
} else if (failOnUnknownLeader) {
523-
throw new ValidationException(String.format("Topic partition %s has no leader", topicPartition));
537+
if (partitionPredicate.test(topicPartition)) {
538+
goodTopicPartitions.add(topicPartition);
524539
}
525540
}
541+
goodPartitions.addAll(goodTopicPartitions);
526542
}
527543
return goodPartitions;
528544
}
529545

530-
// 1. NOTE(!): should only apply for partitions with existing leader,
546+
// 1. NOTE(!): should only apply for partitions from topics where all partitions have leaders,
531547
// otherwise AdminClient will try to fetch topic metadata, fail and retry infinitely (until timeout)
532548
// 2. NOTE(!): Skips partitions that were not initialized yet
533549
// (UnknownTopicOrPartitionException thrown, ex. after topic creation)
534550
// 3. TODO: check if it is a bug that AdminClient never throws LeaderNotAvailableException and just retrying instead
535551
@KafkaClientInternalsDependant
536-
public Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions,
537-
OffsetSpec offsetSpec) {
552+
@VisibleForTesting
553+
Mono<Map<TopicPartition, Long>> listOffsetsUnsafe(Collection<TopicPartition> partitions, OffsetSpec offsetSpec) {
554+
if (partitions.isEmpty()) {
555+
return Mono.just(Map.of());
556+
}
538557

539558
Function<Collection<TopicPartition>, Mono<Map<TopicPartition, Long>>> call =
540559
parts -> {

kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static java.util.stream.Collectors.toList;
44
import static java.util.stream.Collectors.toMap;
55

6+
import com.google.common.collect.Sets;
67
import com.provectus.kafka.ui.config.ClustersProperties;
78
import com.provectus.kafka.ui.exception.TopicMetadataException;
89
import com.provectus.kafka.ui.exception.TopicNotFoundException;
@@ -136,22 +137,14 @@ private List<InternalTopic> createList(List<String> orderedNames,
136137
}
137138

138139
private Mono<InternalPartitionsOffsets> getPartitionOffsets(Map<String, TopicDescription>
139-
descriptions,
140+
descriptionsMap,
140141
ReactiveAdminClient ac) {
141-
var topicPartitions = descriptions.values().stream()
142-
.flatMap(desc ->
143-
desc.partitions().stream()
144-
// list offsets should only be applied to partitions with existing leader
145-
// (see ReactiveAdminClient.listOffsetsUnsafe(..) docs)
146-
.filter(tp -> tp.leader() != null)
147-
.map(p -> new TopicPartition(desc.name(), p.partition())))
148-
.collect(toList());
149-
150-
return ac.listOffsetsUnsafe(topicPartitions, OffsetSpec.earliest())
151-
.zipWith(ac.listOffsetsUnsafe(topicPartitions, OffsetSpec.latest()),
142+
var descriptions = descriptionsMap.values();
143+
return ac.listOffsets(descriptions, OffsetSpec.earliest())
144+
.zipWith(ac.listOffsets(descriptions, OffsetSpec.latest()),
152145
(earliest, latest) ->
153-
topicPartitions.stream()
154-
.filter(tp -> earliest.containsKey(tp) && latest.containsKey(tp))
146+
Sets.intersection(earliest.keySet(), latest.keySet())
147+
.stream()
155148
.map(tp ->
156149
Map.entry(tp,
157150
new InternalPartitionsOffsets.Offsets(

kafka-ui-api/src/test/java/com/provectus/kafka/ui/service/ReactiveAdminClientTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44
import static java.util.Objects.requireNonNull;
55
import static org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
66
import static org.assertj.core.api.Assertions.assertThat;
7+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
8+
import static org.assertj.core.api.ThrowableAssert.ThrowingCallable;
79

810
import com.provectus.kafka.ui.AbstractIntegrationTest;
11+
import com.provectus.kafka.ui.exception.ValidationException;
912
import com.provectus.kafka.ui.producer.KafkaTestProducer;
1013
import java.time.Duration;
1114
import java.util.ArrayList;
@@ -22,16 +25,20 @@
2225
import org.apache.kafka.clients.admin.ConfigEntry;
2326
import org.apache.kafka.clients.admin.NewTopic;
2427
import org.apache.kafka.clients.admin.OffsetSpec;
28+
import org.apache.kafka.clients.admin.TopicDescription;
2529
import org.apache.kafka.clients.consumer.ConsumerConfig;
2630
import org.apache.kafka.clients.consumer.KafkaConsumer;
2731
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
2832
import org.apache.kafka.clients.producer.ProducerRecord;
2933
import org.apache.kafka.common.KafkaFuture;
34+
import org.apache.kafka.common.Node;
3035
import org.apache.kafka.common.TopicPartition;
36+
import org.apache.kafka.common.TopicPartitionInfo;
3137
import org.apache.kafka.common.config.ConfigResource;
3238
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
3339
import org.apache.kafka.common.internals.KafkaFutureImpl;
3440
import org.apache.kafka.common.serialization.StringDeserializer;
41+
import org.assertj.core.api.ThrowableAssert;
3542
import org.junit.function.ThrowingRunnable;
3643
import org.junit.jupiter.api.AfterEach;
3744
import org.junit.jupiter.api.BeforeEach;
@@ -133,6 +140,56 @@ void testToMonoWithExceptionFilter() {
133140
.verifyComplete();
134141
}
135142

143+
@Test
144+
void filterPartitionsWithLeaderCheckSkipsPartitionsFromTopicWhereSomePartitionsHaveNoLeader() {
145+
var filteredPartitions = ReactiveAdminClient.filterPartitionsWithLeaderCheck(
146+
List.of(
147+
// contains partitions with no leader
148+
new TopicDescription("noLeaderTopic", false,
149+
List.of(
150+
new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()),
151+
new TopicPartitionInfo(1, null, List.of(), List.of()))),
152+
// should be skipped by predicate
153+
new TopicDescription("skippingByPredicate", false,
154+
List.of(
155+
new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()))),
156+
// good topic
157+
new TopicDescription("good", false,
158+
List.of(
159+
new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()),
160+
new TopicPartitionInfo(1, new Node(2, "n2", 9092), List.of(), List.of()))
161+
)),
162+
p -> !p.topic().equals("skippingByPredicate"),
163+
false
164+
);
165+
166+
assertThat(filteredPartitions)
167+
.containsExactlyInAnyOrder(
168+
new TopicPartition("good", 0),
169+
new TopicPartition("good", 1)
170+
);
171+
}
172+
173+
@Test
174+
void filterPartitionsWithLeaderCheckThrowExceptionIfThereIsSomePartitionsWithoutLeaderAndFlagSet() {
175+
ThrowingCallable call = () -> ReactiveAdminClient.filterPartitionsWithLeaderCheck(
176+
List.of(
177+
// contains partitions with no leader
178+
new TopicDescription("t1", false,
179+
List.of(
180+
new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()),
181+
new TopicPartitionInfo(1, null, List.of(), List.of()))),
182+
new TopicDescription("t2", false,
183+
List.of(
184+
new TopicPartitionInfo(0, new Node(1, "n1", 9092), List.of(), List.of()))
185+
)),
186+
p -> true,
187+
// setting failOnNoLeader flag
188+
true
189+
);
190+
assertThatThrownBy(call).isInstanceOf(ValidationException.class);
191+
}
192+
136193
@Test
137194
void testListOffsetsUnsafe() {
138195
String topic = UUID.randomUUID().toString();

0 commit comments

Comments
 (0)