Skip to content

Commit 8057dc1

Browse files
authored
Fixed bugs (#90)
* Fixed bugs * More fixes
1 parent bc2efc9 commit 8057dc1

File tree

19 files changed

+201
-129
lines changed

19 files changed

+201
-129
lines changed

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/deserialization/SimpleRecordDeserializer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ public class SimpleRecordDeserializer implements RecordDeserializer {
1010

1111
@Override
1212
public Object deserialize(ConsumerRecord<Bytes, Bytes> record) {
13-
return stringDeserializer.deserialize(record.topic(), record.value().get());
13+
if (record.value()!=null) {
14+
return stringDeserializer.deserialize(record.topic(), record.value().get());
15+
} else {
16+
return "empty";
17+
}
1418
}
1519
}

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/mapper/ClusterMapper.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package com.provectus.kafka.ui.cluster.mapper;
22

33
import com.provectus.kafka.ui.cluster.config.ClustersProperties;
4-
import com.provectus.kafka.ui.cluster.model.InternalClusterMetrics;
5-
import com.provectus.kafka.ui.cluster.model.InternalTopic;
6-
import com.provectus.kafka.ui.cluster.model.InternalTopicConfig;
7-
import com.provectus.kafka.ui.cluster.model.KafkaCluster;
4+
import com.provectus.kafka.ui.cluster.model.*;
85
import com.provectus.kafka.ui.model.*;
96
import org.mapstruct.Mapper;
107
import org.mapstruct.Mapping;
@@ -19,8 +16,10 @@ public interface ClusterMapper {
1916
Cluster toCluster(KafkaCluster cluster);
2017

2118
KafkaCluster toKafkaCluster(ClustersProperties.Cluster clusterProperties);
22-
BrokersMetrics toBrokerMetrics(InternalClusterMetrics metrics);
19+
ClusterMetrics toClusterMetrics(InternalClusterMetrics metrics);
20+
BrokerMetrics toBrokerMetrics(InternalBrokerMetrics metrics);
2321
Topic toTopic(InternalTopic topic);
2422
TopicDetails toTopicDetails(InternalTopic topic);
2523
TopicConfig toTopicConfig(InternalTopicConfig topic);
24+
Replica toReplica(InternalReplica replica);
2625
}

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalBrokerMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@
1010
@Builder(toBuilder = true)
1111
public class InternalBrokerMetrics {
1212
private final Long segmentSize;
13-
private final List<Metric> jmxMetrics;
13+
private final List<Metric> metrics;
1414
}

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalPartition.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@
66
import java.util.List;
77

88
@Data
9-
@Builder
9+
@Builder(toBuilder = true)
1010
public class InternalPartition {
1111
private final int partition;
1212
private final Integer leader;
1313
private final List<InternalReplica> replicas;
1414
private final int inSyncReplicasCount;
1515
private final int replicasCount;
16+
private final long offsetMin;
17+
private final long offsetMax;
1618
}

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/InternalTopic.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package com.provectus.kafka.ui.cluster.model;
22

3-
import com.provectus.kafka.ui.model.TopicPartitionDto;
43
import lombok.Builder;
54
import lombok.Data;
65
import org.apache.kafka.common.TopicPartition;

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/model/KafkaCluster.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
public class KafkaCluster {
1212

1313
private final String name;
14-
private final int jmxPort;
14+
private final Integer jmxPort;
1515
private final String bootstrapServers;
1616
private final String zookeeper;
1717
private final String schemaRegistry;

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/service/ClusterService.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,22 @@ public List<Cluster> getClusters() {
3838
.collect(Collectors.toList());
3939
}
4040

41-
public Mono<BrokersMetrics> getBrokersMetrics(String name, Integer id) {
41+
public Mono<BrokerMetrics> getBrokerMetrics(String name, Integer id) {
4242
return Mono.justOrEmpty(clustersStorage.getClusterByName(name)
43-
.map(KafkaCluster::getMetrics)
44-
.map(s -> {
45-
var brokerMetrics = clusterMapper.toBrokerMetrics(s);
46-
brokerMetrics.setMetrics(s.getInternalBrokerMetrics().get(id).getJmxMetrics());
47-
brokerMetrics.setSegmentZise(Long.valueOf(s.getSegmentSize()).intValue());
48-
return brokerMetrics;
49-
}));
43+
.map( c -> c.getMetrics().getInternalBrokerMetrics())
44+
.map( m -> m.get(id))
45+
.map(clusterMapper::toBrokerMetrics));
5046
}
5147

48+
public Mono<ClusterMetrics> getClusterMetrics(String name) {
49+
return Mono.justOrEmpty(
50+
clustersStorage.getClusterByName(name)
51+
.map(KafkaCluster::getMetrics)
52+
.map(clusterMapper::toClusterMetrics)
53+
);
54+
}
55+
56+
5257
public List<Topic> getTopics(String name) {
5358
return clustersStorage.getClusterByName(name)
5459
.map(c ->
@@ -60,12 +65,15 @@ public List<Topic> getTopics(String name) {
6065

6166
public Optional<TopicDetails> getTopicDetails(String name, String topicName) {
6267
return clustersStorage.getClusterByName(name)
63-
.map(c -> {
64-
var topic = c.getTopics().get(topicName);
65-
return clusterMapper
66-
.toTopicDetails(topic)
67-
.partitions(kafkaService.partitionDtoList(topic, c));
68-
});
68+
.flatMap( c ->
69+
Optional.ofNullable(
70+
c.getTopics().get(topicName)
71+
).map(
72+
t -> t.toBuilder().partitions(
73+
kafkaService.getTopicPartitions(c, t)
74+
).build()
75+
).map(clusterMapper::toTopicDetails)
76+
);
6977
}
7078

7179
public Optional<List<TopicConfig>> getTopicConfigs(String name, String topicName) {
@@ -143,6 +151,7 @@ public Mono<Topic> updateTopic(String clusterName, String topicName, Mono<TopicF
143151
return clustersStorage.getClusterByName(clusterName).map(cl ->
144152
topicFormData
145153
.flatMap(t -> kafkaService.updateTopic(cl, topicName, t))
154+
.map(clusterMapper::toTopic)
146155
.flatMap(t -> updateCluster(t, clusterName, cl))
147156
)
148157
.orElse(Mono.empty());
@@ -161,4 +170,5 @@ public Flux<TopicMessage> getMessages(String clusterName, String topicName, Cons
161170
.map(c -> consumingService.loadMessages(c, topicName, consumerPosition, query, limit))
162171
.orElse(Flux.empty());
163172
}
173+
164174
}

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/ClusterUtil.java

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,12 @@ public static List<ConsumerTopicPartitionDetail> convertToConsumerTopicPartition
6868
) {
6969
return consumer.assignment().topicPartitions().stream()
7070
.map(tp -> {
71-
Long currentOffset = groupOffsets.get(tp).offset();
72-
Long endOffset = endOffsets.get(tp);
71+
Long currentOffset = Optional.ofNullable(
72+
groupOffsets.get(tp)).map(o -> o.offset()).orElse(0L);
73+
Long endOffset = Optional.ofNullable(endOffsets.get(tp)).orElse(0L);
7374
ConsumerTopicPartitionDetail cd = new ConsumerTopicPartitionDetail();
7475
cd.setConsumerId(consumer.consumerId());
76+
cd.setHost(consumer.host());
7577
cd.setTopic(tp.topic());
7678
cd.setPartition(tp.partition());
7779
cd.setCurrentOffset(currentOffset);
@@ -116,7 +118,7 @@ public static InternalTopic mapToInternalTopic(TopicDescription topicDescription
116118

117119
int urpCount = partitions.stream()
118120
.flatMap(partition -> partition.getReplicas().stream())
119-
.filter(InternalReplica::isInSync).mapToInt(e -> 1)
121+
.filter(p -> !p.isInSync()).mapToInt(e -> 1)
120122
.sum();
121123

122124
int inSyncReplicasCount = partitions.stream()
@@ -199,6 +201,10 @@ private static ExtendedAdminClient.SupportedFeature getSupportedUpdateFeature(Ma
199201
.filter(entry -> entry.name().contains(CLUSTER_VERSION_PARAM_KEY))
200202
.findFirst().orElseThrow().value();
201203
try {
204+
final String[] parts = version.split("\\.");
205+
if (parts.length>2) {
206+
version = parts[0] + "." + parts[1];
207+
}
202208
return Float.parseFloat(version.split("-")[0]) <= 2.3f
203209
? ExtendedAdminClient.SupportedFeature.ALTER_CONFIGS : ExtendedAdminClient.SupportedFeature.INCREMENTAL_ALTER_CONFIGS;
204210
} catch (Exception e) {
@@ -207,24 +213,6 @@ private static ExtendedAdminClient.SupportedFeature getSupportedUpdateFeature(Ma
207213
}
208214
}
209215

210-
public static Topic convertToTopic(InternalTopic internalTopic) {
211-
Topic topic = new Topic();
212-
topic.setName(internalTopic.getName());
213-
List<Partition> partitions = internalTopic.getPartitions().stream().flatMap(s -> {
214-
Partition partition = new Partition();
215-
partition.setPartition(s.getPartition());
216-
partition.setLeader(s.getLeader());
217-
partition.setReplicas(s.getReplicas().stream().flatMap(r -> {
218-
Replica replica = new Replica();
219-
replica.setBroker(r.getBroker());
220-
return Stream.of(replica);
221-
}).collect(Collectors.toList()));
222-
return Stream.of(partition);
223-
}).collect(Collectors.toList());
224-
topic.setPartitions(partitions);
225-
return topic;
226-
}
227-
228216
public static <T, R> Map<T, R> toSingleMap (Stream<Map<T, R>> streamOfMaps) {
229217
return streamOfMaps.reduce((map1, map2) -> Stream.concat(map1.entrySet().stream(), map2.entrySet().stream())
230218
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).orElseThrow();

kafka-ui-api/src/main/java/com/provectus/kafka/ui/cluster/util/JmxClusterUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ private void closeConnectionExceptionally(String url, JMXConnector srv) {
100100
public List<MetricDto> convertToMetricDto(InternalClusterMetrics internalClusterMetrics) {
101101
return internalClusterMetrics.getInternalBrokerMetrics().values().stream()
102102
.map(c ->
103-
c.getJmxMetrics().stream()
103+
c.getMetrics().stream()
104104
.filter(j -> isSameMetric(j.getCanonicalName()))
105105
.map(j -> j.getValue().entrySet().stream()
106106
.map(e -> new MetricDto(j.getCanonicalName(), e.getKey(), e.getValue()))))

kafka-ui-api/src/main/java/com/provectus/kafka/ui/kafka/KafkaService.java

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ private Mono<String> createTopic(AdminClient adminClient, NewTopic newTopic) {
265265
}
266266

267267
@SneakyThrows
268-
public Mono<Topic> updateTopic(KafkaCluster cluster, String topicName, TopicFormData topicFormData) {
268+
public Mono<InternalTopic> updateTopic(KafkaCluster cluster, String topicName, TopicFormData topicFormData) {
269269
ConfigResource topicCR = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
270270
return getOrCreateAdminClient(cluster)
271271
.flatMap(ac -> {
@@ -281,11 +281,10 @@ public Mono<Topic> updateTopic(KafkaCluster cluster, String topicName, TopicForm
281281

282282

283283

284-
private Mono<Topic> getUpdatedTopic (ExtendedAdminClient ac, String topicName) {
284+
private Mono<InternalTopic> getUpdatedTopic (ExtendedAdminClient ac, String topicName) {
285285
return getTopicsData(ac.getAdminClient())
286286
.map(s -> s.stream()
287-
.filter(t -> t.getName().equals(topicName)).findFirst().orElseThrow())
288-
.map(ClusterUtil::convertToTopic);
287+
.filter(t -> t.getName().equals(topicName)).findFirst().orElseThrow());
289288
}
290289

291290
private Mono<String> incrementalAlterConfig(TopicFormData topicFormData, ConfigResource topicCR, ExtendedAdminClient ac) {
@@ -346,6 +345,8 @@ private Mono<InternalSegmentSizeDto> updateSegmentMetrics(AdminClient ac, Intern
346345

347346
public List<Metric> getJmxMetric(String clusterName, Node node) {
348347
return clustersStorage.getClusterByName(clusterName)
348+
.filter( c -> c.getJmxPort() != null)
349+
.filter( c -> c.getJmxPort() > 0)
349350
.map(c -> jmxClusterUtil.getJmxMetrics(c.getJmxPort(), node.host())).orElse(Collections.emptyList());
350351
}
351352

@@ -357,7 +358,7 @@ private Mono<InternalClusterMetrics> fillBrokerMetrics(InternalClusterMetrics in
357358
return ClusterUtil.toMono(ac.describeCluster().nodes())
358359
.flatMapIterable(nodes -> nodes)
359360
.map(broker -> Map.of(broker.id(), InternalBrokerMetrics.builder().
360-
jmxMetrics(getJmxMetric(clusterName, broker)).build()))
361+
metrics(getJmxMetric(clusterName, broker)).build()))
361362
.collectList()
362363
.map(s -> internalClusterMetrics.toBuilder().internalBrokerMetrics(ClusterUtil.toSingleMap(s.stream())).build());
363364
}
@@ -377,22 +378,25 @@ private InternalClusterMetrics calculateClusterMetrics(InternalClusterMetrics in
377378
.collect(Collectors.toList())).build();
378379
}
379380

380-
public List<TopicPartitionDto> partitionDtoList (InternalTopic topic, KafkaCluster cluster) {
381-
var topicPartitions = topic.getPartitions().stream().map(t -> new TopicPartition(topic.getName(), t.getPartition())).collect(Collectors.toList());
382-
return getTopicPartitionOffset(cluster, topicPartitions);
383-
}
381+
public List<InternalPartition> getTopicPartitions(KafkaCluster c, InternalTopic topic ) {
382+
var tps = topic.getPartitions().stream()
383+
.map(t -> new TopicPartition(topic.getName(), t.getPartition()))
384+
.collect(Collectors.toList());
385+
Map<Integer, InternalPartition> partitions =
386+
topic.getPartitions().stream().collect(Collectors.toMap(
387+
InternalPartition::getPartition,
388+
tp -> tp
389+
));
384390

385-
private List<TopicPartitionDto> getTopicPartitionOffset(KafkaCluster c, List<TopicPartition> topicPartitions ) {
386391
try (var consumer = createConsumer(c)) {
387-
final Map<TopicPartition, Long> earliest = consumer.beginningOffsets(topicPartitions);
388-
final Map<TopicPartition, Long> latest = consumer.endOffsets(topicPartitions);
392+
final Map<TopicPartition, Long> earliest = consumer.beginningOffsets(tps);
393+
final Map<TopicPartition, Long> latest = consumer.endOffsets(tps);
389394

390-
return topicPartitions.stream()
391-
.map( tp -> new TopicPartitionDto()
392-
.topic(tp.topic())
393-
.partition(tp.partition())
395+
return tps.stream()
396+
.map( tp -> partitions.get(tp.partition()).toBuilder()
394397
.offsetMin(Optional.ofNullable(earliest.get(tp)).orElse(0L))
395398
.offsetMax(Optional.ofNullable(latest.get(tp)).orElse(0L))
399+
.build()
396400
).collect(Collectors.toList());
397401
} catch (Exception e) {
398402
return Collections.emptyList();

kafka-ui-api/src/main/java/com/provectus/kafka/ui/rest/MetricsRestController.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,19 @@ public Mono<ResponseEntity<Flux<Cluster>>> getClusters(ServerWebExchange exchang
3030
}
3131

3232
@Override
33-
public Mono<ResponseEntity<BrokersMetrics>> getBrokersMetrics(String clusterName, Integer id, ServerWebExchange exchange) {
34-
return clusterService.getBrokersMetrics(clusterName, id)
33+
public Mono<ResponseEntity<BrokerMetrics>> getBrokersMetrics(String clusterName, Integer id, ServerWebExchange exchange) {
34+
return clusterService.getBrokerMetrics(clusterName, id)
3535
.map(ResponseEntity::ok)
3636
.onErrorReturn(ResponseEntity.notFound().build());
3737
}
3838

39+
@Override
40+
public Mono<ResponseEntity<ClusterMetrics>> getClusterMetrics(String clusterName, ServerWebExchange exchange) {
41+
return clusterService.getClusterMetrics(clusterName)
42+
.map(ResponseEntity::ok)
43+
.onErrorReturn(ResponseEntity.notFound().build());
44+
}
45+
3946
@Override
4047
public Mono<ResponseEntity<Flux<Topic>>> getTopics(String clusterName, ServerWebExchange exchange) {
4148
return Mono.just(ResponseEntity.ok(Flux.fromIterable(clusterService.getTopics(clusterName))));

0 commit comments

Comments
 (0)