Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions api/src/main/java/io/kafbat/ui/serdes/BuiltInSerde.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.kafbat.ui.serdes;

import io.kafbat.ui.serde.api.PropertyResolver;
import io.kafbat.ui.serde.api.SchemaDescription;
import io.kafbat.ui.serde.api.Serde;
import java.util.Optional;

public interface BuiltInSerde extends Serde {

Expand All @@ -24,4 +26,25 @@ default void configure(PropertyResolver serdeProperties,
PropertyResolver kafkaClusterProperties,
PropertyResolver globalProperties) {
}

@Override
default boolean canSerialize(String topic, Serde.Target type) {
return false;
}

@Override
default Serde.Serializer serializer(String topic, Serde.Target type) {
throw new UnsupportedOperationException();
}

@Override
default Optional<SchemaDescription> getSchema(String topic, Serde.Target type) {
return Optional.empty();
}

@Override
default Optional<String> getDescription() {
return Optional.empty();
}

}
4 changes: 2 additions & 2 deletions api/src/main/java/io/kafbat/ui/serdes/ClusterSerdes.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ public Stream<SerdeInstance> all() {

public SerdeInstance suggestSerdeForSerialize(String topic, Serde.Target type) {
return findSerdeByPatternsOrDefault(topic, type, s -> s.canSerialize(topic, type))
.orElse(serdes.get(StringSerde.name()));
.orElse(serdes.get(StringSerde.NAME));
}

public SerdeInstance suggestSerdeForDeserialize(String topic, Serde.Target type) {
return findSerdeByPatternsOrDefault(topic, type, s -> s.canDeserialize(topic, type))
.orElse(serdes.get(StringSerde.name()));
.orElse(serdes.get(StringSerde.NAME));
}

@Override
Expand Down
62 changes: 43 additions & 19 deletions api/src/main/java/io/kafbat/ui/serdes/SerdesInitializer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import io.kafbat.ui.serdes.builtin.UInt32Serde;
import io.kafbat.ui.serdes.builtin.UInt64Serde;
import io.kafbat.ui.serdes.builtin.UuidBinarySerde;
import io.kafbat.ui.serdes.builtin.mm2.CheckpointSerde;
import io.kafbat.ui.serdes.builtin.mm2.HeartbeatSerde;
import io.kafbat.ui.serdes.builtin.mm2.OffsetSyncSerde;
import io.kafbat.ui.serdes.builtin.sr.SchemaRegistrySerde;
import java.util.LinkedHashMap;
import java.util.Map;
Expand All @@ -39,18 +42,23 @@ public class SerdesInitializer {
public SerdesInitializer() {
this(
ImmutableMap.<String, Class<? extends BuiltInSerde>>builder()
.put(StringSerde.name(), StringSerde.class)
.put(SchemaRegistrySerde.name(), SchemaRegistrySerde.class)
.put(ProtobufFileSerde.name(), ProtobufFileSerde.class)
.put(Int32Serde.name(), Int32Serde.class)
.put(Int64Serde.name(), Int64Serde.class)
.put(UInt32Serde.name(), UInt32Serde.class)
.put(UInt64Serde.name(), UInt64Serde.class)
.put(AvroEmbeddedSerde.name(), AvroEmbeddedSerde.class)
.put(Base64Serde.name(), Base64Serde.class)
.put(HexSerde.name(), HexSerde.class)
.put(UuidBinarySerde.name(), UuidBinarySerde.class)
.put(ProtobufRawSerde.name(), ProtobufRawSerde.class)
.put(StringSerde.NAME, StringSerde.class)
.put(SchemaRegistrySerde.NAME, SchemaRegistrySerde.class)
.put(ProtobufFileSerde.NAME, ProtobufFileSerde.class)
.put(Int32Serde.NAME, Int32Serde.class)
.put(Int64Serde.NAME, Int64Serde.class)
.put(UInt32Serde.NAME, UInt32Serde.class)
.put(UInt64Serde.NAME, UInt64Serde.class)
.put(AvroEmbeddedSerde.NAME, AvroEmbeddedSerde.class)
.put(Base64Serde.NAME, Base64Serde.class)
.put(HexSerde.NAME, HexSerde.class)
.put(UuidBinarySerde.NAME, UuidBinarySerde.class)
.put(ProtobufRawSerde.NAME, ProtobufRawSerde.class)

// mm2 serdes
.put(HeartbeatSerde.NAME, HeartbeatSerde.class)
.put(OffsetSyncSerde.NAME, OffsetSyncSerde.class)
.put(CheckpointSerde.NAME, CheckpointSerde.class)
.build(),
new CustomSerdeLoader()
);
Expand Down Expand Up @@ -131,8 +139,8 @@ public ClusterSerdes init(Environment env,
.orElse(null),
Optional.ofNullable(clusterProperties.getDefaultValueSerde())
.map(name -> Preconditions.checkNotNull(registeredSerdes.get(name), "Default value serde not found"))
.or(() -> Optional.ofNullable(registeredSerdes.get(SchemaRegistrySerde.name())))
.or(() -> Optional.ofNullable(registeredSerdes.get(ProtobufFileSerde.name())))
.or(() -> Optional.ofNullable(registeredSerdes.get(SchemaRegistrySerde.NAME)))
.or(() -> Optional.ofNullable(registeredSerdes.get(ProtobufFileSerde.NAME)))
.orElse(null),
createFallbackSerde()
);
Expand All @@ -142,15 +150,16 @@ public ClusterSerdes init(Environment env,
* Registers serdse that should only be used for specific (hard-coded) topics, like ConsumerOffsetsSerde.
*/
private void registerTopicRelatedSerde(Map<String, SerdeInstance> serdes) {
registerConsumerOffsetsSerde(serdes);
serdes.putAll(consumerOffsetsSerde());
serdes.putAll(mirrorMakerSerdes());
}

private void registerConsumerOffsetsSerde(Map<String, SerdeInstance> serdes) {
private Map<String, SerdeInstance> consumerOffsetsSerde() {
var pattern = Pattern.compile(ConsumerOffsetsSerde.TOPIC);
serdes.put(
ConsumerOffsetsSerde.name(),
return Map.of(
ConsumerOffsetsSerde.NAME,
new SerdeInstance(
ConsumerOffsetsSerde.name(),
ConsumerOffsetsSerde.NAME,
new ConsumerOffsetsSerde(),
pattern,
pattern,
Expand All @@ -159,6 +168,21 @@ private void registerConsumerOffsetsSerde(Map<String, SerdeInstance> serdes) {
);
}

private Map<String, SerdeInstance> mirrorMakerSerdes() {
return Map.of(
HeartbeatSerde.NAME,
mirrorSerde(HeartbeatSerde.NAME, HeartbeatSerde.TOPIC_NAME_PATTERN, new HeartbeatSerde()),
OffsetSyncSerde.NAME,
mirrorSerde(OffsetSyncSerde.NAME, OffsetSyncSerde.TOPIC_NAME_PATTERN, new OffsetSyncSerde()),
CheckpointSerde.NAME,
mirrorSerde(CheckpointSerde.NAME, CheckpointSerde.TOPIC_NAME_PATTERN, new CheckpointSerde())
);
}

private SerdeInstance mirrorSerde(String name, Pattern pattern, BuiltInSerde serde) {
return new SerdeInstance(name, serde, pattern, pattern, null);
}

private SerdeInstance createFallbackSerde() {
StringSerde serde = new StringSerde();
serde.configure(PropertyResolverImpl.empty(), PropertyResolverImpl.empty(), PropertyResolverImpl.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,13 @@
import org.apache.avro.generic.GenericDatumReader;

public class AvroEmbeddedSerde implements BuiltInSerde {

public static String name() {
return "Avro (Embedded)";
}

@Override
public Optional<String> getDescription() {
return Optional.empty();
}

@Override
public Optional<SchemaDescription> getSchema(String topic, Target type) {
return Optional.empty();
}
public static final String NAME = "Avro (Embedded)";

@Override
public boolean canDeserialize(String topic, Target type) {
return true;
}

@Override
public boolean canSerialize(String topic, Target type) {
return false;
}

@Override
public Serializer serializer(String topic, Target type) {
throw new IllegalStateException();
Expand Down
17 changes: 1 addition & 16 deletions api/src/main/java/io/kafbat/ui/serdes/builtin/Base64Serde.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,13 @@
package io.kafbat.ui.serdes.builtin;

import io.kafbat.ui.serde.api.DeserializeResult;
import io.kafbat.ui.serde.api.SchemaDescription;
import io.kafbat.ui.serde.api.Serde;
import io.kafbat.ui.serdes.BuiltInSerde;
import java.util.Base64;
import java.util.Map;
import java.util.Optional;

public class Base64Serde implements BuiltInSerde {

public static String name() {
return "Base64";
}

@Override
public Optional<String> getDescription() {
return Optional.empty();
}

@Override
public Optional<SchemaDescription> getSchema(String topic, Serde.Target type) {
return Optional.empty();
}
public static final String NAME = "Base64";

@Override
public boolean canDeserialize(String topic, Serde.Target type) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,19 @@
package io.kafbat.ui.serdes.builtin;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import io.kafbat.ui.serde.api.DeserializeResult;
import io.kafbat.ui.serde.api.SchemaDescription;
import io.kafbat.ui.serdes.BuiltInSerde;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;
import lombok.SneakyThrows;
import org.apache.kafka.common.protocol.types.ArrayOf;
import org.apache.kafka.common.protocol.types.BoundField;
import org.apache.kafka.common.protocol.types.CompactArrayOf;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;

// Deserialization logic and message's schemas can be found in
// kafka.coordinator.group.GroupMetadataManager (readMessageKey, readOffsetMessageValue, readGroupMessageValue)
public class ConsumerOffsetsSerde implements BuiltInSerde {

private static final JsonMapper JSON_MAPPER = createMapper();
public class ConsumerOffsetsSerde extends StructSerde implements BuiltInSerde {
public static final String NAME = "__consumer_offsets";

private static final String ASSIGNMENT = "assignment";
private static final String CLIENT_HOST = "client_host";
Expand All @@ -46,53 +34,11 @@ public class ConsumerOffsetsSerde implements BuiltInSerde {

public static final String TOPIC = "__consumer_offsets";

public static String name() {
return "__consumer_offsets";
}

private static JsonMapper createMapper() {
var module = new SimpleModule();
module.addSerializer(Struct.class, new JsonSerializer<>() {
@Override
public void serialize(Struct value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
gen.writeStartObject();
for (BoundField field : value.schema().fields()) {
var fieldVal = value.get(field);
gen.writeObjectField(field.def.name, fieldVal);
}
gen.writeEndObject();
}
});
var mapper = new JsonMapper();
mapper.registerModule(module);
return mapper;
}

@Override
public Optional<String> getDescription() {
return Optional.empty();
}

@Override
public Optional<SchemaDescription> getSchema(String topic, Target type) {
return Optional.empty();
}

@Override
public boolean canDeserialize(String topic, Target type) {
return topic.equals(TOPIC);
}

@Override
public boolean canSerialize(String topic, Target type) {
return false;
}

@Override
public Serializer serializer(String topic, Target type) {
throw new UnsupportedOperationException();
}

@Override
public Deserializer deserializer(String topic, Target type) {
return switch (type) {
Expand Down Expand Up @@ -304,8 +250,5 @@ private Deserializer valueDeserializer() {
};
}

@SneakyThrows
private String toJson(Struct s) {
return JSON_MAPPER.writeValueAsString(s);
}

}
17 changes: 1 addition & 16 deletions api/src/main/java/io/kafbat/ui/serdes/builtin/HexSerde.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,15 @@

import io.kafbat.ui.serde.api.DeserializeResult;
import io.kafbat.ui.serde.api.PropertyResolver;
import io.kafbat.ui.serde.api.SchemaDescription;
import io.kafbat.ui.serdes.BuiltInSerde;
import java.util.HexFormat;
import java.util.Map;
import java.util.Optional;

public class HexSerde implements BuiltInSerde {
public static final String NAME = "Hex";

private HexFormat deserializeHexFormat;

public static String name() {
return "Hex";
}

@Override
public void autoConfigure(PropertyResolver kafkaClusterProperties, PropertyResolver globalProperties) {
configure(" ", true);
Expand All @@ -37,16 +32,6 @@ private void configure(String delim, boolean uppercase) {
}
}

@Override
public Optional<String> getDescription() {
return Optional.empty();
}

@Override
public Optional<SchemaDescription> getSchema(String topic, Target type) {
return Optional.empty();
}

@Override
public boolean canDeserialize(String topic, Target type) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,7 @@
import java.util.Optional;

public class Int32Serde implements BuiltInSerde {

public static String name() {
return "Int32";
}

@Override
public Optional<String> getDescription() {
return Optional.empty();
}
public static final String NAME = "Int32";

@Override
public Optional<SchemaDescription> getSchema(String topic, Target type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,7 @@
import java.util.Optional;

public class Int64Serde implements BuiltInSerde {

public static String name() {
return "Int64";
}

@Override
public Optional<String> getDescription() {
return Optional.empty();
}
public static final String NAME = "Int64";

@Override
public Optional<SchemaDescription> getSchema(String topic, Serde.Target type) {
Expand Down
Loading
Loading