Skip to content

Commit c023039

Browse files
BE: Topics: Validate ISR/replication upon creation (#103)
1 parent 4656684 commit c023039

File tree

3 files changed

+922
-0
lines changed

3 files changed

+922
-0
lines changed

api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.kafbat.ui.exception.IllegalEntityStateException;
1313
import io.kafbat.ui.exception.NotFoundException;
1414
import io.kafbat.ui.exception.ValidationException;
15+
import io.kafbat.ui.service.validators.KafkaPropertiesConstraintsValidator;
1516
import io.kafbat.ui.util.KafkaVersion;
1617
import io.kafbat.ui.util.MetadataVersion;
1718
import io.kafbat.ui.util.annotation.KafkaClientInternalsDependant;
@@ -472,6 +473,8 @@ public Mono<Void> createTopic(String name,
472473
int numPartitions,
473474
@Nullable Integer replicationFactor,
474475
Map<String, String> configs) {
476+
KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(replicationFactor, configs);
477+
validator.validate();
475478
var newTopic = new NewTopic(
476479
name,
477480
Optional.of(numPartitions),
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package io.kafbat.ui.service.validators;
2+
3+
import java.util.ArrayList;
4+
import java.util.Arrays;
5+
import java.util.List;
6+
import java.util.Map;
7+
import java.util.Objects;
8+
import lombok.AllArgsConstructor;
9+
10+
11+
@AllArgsConstructor
12+
public class KafkaPropertiesConstraintsValidator {
13+
private Integer replicationFactor;
14+
private Map<String, String> configs;
15+
16+
public void validate() {
17+
minInSyncReplicasLessThanReplicationFactorValidation();
18+
compressionConfigValueValidation();
19+
compactionConfigValuesValidation();
20+
remoteStorageConfigValuesValidation();
21+
retentionAndDeletionTimeConfigurationBasedConstraintsValidation();
22+
retentionAndDeletionMemoryConfigurationBasedConstraintsValidation();
23+
}
24+
25+
void minInSyncReplicasLessThanReplicationFactorValidation() {
26+
Integer minInSyncReplicas = configs.get("min.insync.replicas") != null
27+
? Integer.parseInt(configs.get("min.insync.replicas"))
28+
: null;
29+
30+
if (minInSyncReplicas != null && replicationFactor != null && minInSyncReplicas > replicationFactor) {
31+
throw new IllegalArgumentException(
32+
String.format("min.insync.replicas (%d) should be less than or equal to replication.factor (%d)",
33+
minInSyncReplicas, replicationFactor));
34+
}
35+
}
36+
37+
void compressionConfigValueValidation() {
38+
String compressionType = configs.get("compression.type");
39+
if (configs.get("compression.zstd.level") != null && !"zstd".equals(compressionType)) {
40+
throw new IllegalArgumentException(
41+
String.format("compression.zstd.level (%s) should be set only when compression.type is zstd",
42+
configs.get("compression.zstd.level")));
43+
}
44+
if (configs.get("compression.lz4.level") != null && !"lz4".equals(compressionType)) {
45+
throw new IllegalArgumentException(
46+
String.format("compression.lz4.level (%s) should be set only when compression.type is lz4",
47+
configs.get("compression.lz4.level")));
48+
}
49+
if (configs.get("compression.gzip.level") != null && !"gzip".equals(compressionType)) {
50+
throw new IllegalArgumentException(
51+
String.format("compression.gzip.level (%s) should be set only when compression.type is gzip",
52+
configs.get("compression.gzip.level")));
53+
}
54+
}
55+
56+
void compactionConfigValuesValidation() {
57+
String cleanupPolicy = configs.get("cleanup.policy");
58+
List<String> policies = new ArrayList<>();
59+
if (cleanupPolicy != null) {
60+
policies = Arrays.asList(cleanupPolicy.split(","));
61+
}
62+
if (configs.get("min.cleanable.dirty.ratio") != null && !policies.contains("compact")) {
63+
throw new IllegalArgumentException(
64+
String.format("min.cleanable.dirty.ratio (%s) should be set only when cleanup.policy is compact",
65+
configs.get("min.cleanable.dirty.ratio")));
66+
}
67+
if (configs.get("min.compaction.lag.ms") != null && !policies.contains("compact")) {
68+
throw new IllegalArgumentException(
69+
String.format("min.compaction.lag.ms (%s) should be set only when cleanup.policy is compact",
70+
configs.get("min.compaction.lag.ms")));
71+
}
72+
if (configs.get("max.compaction.lag.ms") != null && !policies.contains("compact")) {
73+
throw new IllegalArgumentException(
74+
String.format("max.compaction.lag.ms (%s) should be set only when cleanup.policy is compact",
75+
configs.get("max.compaction.lag.ms")));
76+
}
77+
if (configs.get("delete.retention.ms") != null && !policies.contains("compact")) {
78+
throw new IllegalArgumentException(
79+
String.format("delete.retention.ms (%s) should be set only when cleanup.policy is compact",
80+
configs.get("delete.retention.ms")));
81+
}
82+
83+
}
84+
85+
void remoteStorageConfigValuesValidation() {
86+
String remoteStorageEnabled = configs.get("remote.storage.enable");
87+
if (configs.get("local.retention.ms") != null && !"true".equals(remoteStorageEnabled)) {
88+
throw new IllegalArgumentException(
89+
String.format("local.retention.ms (%s) should be set only when remoteStorageEnabled is true",
90+
configs.get("local.retention.ms")));
91+
}
92+
if (configs.get("local.retention.bytes") != null && !"true".equals(remoteStorageEnabled)) {
93+
throw new IllegalArgumentException(
94+
String.format("local.retention.bytes (%s) should be set only when remoteStorageEnabled is true",
95+
configs.get("local.retention.bytes")));
96+
}
97+
}
98+
99+
void retentionAndDeletionTimeConfigurationBasedConstraintsValidation() {
100+
List<String> keys = new ArrayList<>();
101+
List<Long> values = new ArrayList<>();
102+
if (!Objects.equals(configs.get("retention.ms"), "-1") && configs.get("retention.ms") != null) {
103+
keys.add("retention.ms");
104+
values.add(parseLong(configs.get("retention.ms")));
105+
}
106+
if (!Objects.equals(configs.get("local.retention.ms"), "-2") && configs.get("local.retention.ms") != null) {
107+
keys.add("local.retention.ms");
108+
values.add(parseLong(configs.get("local.retention.ms")));
109+
}
110+
if (configs.get("segment.ms") != null) {
111+
keys.add("segment.ms");
112+
values.add(parseLong(configs.get("segment.ms")));
113+
}
114+
115+
for (int i = 0; i < values.size() - 1; i++) {
116+
Long current = values.get(i);
117+
Long next = values.get(i + 1);
118+
if (current != 0 && next != 0 && current < next) {
119+
throw new IllegalArgumentException(
120+
String.format("Invalid configuration: %s (%s) should be greater than or equal to %s (%s)",
121+
keys.get(i), configs.get(keys.get(i)),
122+
keys.get(i + 1), configs.get(keys.get(i + 1))));
123+
}
124+
}
125+
126+
}
127+
128+
void retentionAndDeletionMemoryConfigurationBasedConstraintsValidation() {
129+
List<String> keys = new ArrayList<>();
130+
List<Long> values = new ArrayList<>();
131+
if (!Objects.equals(configs.get("retention.bytes"), "-1") && configs.get("retention.bytes") != null) {
132+
keys.add("retention.bytes");
133+
values.add(parseLong(configs.get("retention.bytes")));
134+
}
135+
if (!Objects.equals(configs.get("local.retention.bytes"), "-2") && configs.get("local.retention.bytes") != null) {
136+
keys.add("local.retention.bytes");
137+
values.add(parseLong(configs.get("local.retention.bytes")));
138+
}
139+
if (configs.get("segment.bytes") != null) {
140+
keys.add("segment.bytes");
141+
values.add(parseLong(configs.get("segment.bytes")));
142+
}
143+
if (configs.get("max.message.bytes") != null) {
144+
keys.add("max.message.bytes");
145+
values.add(parseLong(configs.get("max.message.bytes")));
146+
}
147+
148+
for (int i = 0; i < values.size() - 1; i++) {
149+
Long current = values.get(i);
150+
Long next = values.get(i + 1);
151+
if (current != 0 && next != 0 && current < next) {
152+
throw new IllegalArgumentException(
153+
String.format("Invalid configuration: %s (%s) should be greater than or equal to %s (%s)",
154+
keys.get(i), configs.get(keys.get(i)),
155+
keys.get(i + 1), configs.get(keys.get(i + 1)
156+
)));
157+
}
158+
}
159+
160+
}
161+
162+
private static long parseLong(String value) {
163+
try {
164+
return value != null ? Long.parseLong(value) : 0L;
165+
} catch (NumberFormatException e) {
166+
return 0L;
167+
}
168+
}
169+
170+
}

0 commit comments

Comments
 (0)