Skip to content

Commit e21f6fb

Browse files
authored
BE: Serde: Impl MM2 serdes (#1381)
1 parent 205b9e8 commit e21f6fb

31 files changed

+593
-296
lines changed

api/src/main/java/io/kafbat/ui/serdes/BuiltInSerde.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package io.kafbat.ui.serdes;
22

33
import io.kafbat.ui.serde.api.PropertyResolver;
4+
import io.kafbat.ui.serde.api.SchemaDescription;
45
import io.kafbat.ui.serde.api.Serde;
6+
import java.util.Optional;
57

68
public interface BuiltInSerde extends Serde {
79

@@ -24,4 +26,25 @@ default void configure(PropertyResolver serdeProperties,
2426
PropertyResolver kafkaClusterProperties,
2527
PropertyResolver globalProperties) {
2628
}
29+
30+
@Override
31+
default boolean canSerialize(String topic, Serde.Target type) {
32+
return false;
33+
}
34+
35+
@Override
36+
default Serde.Serializer serializer(String topic, Serde.Target type) {
37+
throw new UnsupportedOperationException();
38+
}
39+
40+
@Override
41+
default Optional<SchemaDescription> getSchema(String topic, Serde.Target type) {
42+
return Optional.empty();
43+
}
44+
45+
@Override
46+
default Optional<String> getDescription() {
47+
return Optional.empty();
48+
}
49+
2750
}

api/src/main/java/io/kafbat/ui/serdes/ClusterSerdes.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,12 @@ public Stream<SerdeInstance> all() {
6464

6565
public SerdeInstance suggestSerdeForSerialize(String topic, Serde.Target type) {
6666
return findSerdeByPatternsOrDefault(topic, type, s -> s.canSerialize(topic, type))
67-
.orElse(serdes.get(StringSerde.name()));
67+
.orElse(serdes.get(StringSerde.NAME));
6868
}
6969

7070
public SerdeInstance suggestSerdeForDeserialize(String topic, Serde.Target type) {
7171
return findSerdeByPatternsOrDefault(topic, type, s -> s.canDeserialize(topic, type))
72-
.orElse(serdes.get(StringSerde.name()));
72+
.orElse(serdes.get(StringSerde.NAME));
7373
}
7474

7575
@Override

api/src/main/java/io/kafbat/ui/serdes/SerdesInitializer.java

Lines changed: 43 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import io.kafbat.ui.serdes.builtin.UInt32Serde;
2121
import io.kafbat.ui.serdes.builtin.UInt64Serde;
2222
import io.kafbat.ui.serdes.builtin.UuidBinarySerde;
23+
import io.kafbat.ui.serdes.builtin.mm2.CheckpointSerde;
24+
import io.kafbat.ui.serdes.builtin.mm2.HeartbeatSerde;
25+
import io.kafbat.ui.serdes.builtin.mm2.OffsetSyncSerde;
2326
import io.kafbat.ui.serdes.builtin.sr.SchemaRegistrySerde;
2427
import java.util.LinkedHashMap;
2528
import java.util.Map;
@@ -39,18 +42,23 @@ public class SerdesInitializer {
3942
public SerdesInitializer() {
4043
this(
4144
ImmutableMap.<String, Class<? extends BuiltInSerde>>builder()
42-
.put(StringSerde.name(), StringSerde.class)
43-
.put(SchemaRegistrySerde.name(), SchemaRegistrySerde.class)
44-
.put(ProtobufFileSerde.name(), ProtobufFileSerde.class)
45-
.put(Int32Serde.name(), Int32Serde.class)
46-
.put(Int64Serde.name(), Int64Serde.class)
47-
.put(UInt32Serde.name(), UInt32Serde.class)
48-
.put(UInt64Serde.name(), UInt64Serde.class)
49-
.put(AvroEmbeddedSerde.name(), AvroEmbeddedSerde.class)
50-
.put(Base64Serde.name(), Base64Serde.class)
51-
.put(HexSerde.name(), HexSerde.class)
52-
.put(UuidBinarySerde.name(), UuidBinarySerde.class)
53-
.put(ProtobufRawSerde.name(), ProtobufRawSerde.class)
45+
.put(StringSerde.NAME, StringSerde.class)
46+
.put(SchemaRegistrySerde.NAME, SchemaRegistrySerde.class)
47+
.put(ProtobufFileSerde.NAME, ProtobufFileSerde.class)
48+
.put(Int32Serde.NAME, Int32Serde.class)
49+
.put(Int64Serde.NAME, Int64Serde.class)
50+
.put(UInt32Serde.NAME, UInt32Serde.class)
51+
.put(UInt64Serde.NAME, UInt64Serde.class)
52+
.put(AvroEmbeddedSerde.NAME, AvroEmbeddedSerde.class)
53+
.put(Base64Serde.NAME, Base64Serde.class)
54+
.put(HexSerde.NAME, HexSerde.class)
55+
.put(UuidBinarySerde.NAME, UuidBinarySerde.class)
56+
.put(ProtobufRawSerde.NAME, ProtobufRawSerde.class)
57+
58+
// mm2 serdes
59+
.put(HeartbeatSerde.NAME, HeartbeatSerde.class)
60+
.put(OffsetSyncSerde.NAME, OffsetSyncSerde.class)
61+
.put(CheckpointSerde.NAME, CheckpointSerde.class)
5462
.build(),
5563
new CustomSerdeLoader()
5664
);
@@ -131,8 +139,8 @@ public ClusterSerdes init(Environment env,
131139
.orElse(null),
132140
Optional.ofNullable(clusterProperties.getDefaultValueSerde())
133141
.map(name -> Preconditions.checkNotNull(registeredSerdes.get(name), "Default value serde not found"))
134-
.or(() -> Optional.ofNullable(registeredSerdes.get(SchemaRegistrySerde.name())))
135-
.or(() -> Optional.ofNullable(registeredSerdes.get(ProtobufFileSerde.name())))
142+
.or(() -> Optional.ofNullable(registeredSerdes.get(SchemaRegistrySerde.NAME)))
143+
.or(() -> Optional.ofNullable(registeredSerdes.get(ProtobufFileSerde.NAME)))
136144
.orElse(null),
137145
createFallbackSerde()
138146
);
@@ -142,15 +150,16 @@ public ClusterSerdes init(Environment env,
142150
* Registers serdse that should only be used for specific (hard-coded) topics, like ConsumerOffsetsSerde.
143151
*/
144152
private void registerTopicRelatedSerde(Map<String, SerdeInstance> serdes) {
145-
registerConsumerOffsetsSerde(serdes);
153+
serdes.putAll(consumerOffsetsSerde());
154+
serdes.putAll(mirrorMakerSerdes());
146155
}
147156

148-
private void registerConsumerOffsetsSerde(Map<String, SerdeInstance> serdes) {
157+
private Map<String, SerdeInstance> consumerOffsetsSerde() {
149158
var pattern = Pattern.compile(ConsumerOffsetsSerde.TOPIC);
150-
serdes.put(
151-
ConsumerOffsetsSerde.name(),
159+
return Map.of(
160+
ConsumerOffsetsSerde.NAME,
152161
new SerdeInstance(
153-
ConsumerOffsetsSerde.name(),
162+
ConsumerOffsetsSerde.NAME,
154163
new ConsumerOffsetsSerde(),
155164
pattern,
156165
pattern,
@@ -159,6 +168,21 @@ private void registerConsumerOffsetsSerde(Map<String, SerdeInstance> serdes) {
159168
);
160169
}
161170

171+
private Map<String, SerdeInstance> mirrorMakerSerdes() {
172+
return Map.of(
173+
HeartbeatSerde.NAME,
174+
mirrorSerde(HeartbeatSerde.NAME, HeartbeatSerde.TOPIC_NAME_PATTERN, new HeartbeatSerde()),
175+
OffsetSyncSerde.NAME,
176+
mirrorSerde(OffsetSyncSerde.NAME, OffsetSyncSerde.TOPIC_NAME_PATTERN, new OffsetSyncSerde()),
177+
CheckpointSerde.NAME,
178+
mirrorSerde(CheckpointSerde.NAME, CheckpointSerde.TOPIC_NAME_PATTERN, new CheckpointSerde())
179+
);
180+
}
181+
182+
private SerdeInstance mirrorSerde(String name, Pattern pattern, BuiltInSerde serde) {
183+
return new SerdeInstance(name, serde, pattern, pattern, null);
184+
}
185+
162186
private SerdeInstance createFallbackSerde() {
163187
StringSerde serde = new StringSerde();
164188
serde.configure(PropertyResolverImpl.empty(), PropertyResolverImpl.empty(), PropertyResolverImpl.empty());

api/src/main/java/io/kafbat/ui/serdes/builtin/AvroEmbeddedSerde.java

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,31 +13,13 @@
1313
import org.apache.avro.generic.GenericDatumReader;
1414

1515
public class AvroEmbeddedSerde implements BuiltInSerde {
16-
17-
public static String name() {
18-
return "Avro (Embedded)";
19-
}
20-
21-
@Override
22-
public Optional<String> getDescription() {
23-
return Optional.empty();
24-
}
25-
26-
@Override
27-
public Optional<SchemaDescription> getSchema(String topic, Target type) {
28-
return Optional.empty();
29-
}
16+
public static final String NAME = "Avro (Embedded)";
3017

3118
@Override
3219
public boolean canDeserialize(String topic, Target type) {
3320
return true;
3421
}
3522

36-
@Override
37-
public boolean canSerialize(String topic, Target type) {
38-
return false;
39-
}
40-
4123
@Override
4224
public Serializer serializer(String topic, Target type) {
4325
throw new IllegalStateException();

api/src/main/java/io/kafbat/ui/serdes/builtin/Base64Serde.java

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,13 @@
11
package io.kafbat.ui.serdes.builtin;
22

33
import io.kafbat.ui.serde.api.DeserializeResult;
4-
import io.kafbat.ui.serde.api.SchemaDescription;
54
import io.kafbat.ui.serde.api.Serde;
65
import io.kafbat.ui.serdes.BuiltInSerde;
76
import java.util.Base64;
87
import java.util.Map;
9-
import java.util.Optional;
108

119
public class Base64Serde implements BuiltInSerde {
12-
13-
public static String name() {
14-
return "Base64";
15-
}
16-
17-
@Override
18-
public Optional<String> getDescription() {
19-
return Optional.empty();
20-
}
21-
22-
@Override
23-
public Optional<SchemaDescription> getSchema(String topic, Serde.Target type) {
24-
return Optional.empty();
25-
}
10+
public static final String NAME = "Base64";
2611

2712
@Override
2813
public boolean canDeserialize(String topic, Serde.Target type) {

api/src/main/java/io/kafbat/ui/serdes/builtin/ConsumerOffsetsSerde.java

Lines changed: 3 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,19 @@
11
package io.kafbat.ui.serdes.builtin;
22

3-
import com.fasterxml.jackson.core.JsonGenerator;
4-
import com.fasterxml.jackson.databind.JsonSerializer;
5-
import com.fasterxml.jackson.databind.SerializerProvider;
6-
import com.fasterxml.jackson.databind.json.JsonMapper;
7-
import com.fasterxml.jackson.databind.module.SimpleModule;
83
import io.kafbat.ui.serde.api.DeserializeResult;
9-
import io.kafbat.ui.serde.api.SchemaDescription;
104
import io.kafbat.ui.serdes.BuiltInSerde;
11-
import java.io.IOException;
125
import java.nio.ByteBuffer;
136
import java.util.Map;
14-
import java.util.Optional;
15-
import lombok.SneakyThrows;
167
import org.apache.kafka.common.protocol.types.ArrayOf;
17-
import org.apache.kafka.common.protocol.types.BoundField;
188
import org.apache.kafka.common.protocol.types.CompactArrayOf;
199
import org.apache.kafka.common.protocol.types.Field;
2010
import org.apache.kafka.common.protocol.types.Schema;
21-
import org.apache.kafka.common.protocol.types.Struct;
2211
import org.apache.kafka.common.protocol.types.Type;
2312

2413
// Deserialization logic and message's schemas can be found in
2514
// kafka.coordinator.group.GroupMetadataManager (readMessageKey, readOffsetMessageValue, readGroupMessageValue)
26-
public class ConsumerOffsetsSerde implements BuiltInSerde {
27-
28-
private static final JsonMapper JSON_MAPPER = createMapper();
15+
public class ConsumerOffsetsSerde extends StructSerde implements BuiltInSerde {
16+
public static final String NAME = "__consumer_offsets";
2917

3018
private static final String ASSIGNMENT = "assignment";
3119
private static final String CLIENT_HOST = "client_host";
@@ -46,53 +34,11 @@ public class ConsumerOffsetsSerde implements BuiltInSerde {
4634

4735
public static final String TOPIC = "__consumer_offsets";
4836

49-
public static String name() {
50-
return "__consumer_offsets";
51-
}
52-
53-
private static JsonMapper createMapper() {
54-
var module = new SimpleModule();
55-
module.addSerializer(Struct.class, new JsonSerializer<>() {
56-
@Override
57-
public void serialize(Struct value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
58-
gen.writeStartObject();
59-
for (BoundField field : value.schema().fields()) {
60-
var fieldVal = value.get(field);
61-
gen.writeObjectField(field.def.name, fieldVal);
62-
}
63-
gen.writeEndObject();
64-
}
65-
});
66-
var mapper = new JsonMapper();
67-
mapper.registerModule(module);
68-
return mapper;
69-
}
70-
71-
@Override
72-
public Optional<String> getDescription() {
73-
return Optional.empty();
74-
}
75-
76-
@Override
77-
public Optional<SchemaDescription> getSchema(String topic, Target type) {
78-
return Optional.empty();
79-
}
80-
8137
@Override
8238
public boolean canDeserialize(String topic, Target type) {
8339
return topic.equals(TOPIC);
8440
}
8541

86-
@Override
87-
public boolean canSerialize(String topic, Target type) {
88-
return false;
89-
}
90-
91-
@Override
92-
public Serializer serializer(String topic, Target type) {
93-
throw new UnsupportedOperationException();
94-
}
95-
9642
@Override
9743
public Deserializer deserializer(String topic, Target type) {
9844
return switch (type) {
@@ -304,8 +250,5 @@ private Deserializer valueDeserializer() {
304250
};
305251
}
306252

307-
@SneakyThrows
308-
private String toJson(Struct s) {
309-
return JSON_MAPPER.writeValueAsString(s);
310-
}
253+
311254
}

api/src/main/java/io/kafbat/ui/serdes/builtin/HexSerde.java

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,15 @@
22

33
import io.kafbat.ui.serde.api.DeserializeResult;
44
import io.kafbat.ui.serde.api.PropertyResolver;
5-
import io.kafbat.ui.serde.api.SchemaDescription;
65
import io.kafbat.ui.serdes.BuiltInSerde;
76
import java.util.HexFormat;
87
import java.util.Map;
9-
import java.util.Optional;
108

119
public class HexSerde implements BuiltInSerde {
10+
public static final String NAME = "Hex";
1211

1312
private HexFormat deserializeHexFormat;
1413

15-
public static String name() {
16-
return "Hex";
17-
}
18-
1914
@Override
2015
public void autoConfigure(PropertyResolver kafkaClusterProperties, PropertyResolver globalProperties) {
2116
configure(" ", true);
@@ -37,16 +32,6 @@ private void configure(String delim, boolean uppercase) {
3732
}
3833
}
3934

40-
@Override
41-
public Optional<String> getDescription() {
42-
return Optional.empty();
43-
}
44-
45-
@Override
46-
public Optional<SchemaDescription> getSchema(String topic, Target type) {
47-
return Optional.empty();
48-
}
49-
5035
@Override
5136
public boolean canDeserialize(String topic, Target type) {
5237
return true;

api/src/main/java/io/kafbat/ui/serdes/builtin/Int32Serde.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,7 @@
88
import java.util.Optional;
99

1010
public class Int32Serde implements BuiltInSerde {
11-
12-
public static String name() {
13-
return "Int32";
14-
}
15-
16-
@Override
17-
public Optional<String> getDescription() {
18-
return Optional.empty();
19-
}
11+
public static final String NAME = "Int32";
2012

2113
@Override
2214
public Optional<SchemaDescription> getSchema(String topic, Target type) {

api/src/main/java/io/kafbat/ui/serdes/builtin/Int64Serde.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,7 @@
99
import java.util.Optional;
1010

1111
public class Int64Serde implements BuiltInSerde {
12-
13-
public static String name() {
14-
return "Int64";
15-
}
16-
17-
@Override
18-
public Optional<String> getDescription() {
19-
return Optional.empty();
20-
}
12+
public static final String NAME = "Int64";
2113

2214
@Override
2315
public Optional<SchemaDescription> getSchema(String topic, Serde.Target type) {

0 commit comments

Comments
 (0)