import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * Validates cross-property constraints between topic configuration values before
 * topic creation (invoked from {@code ReactiveAdminClient.createTopic}).
 *
 * <p>Checks performed by {@link #validate()}, in order:
 * <ol>
 *   <li>{@code min.insync.replicas} must not exceed the replication factor;</li>
 *   <li>a non-default {@code compression.*.level} requires the matching {@code compression.type};</li>
 *   <li>non-default compaction-only settings require {@code cleanup.policy} to include {@code compact};</li>
 *   <li>non-default {@code local.retention.*} settings require {@code remote.storage.enable=true};</li>
 *   <li>time-based retention values must be non-increasing:
 *       retention.ms &ge; local.retention.ms &ge; segment.ms;</li>
 *   <li>size-based retention values must be non-increasing:
 *       retention.bytes &ge; local.retention.bytes &ge; segment.bytes &ge; max.message.bytes.</li>
 * </ol>
 *
 * <p>The first violated constraint aborts validation with an
 * {@link IllegalArgumentException} carrying a human-readable message.
 * Instances are immutable; a missing config key means "use the broker default".
 */
public class KafkaPropertiesConstraintsValidator {

  private static final String COMPACT = "compact";
  private static final String LOCAL_RETENTION_MS = "local.retention.ms";
  private static final String LOCAL_RETENTION_BYTES = "local.retention.bytes";
  private static final String RETENTION_MS = "retention.ms";
  private static final String SEGMENT_MS = "segment.ms";
  private static final String RETENTION_BYTES = "retention.bytes";
  private static final String SEGMENT_BYTES = "segment.bytes";
  private static final String MAX_MESSAGE_BYTES = "max.message.bytes";
  private static final String COMPRESSION_ZSTD_LEVEL = "compression.zstd.level";
  private static final String COMPRESSION_LZ4_LEVEL = "compression.lz4.level";
  private static final String COMPRESSION_GZIP_LEVEL = "compression.gzip.level";
  private static final String MIN_CLEANABLE_DIRTY_RATIO = "min.cleanable.dirty.ratio";
  private static final String MIN_COMPACTION_LAG_MS = "min.compaction.lag.ms";
  private static final String MAX_COMPACTION_LAG_MS = "max.compaction.lag.ms";
  private static final String DELETE_RETENTION_MS = "delete.retention.ms";

  // Fallbacks applied when a key is absent; values mirror Kafka's documented
  // topic-level defaults — confirm against the broker version in use.
  private static final long DEFAULT_RETENTION_MS = 604800000L;      // 7 days
  private static final long DEFAULT_SEGMENT_MS = 604800000L;        // 7 days
  private static final long DEFAULT_SEGMENT_BYTES = 1073741824L;    // 1 GiB
  private static final long DEFAULT_MAX_MESSAGE_BYTES = 1048588L;

  /** Requested replication factor; {@code null} means "use broker default" and disables check 1. */
  private final Integer replicationFactor;
  /** Topic configs as raw strings, keyed by Kafka property name. */
  private final Map<String, String> configs;

  /**
   * @param replicationFactor requested replication factor, or {@code null} for the broker default
   * @param configs           topic configuration overrides (never mutated by this class)
   */
  public KafkaPropertiesConstraintsValidator(Integer replicationFactor, Map<String, String> configs) {
    this.replicationFactor = replicationFactor;
    this.configs = configs;
  }

  /**
   * Runs every constraint check; throws {@link IllegalArgumentException} on the
   * first violation, returns normally otherwise.
   */
  public void validate() {
    minInSyncReplicasLessThanReplicationFactorValidation();
    compressionConfigValueValidation();
    compactionConfigValuesValidation();
    remoteStorageConfigValuesValidation();
    retentionAndDeletionTimeConfigurationBasedConstraintsValidation();
    retentionAndDeletionMemoryConfigurationBasedConstraintsValidation();
  }

  /** Ensures {@code min.insync.replicas} does not exceed the replication factor (when both are set). */
  void minInSyncReplicasLessThanReplicationFactorValidation() {
    String raw = configs.get("min.insync.replicas");
    if (raw == null || replicationFactor == null) {
      return;
    }
    // NumberFormatException is a subclass of IllegalArgumentException, so a
    // non-numeric value still surfaces to callers catching IllegalArgumentException.
    int minInSyncReplicas = Integer.parseInt(raw);
    if (minInSyncReplicas > replicationFactor) {
      throw new IllegalArgumentException(
          String.format("min.insync.replicas (%d) should be less than or equal to replication.factor (%d)",
              minInSyncReplicas, replicationFactor));
    }
  }

  /**
   * Rejects a non-default {@code compression.<codec>.level} when
   * {@code compression.type} is not that codec.
   */
  void compressionConfigValueValidation() {
    String compressionType = configs.get("compression.type");
    checkLevelMatchesCodec(COMPRESSION_ZSTD_LEVEL, "3", "zstd", compressionType);
    checkLevelMatchesCodec(COMPRESSION_LZ4_LEVEL, "9", "lz4", compressionType);
    checkLevelMatchesCodec(COMPRESSION_GZIP_LEVEL, "-1", "gzip", compressionType);
  }

  /** One codec-level check: a level equal to its default is always tolerated. */
  private void checkLevelMatchesCodec(String levelKey, String defaultLevel,
                                      String requiredCodec, String actualCodec) {
    String level = configs.get(levelKey);
    if (level != null && !Objects.equals(level, defaultLevel) && !requiredCodec.equals(actualCodec)) {
      throw new IllegalArgumentException(
          String.format("%s (%s) should be set only when compression.type is %s",
              levelKey, level, requiredCodec));
    }
  }

  /**
   * Rejects non-default compaction-only settings when {@code cleanup.policy}
   * does not include {@code compact} (the policy may be a comma-separated list,
   * e.g. {@code "delete,compact"}).
   */
  void compactionConfigValuesValidation() {
    String cleanupPolicy = configs.get("cleanup.policy");
    List<String> policies = cleanupPolicy == null
        ? List.of()
        : Arrays.asList(cleanupPolicy.split(","));
    if (policies.contains(COMPACT)) {
      return; // compaction enabled — all compaction settings are legitimate
    }
    checkCompactOnlySetting(MIN_CLEANABLE_DIRTY_RATIO, "0.5");
    checkCompactOnlySetting(MIN_COMPACTION_LAG_MS, "0");
    checkCompactOnlySetting(MAX_COMPACTION_LAG_MS, "9223372036854775807");
    checkCompactOnlySetting(DELETE_RETENTION_MS, "86400000");
  }

  /** One compaction-only check: a value equal to its default is always tolerated. */
  private void checkCompactOnlySetting(String key, String defaultValue) {
    String value = configs.get(key);
    if (value != null && !Objects.equals(value, defaultValue)) {
      throw new IllegalArgumentException(
          String.format("%s (%s) should be set only when cleanup.policy is compact", key, value));
    }
  }

  /**
   * Rejects non-default {@code local.retention.*} values unless tiered storage
   * is enabled via {@code remote.storage.enable=true}.
   */
  void remoteStorageConfigValuesValidation() {
    if ("true".equals(configs.get("remote.storage.enable"))) {
      return;
    }
    checkRemoteStorageOnlySetting(LOCAL_RETENTION_MS);
    checkRemoteStorageOnlySetting(LOCAL_RETENTION_BYTES);
  }

  /** One tiered-storage-only check; "-2" is the sentinel default for both local.retention.* keys. */
  private void checkRemoteStorageOnlySetting(String key) {
    String value = configs.get(key);
    if (value != null && !Objects.equals(value, "-2")) {
      // Message text kept verbatim ("remoteStorageEnabled") — tests assert it exactly.
      throw new IllegalArgumentException(
          String.format("%s (%s) should be set only when remoteStorageEnabled is true", key, value));
    }
  }

  /**
   * Checks the time-based retention chain is non-increasing:
   * retention.ms &ge; local.retention.ms &ge; segment.ms.
   * retention.ms = -1 (infinite) and local.retention.ms = -2 (default) are
   * excluded from the chain; an absent retention.ms/segment.ms falls back to
   * its 7-day default.
   */
  void retentionAndDeletionTimeConfigurationBasedConstraintsValidation() {
    List<String> keys = new ArrayList<>();
    List<Long> values = new ArrayList<>();

    if (!Objects.equals(configs.get(RETENTION_MS), "-1")) {
      keys.add(RETENTION_MS);
      addRetentionMsToValues(values);
    }
    String localRetentionMs = configs.get(LOCAL_RETENTION_MS);
    if (localRetentionMs != null && !Objects.equals(localRetentionMs, "-2")) {
      keys.add(LOCAL_RETENTION_MS);
      values.add(parseLong(localRetentionMs));
    }
    keys.add(SEGMENT_MS);
    String segmentMs = configs.get(SEGMENT_MS);
    values.add(segmentMs != null ? parseLong(segmentMs) : DEFAULT_SEGMENT_MS);

    verifyNonIncreasing(keys, values);
  }

  /** Appends retention.ms (or its 7-day default when absent) to the comparison chain. */
  void addRetentionMsToValues(List<Long> values) {
    String retentionMs = configs.get(RETENTION_MS);
    values.add(retentionMs != null ? parseLong(retentionMs) : DEFAULT_RETENTION_MS);
  }

  /**
   * Checks the size-based retention chain is non-increasing:
   * retention.bytes &ge; local.retention.bytes &ge; segment.bytes &ge; max.message.bytes.
   * Sentinel defaults (-1 / -2) and absent retention values are excluded;
   * absent segment.bytes/max.message.bytes fall back to their defaults.
   */
  void retentionAndDeletionMemoryConfigurationBasedConstraintsValidation() {
    List<String> keys = new ArrayList<>();
    List<Long> values = new ArrayList<>();

    String retentionBytes = configs.get(RETENTION_BYTES);
    if (retentionBytes != null && !Objects.equals(retentionBytes, "-1")) {
      keys.add(RETENTION_BYTES);
      values.add(parseLong(retentionBytes));
    }
    String localRetentionBytes = configs.get(LOCAL_RETENTION_BYTES);
    if (localRetentionBytes != null && !Objects.equals(localRetentionBytes, "-2")) {
      keys.add(LOCAL_RETENTION_BYTES);
      values.add(parseLong(localRetentionBytes));
    }
    keys.add(SEGMENT_BYTES);
    String segmentBytes = configs.get(SEGMENT_BYTES);
    values.add(segmentBytes != null ? parseLong(segmentBytes) : DEFAULT_SEGMENT_BYTES);
    keys.add(MAX_MESSAGE_BYTES);
    String maxMessageBytes = configs.get(MAX_MESSAGE_BYTES);
    values.add(maxMessageBytes != null ? parseLong(maxMessageBytes) : DEFAULT_MAX_MESSAGE_BYTES);

    verifyNonIncreasing(keys, values);
  }

  /**
   * Asserts each adjacent pair in the chain satisfies values[i] &ge; values[i+1].
   * A value of 0 (including unparseable input mapped to 0 by {@link #parseLong})
   * opts that pair out of the comparison.
   */
  private static void verifyNonIncreasing(List<String> keys, List<Long> values) {
    for (int i = 0; i < values.size() - 1; i++) {
      long current = values.get(i);
      long next = values.get(i + 1);
      if (current != 0 && next != 0 && current < next) {
        throw new IllegalArgumentException(
            String.format("Invalid configuration: %s (%s) should be greater than or equal to %s (%s)",
                keys.get(i), current,
                keys.get(i + 1), next));
      }
    }
  }

  /**
   * Lenient parse: null or non-numeric input yields 0, which
   * {@link #verifyNonIncreasing} treats as "skip this comparison" — broker-side
   * validation is expected to reject genuinely malformed numbers.
   */
  private static long parseLong(String value) {
    try {
      return value != null ? Long.parseLong(value) : 0L;
    } catch (NumberFormatException e) {
      return 0L;
    }
  }

}
config.put("local.retention.ms", "704800000"); + config.put("segment.ms", "86400000"); + config.put("retention.bytes", "1073741824"); + config.put("local.retention.bytes", "1073741824"); + config.put("segment.bytes", "10485760"); + config.put("max.message.bytes", "1048576"); + config.put("leader.replication.throttled.replicas", "replica1"); + config.put("remote.storage.enable", "true"); + config.put("message.downconversion.enable", "true"); + config.put("segment.jitter.ms", "1000"); + config.put("flush.ms", "5000"); + config.put("follower.replication.throttled.replicas", "replica2"); + config.put("flush.messages", "1000"); + config.put("message.format.version", "2.8-IV0"); + config.put("file.delete.delay.ms", "60000"); + config.put("message.timestamp.type", "CreateTime"); + config.put("preallocate", "false"); + config.put("index.interval.bytes", "4096"); + config.put("unclean.leader.election.enable", "true"); + config.put("message.timestamp.after.max.ms", "8223372036854775807"); + config.put("message.timestamp.before.max.ms", "9222372036854775807"); + config.put("message.timestamp.difference.max.ms", "7223372036854775807"); + config.put("segment.index.bytes", "9485760"); + + int replicationFactor = 3; + + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(replicationFactor, + config); + assertDoesNotThrow(validator::validate); + } + + @Test + void shouldNotThrowForDefaultConfigValues() { + Map config = new HashMap<>(); + config.put("min.insync.replicas", "1"); + config.put("compression.type", "producer"); + config.put("compression.gzip.level", "-1"); + config.put("compression.lz4.level", "9"); + config.put("compression.zstd.level", "3"); + config.put("cleanup.policy", "delete"); + config.put("min.cleanable.dirty.ratio", "0.5"); + config.put("min.compaction.lag.ms", "0"); + config.put("max.compaction.lag.ms", "9223372036854775807"); + config.put("delete.retention.ms", "86400000"); + config.put("retention.ms", "604800000"); + 
config.put("local.retention.ms", "-2"); + config.put("segment.ms", "604800000"); + config.put("retention.bytes", "-1"); + config.put("local.retention.bytes", "-2"); + config.put("segment.bytes", "1073741824"); + config.put("max.message.bytes", "1048588"); + config.put("leader.replication.throttled.replicas", ""); + config.put("remote.storage.enable", "false"); + config.put("message.downconversion.enable", "true"); + config.put("segment.jitter.ms", "0"); + config.put("flush.ms", "9223372036854775807"); + config.put("follower.replication.throttled.replicas", ""); + config.put("flush.messages", "9223372036854775807"); + config.put("message.format.version", "3.0-IV1"); + config.put("file.delete.delay.ms", "60000"); + config.put("message.timestamp.type", "CreateTime"); + config.put("preallocate", "false"); + config.put("index.interval.bytes", "4096"); + config.put("unclean.leader.election.enable", "false"); + config.put("message.timestamp.after.max.ms", "9223372036854775807"); + config.put("message.timestamp.before.max.ms", "9223372036854775807"); + config.put("message.timestamp.difference.max.ms", "9223372036854775807"); + config.put("segment.index.bytes", "10485760"); + + int replicationFactor = 1; + + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(replicationFactor, + config); + assertDoesNotThrow(validator::validate); + } + + @Test + void shouldThrowForInValidConfigs() { + Map config = new HashMap<>(); + config.put("min.insync.replicas", "2"); + config.put("compression.type", "gzip"); + config.put("compression.gzip.level", "3"); + config.put("cleanup.policy", "compact"); + config.put("min.cleanable.dirty.ratio", "0.5"); + config.put("min.compaction.lag.ms", "1000"); + config.put("max.compaction.lag.ms", "60000"); + config.put("delete.retention.ms", "30000"); + config.put("retention.ms", "604800000"); + config.put("local.retention.ms", "1296400000"); + config.put("segment.ms", "86400000"); + config.put("retention.bytes", 
"1073741824"); + config.put("local.retention.bytes", "1073741824"); + config.put("segment.bytes", "10485760"); + config.put("max.message.bytes", "1048576"); + config.put("leader.replication.throttled.replicas", "replica1"); + config.put("remote.storage.enable", "true"); + config.put("message.downconversion.enable", "true"); + config.put("segment.jitter.ms", "1000"); + config.put("flush.ms", "5000"); + config.put("follower.replication.throttled.replicas", "replica2"); + config.put("flush.messages", "1000"); + config.put("message.format.version", "2.8-IV0"); + config.put("file.delete.delay.ms", "60000"); + config.put("message.timestamp.type", "CreateTime"); + config.put("preallocate", "false"); + config.put("index.interval.bytes", "4096"); + config.put("unclean.leader.election.enable", "true"); + config.put("message.timestamp.after.max.ms", "9223372036854775807"); + config.put("message.timestamp.before.max.ms", "9223372036854775807"); + config.put("message.timestamp.difference.max.ms", "9223372036854775807"); + config.put("segment.index.bytes", "10485760"); + + int replicationFactor = 3; + + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(replicationFactor, + config); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::validate + ); + assertEquals("Invalid configuration: retention.ms (604800000) should be greater than or equal " + + "to local.retention.ms (1296400000)", + exception.getMessage()); + } + + @Test + void shouldNotThrowForPartialConfigs() { + Map config = new HashMap<>(); + config.put("min.insync.replicas", "2"); + config.put("compression.type", "gzip"); + config.put("compression.gzip.level", "3"); + config.put("cleanup.policy", "compact"); + config.put("min.cleanable.dirty.ratio", "0.5"); + config.put("min.compaction.lag.ms", "1000"); + config.put("max.compaction.lag.ms", "60000"); + config.put("delete.retention.ms", "30000"); + config.put("retention.ms", 
"604800000"); + config.put("segment.ms", "86400000"); + config.put("retention.bytes", "1073741824"); + config.put("local.retention.bytes", "1073741824"); + config.put("segment.bytes", "10485760"); + config.put("max.message.bytes", "1048576"); + config.put("leader.replication.throttled.replicas", "replica1"); + config.put("remote.storage.enable", "true"); + config.put("message.downconversion.enable", "true"); + config.put("segment.jitter.ms", "1000"); + config.put("flush.ms", "5000"); + config.put("follower.replication.throttled.replicas", "replica2"); + config.put("flush.messages", "1000"); + config.put("message.format.version", "2.8-IV0"); + config.put("file.delete.delay.ms", "60000"); + config.put("message.timestamp.type", "CreateTime"); + config.put("preallocate", "false"); + config.put("index.interval.bytes", "4096"); + config.put("unclean.leader.election.enable", "true"); + + + int replicationFactor = 3; + + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(replicationFactor, + config); + assertDoesNotThrow(validator::validate); + } + + @Test + void shouldThrowForMinInSyncReplicasGreaterThanReplicationFactor() { + Map config = Map.of("min.insync.replicas", "4"); + int replicationFactor = 3; + + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(replicationFactor, + config); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::validate + ); + assertEquals("min.insync.replicas (4) should be less than or equal to replication.factor (3)", + exception.getMessage()); + } + + @Test + void shouldThrowForInvalidCompressionConfigValues() { + Map config = Map.of("compression.type", "gzip", + "compression.gzip.level", "5", + "compression.zstd.level", "4"); + int replicationFactor = 1; + + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(replicationFactor, + config); + IllegalArgumentException exception = 
assertThrows( + IllegalArgumentException.class, + validator::validate + ); + assertEquals("compression.zstd.level (4) should be set only when compression.type is zstd", + exception.getMessage()); + } + + @Test + void shouldThrowForInvalidCompactionConfigValues() { + Map config = Map.of("cleanup.policy", "delete", + "min.cleanable.dirty.ratio", "0.7"); + int replicationFactor = 1; + + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(replicationFactor, + config); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::validate + ); + assertEquals("min.cleanable.dirty.ratio (0.7) should be set only when cleanup.policy is compact", + exception.getMessage()); + } + + @Test + void shouldThrowForInvalidRemoteStorageConfigValues() { + Map config = Map.of("local.retention.ms", "604800000", + "local.retention.bytes", "1073741824"); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, config); + + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::validate + ); + assertEquals("local.retention.ms (604800000) should be set only when remoteStorageEnabled is true", + exception.getMessage()); + } + + @Test + void shouldThrowForInvalidRetentionAndDeletionTimeConfigs() { + Map config = Map.of("remote.storage.enable", "true", + "retention.ms", "604800000", + "local.retention.ms", "1209600000", + "segment.ms", "86400000"); + int replicationFactor = 1; + + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(replicationFactor, + config); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::validate + ); + assertEquals("Invalid configuration: retention.ms (604800000) " + + "should be greater than or equal to local.retention.ms (1209600000)", + exception.getMessage()); + } + + @Test + void 
shouldThrowForInvalidRetentionAndDeletionMemoryConfigs() { + Map config = Map.of("remote.storage.enable", "true", + "retention.bytes", "1073741824", + "local.retention.bytes", "2147483648", + "segment.bytes", "10485760", + "max.message.bytes", "1048576"); + int replicationFactor = 1; + + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(replicationFactor, + config); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::validate + ); + assertEquals("Invalid configuration: retention.bytes (1073741824) " + + "should be greater than or equal to local.retention.bytes (2147483648)", + exception.getMessage()); + } + } + + @Nested + class MinInSyncReplicasValidationTest { + @Test + void shouldNotThrowForValidMinInSyncReplicas() { + Map config = Map.of("min.insync.replicas", "2"); + int replicationFactor = 3; + + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(replicationFactor, + config); + assertDoesNotThrow(validator::minInSyncReplicasLessThanReplicationFactorValidation); + } + + @Test + void shouldThrowForMinInSyncReplicasGreaterThanReplicationFactor() { + Map config = Map.of("min.insync.replicas", "4"); + int replicationFactor = 3; + + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(replicationFactor, + config); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::minInSyncReplicasLessThanReplicationFactorValidation + ); + assertEquals("min.insync.replicas (4) should be less than or equal to replication.factor (3)", + exception.getMessage()); + } + + } + + @Nested + class CompressionConfigValueValidationTest { + @Test + void shouldNotThrowForValidCompressionTypeAndConfig() { + Map configs = Map.of("compression.gzip.level", "5", + "compression.type", "gzip"); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + 
assertDoesNotThrow(validator::compressionConfigValueValidation); + } + + @Test + void shouldThrowForNullCompressionType() { + Map configs = Map.of("compression.gzip.level", "5"); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::compressionConfigValueValidation + ); + assertEquals("compression.gzip.level (5) should be set only when compression.type is gzip", + exception.getMessage()); + } + + @Test + void shouldThrowForCompressionZstdLevelAndNonZstdCompressionType() { + Map configs = Map.of("compression.zstd.level", "4", + "compression.type", "gzip"); + + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::compressionConfigValueValidation + ); + assertEquals("compression.zstd.level (4) should be set only when compression.type is zstd", + exception.getMessage()); + } + + @Test + void shouldNotThrowForDefaultCompressionZstdLevelAndNonZstdCompressionType() { + Map configs = Map.of("compression.zstd.level", "3", + "compression.type", "gzip"); + + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + + assertDoesNotThrow(validator::compressionConfigValueValidation); + } + + + @Test + void shouldThrowForCompressionLz4LevelAndNonLz4CompressionType() { + Map configs = Map.of("compression.lz4.level", "3", + "compression.type", "gzip"); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::compressionConfigValueValidation + ); + assertEquals("compression.lz4.level (3) should be set only when compression.type is lz4", + exception.getMessage()); + } + + @Test + void 
shouldNotThrowForDefaultCompressionLz4LevelAndNonNullLz4CompressionType() { + Map configs = Map.of("compression.lz4.level", "9", + "compression.type", "gzip"); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + assertDoesNotThrow(validator::compressionConfigValueValidation); + } + + @Test + void shouldThrowForCompressionGzipLevelAndNonGzipCompressionType() { + Map configs = Map.of("compression.gzip.level", "5", + "compression.type", "zstd"); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::compressionConfigValueValidation + ); + assertEquals("compression.gzip.level (5) should be set only when compression.type is gzip", + exception.getMessage()); + } + + @Test + void shouldThrowForDefaultCompressionGzipLevelAndNonGzipCompressionType() { + Map configs = Map.of("compression.gzip.level", "-1", + "compression.type", "zstd"); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + assertDoesNotThrow(validator::compressionConfigValueValidation); + } + } + + @Nested + class CompactionConfigValuesValidationTest { + @Test + void shouldNotThrowForNullConfigAndPolicies() { + Map configs = new HashMap<>(); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + assertDoesNotThrow(validator::compactionConfigValuesValidation); + } + + @Test + void shouldNotThrowForNonNullConfigAndPolicies() { + Map config = Map.of( + "min.cleanable.dirty.ratio", "0.7", + "min.compaction.lag.ms", "1000", + "max.compaction.lag.ms", "60000", + "delete.retention.ms", "30000", + "cleanup.policy", "compact" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, config); + assertDoesNotThrow(validator::compactionConfigValuesValidation); + } + + 
@ParameterizedTest + @MethodSource("nonDefaultCompactionArgs") + void shouldThrowForDeletePolicyWhenCompactionOnlySettingsPresent(String key, String value, String expectedMsg) { + Map config = Map.of( + key, value, + "cleanup.policy", "delete" + ); + + var validator = new KafkaPropertiesConstraintsValidator(1, config); + + IllegalArgumentException ex = assertThrows( + IllegalArgumentException.class, + validator::compactionConfigValuesValidation + ); + assertEquals(expectedMsg, ex.getMessage()); + } + + static Stream nonDefaultCompactionArgs() { + return Stream.of( + Arguments.of( + "min.cleanable.dirty.ratio", "0.7", + "min.cleanable.dirty.ratio (0.7) should be set only when cleanup.policy is compact" + ), + Arguments.of( + "min.compaction.lag.ms", "1000", + "min.compaction.lag.ms (1000) should be set only when cleanup.policy is compact" + ), + Arguments.of( + "max.compaction.lag.ms", "6000", + "max.compaction.lag.ms (6000) should be set only when cleanup.policy is compact" + ), + Arguments.of( + "delete.retention.ms", "6000", + "delete.retention.ms (6000) should be set only when cleanup.policy is compact" + ) + ); + } + + @ParameterizedTest + @MethodSource("defaultCompactionArgs") + void shouldNotThrowForDeletePolicyWhenDefaultCompactionOnlySettingsPresent(String key, String value) { + Map config = Map.of( + key, value, + "cleanup.policy", "delete" + ); + + var validator = new KafkaPropertiesConstraintsValidator(1, config); + + assertDoesNotThrow(validator::compactionConfigValuesValidation); + } + + static Stream defaultCompactionArgs() { + return Stream.of( + Arguments.of( + "min.cleanable.dirty.ratio", "0.5" + ), + Arguments.of( + "min.compaction.lag.ms", "0" + ), + Arguments.of( + "max.compaction.lag.ms", "9223372036854775807" + ), + Arguments.of( + "delete.retention.ms", "86400000" + ) + ); + } + + @Test + void shouldHandleCompactDeletePolicyCorrectly() { + Map config = Map.of( + "delete.retention.ms", "30000", + "cleanup.policy", "delete,compact" + ); + + 
KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, config); + assertDoesNotThrow(validator::compactionConfigValuesValidation); + } + + } + + @Nested + class RemoteStorageConfigValuesValidationTest { + @Test + void shouldNotThrowForNonNullLocalRetentionValuesAndRemoteStorageEnabled() { + Map configs = Map.of( + "remote.storage.enable", "true", + "local.retention.ms", "604800000", + "local.retention.bytes", "1073741824" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + assertDoesNotThrow(validator::remoteStorageConfigValuesValidation); + } + + @Test + void shouldNotThrowForNullLocalRetentionValuesAndRemoteStorageDisabled() { + Map configs = new HashMap<>(); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + assertDoesNotThrow(validator::remoteStorageConfigValuesValidation); + } + + @Test + void shouldNotThrowForDefaultLocalRetentionValuesAndRemoteStorageEnabled() { + Map configs = Map.of( + "remote.storage.enable", "true", + "local.retention.ms", "-2", + "local.retention.bytes", "-2" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + assertDoesNotThrow(validator::remoteStorageConfigValuesValidation); + } + + @Test + void shouldThrowForNonNullLocalRetentionMsAndRemoteStorageDisabled() { + Map configs = Map.of( + "local.retention.ms", "604800000", + "local.retention.bytes", "1073741824" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::remoteStorageConfigValuesValidation + ); + assertEquals("local.retention.ms (604800000) should be set only when remoteStorageEnabled is true", + exception.getMessage()); + } + + @Test + void shouldThrowForNonNullLocalRetentionBytesAndRemoteStorageDisabled() { + Map configs 
= Map.of( + "local.retention.bytes", "1073741824" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::remoteStorageConfigValuesValidation + ); + assertEquals("local.retention.bytes (1073741824) should be set only when remoteStorageEnabled is true", + exception.getMessage()); + } + } + + @Nested + class RetentionAndDeletionTimeConfigurationBasedConstraintsValidationTest { + + @Test + void shouldNotThrowForValidRetentionAndDeletionTimes() { + Map configs = Map.of( + "remote.storage.enable", "true", + "retention.ms", "704800000", + "local.retention.ms", "703800000", + "segment.ms", "86400000" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + assertDoesNotThrow(validator::retentionAndDeletionTimeConfigurationBasedConstraintsValidation); + } + + @Test + void shouldThrowForInvalidRetentionAndDeletionTimesForRetentionMsLessThanLocalRetentionMs() { + Map configs = Map.of( + "remote.storage.enable", "true", + "retention.ms", "604800000", + "local.retention.ms", "1209600000", + "segment.ms", "86400000" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::retentionAndDeletionTimeConfigurationBasedConstraintsValidation + ); + assertEquals("Invalid configuration: retention.ms (604800000) " + + "should be greater than or equal to local.retention.ms (1209600000)", + exception.getMessage()); + } + + @Test + void shouldThrowForInvalidRetentionAndDeletionTimesForLocalRetentionMsLessThanSegmentMs() { + Map configs = Map.of( + "remote.storage.enable", "true", + "retention.ms", "604800000", + "local.retention.ms", "604800000", + "segment.ms", "1209600000" + ); + KafkaPropertiesConstraintsValidator validator = new 
KafkaPropertiesConstraintsValidator(1, configs); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::retentionAndDeletionTimeConfigurationBasedConstraintsValidation + ); + assertEquals("Invalid configuration: local.retention.ms (604800000) " + + "should be greater than or equal to segment.ms (1209600000)", + exception.getMessage()); + } + + @Test + void shouldNotThrowForDefaultNullRetentionMs() { + Map configs = Map.of( + "remote.storage.enable", "true", + "local.retention.ms", "304800000", + "segment.ms", "86400000" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + assertDoesNotThrow(validator::retentionAndDeletionTimeConfigurationBasedConstraintsValidation); + } + + @Test + void shouldThrowForDefaultNullRetentionMs() { + Map configs = Map.of( + "remote.storage.enable", "true", + "local.retention.ms", "704800000", + "segment.ms", "86400000" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::retentionAndDeletionTimeConfigurationBasedConstraintsValidation + ); + assertEquals("Invalid configuration: retention.ms (604800000) " + + "should be greater than or equal to local.retention.ms (704800000)", + exception.getMessage()); + } + + @Test + void shouldNotThrowForDefaultNullLocalRetentionMs() { + Map configs = Map.of( + "retention.ms", "1209600000", + "segment.ms", "304800000" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + assertDoesNotThrow(validator::retentionAndDeletionTimeConfigurationBasedConstraintsValidation); + } + + @Test + void shouldNotThrowForDefaultNullSegmentMs() { + Map configs = Map.of( + "remote.storage.enable", "true", + "retention.ms", "1209600000", + "local.retention.ms", "604800000" + ); + KafkaPropertiesConstraintsValidator 
validator = new KafkaPropertiesConstraintsValidator(1, configs); + assertDoesNotThrow(validator::retentionAndDeletionTimeConfigurationBasedConstraintsValidation); + } + + @Test + void shouldThrowForDefaultNullSegmentMs() { + Map configs = Map.of( + "remote.storage.enable", "true", + "retention.ms", "1209600000", + "local.retention.ms", "304800000" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::retentionAndDeletionTimeConfigurationBasedConstraintsValidation + ); + assertEquals("Invalid configuration: local.retention.ms (304800000) " + + "should be greater than or equal to segment.ms (604800000)", + exception.getMessage()); + } + + @Test + void shouldNotThrowForSentinelValues() { + Map configs = Map.of( + "remote.storage.enable", "true", + "retention.ms", "-1", + "local.retention.ms", "-2", + "segment.ms", "86400000" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + assertDoesNotThrow(validator::retentionAndDeletionTimeConfigurationBasedConstraintsValidation); + } + + @Test + void shouldNotThrowForSingleConfigValue() { + Map configs = Map.of( + "retention.ms", "604800000" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + assertDoesNotThrow(validator::retentionAndDeletionTimeConfigurationBasedConstraintsValidation); + } + + + } + + @Nested + class RetentionAndDeletionMemoryConfigurationBasedConstraintsValidationTest { + @Test + void shouldNotThrowForValidMemoryConfigurations() { + Map configs = Map.of( + "remote.storage.enable", "true", + "retention.bytes", "1073741824", + "local.retention.bytes", "1073741824", + "segment.bytes", "10485760", + "max.message.bytes", "1087576" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + 
assertDoesNotThrow(validator::retentionAndDeletionMemoryConfigurationBasedConstraintsValidation); + } + + @Test + void shouldThrowForInvalidMemoryConfigurationsForRetentionBytesLessThanLocalRetentionBytes() { + Map configs = Map.of( + "remote.storage.enable", "true", + "retention.bytes", "1073741824", + "local.retention.bytes", "2147483648", + "segment.bytes", "10485760", + "max.message.bytes", "1048576" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::retentionAndDeletionMemoryConfigurationBasedConstraintsValidation + ); + assertEquals("Invalid configuration: retention.bytes (1073741824) " + + "should be greater than or equal to local.retention.bytes (2147483648)", + exception.getMessage()); + } + + @Test + void shouldThrowForInvalidMemoryConfigurationsForLocalRetentionBytesLessThanSegmentBytes() { + Map configs = Map.of( + "remote.storage.enable", "true", + "retention.bytes", "1073741824", + "local.retention.bytes", "1073741824", + "segment.bytes", "2147483648", + "max.message.bytes", "1048576" + ); + + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::retentionAndDeletionMemoryConfigurationBasedConstraintsValidation + ); + assertEquals("Invalid configuration: local.retention.bytes (1073741824) " + + "should be greater than or equal to segment.bytes (2147483648)", + exception.getMessage()); + } + + @Test + void shouldThrowForInvalidMemoryConfigurationsForMaxMessageBytesLessThanSegmentBytes() { + Map configs = Map.of( + "remote.storage.enable", "true", + "retention.bytes", "1073741824", + "local.retention.bytes", "1073741824", + "segment.bytes", "5242880", + "max.message.bytes", "10485760" + ); + KafkaPropertiesConstraintsValidator validator = new 
KafkaPropertiesConstraintsValidator(1, configs); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::retentionAndDeletionMemoryConfigurationBasedConstraintsValidation + ); + assertEquals("Invalid configuration: segment.bytes (5242880) " + + "should be greater than or equal to max.message.bytes (10485760)", + exception.getMessage()); + } + + @Test + void shouldNotThrowForDefaultNullRetentionBytes() { + Map configs = Map.of( + "remote.storage.enable", "true", + "local.retention.bytes", "1073741824", + "segment.bytes", "10485760", + "max.message.bytes", "1048576" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + assertDoesNotThrow(validator::retentionAndDeletionMemoryConfigurationBasedConstraintsValidation); + } + + @Test + void shouldNotThrowForDefaultNullLocalRetentionBytes() { + Map configs = Map.of( + "retention.bytes", "1073741824", + "segment.bytes", "10485760", + "max.message.bytes", "1048576" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + assertDoesNotThrow(validator::retentionAndDeletionMemoryConfigurationBasedConstraintsValidation); + } + + @Test + void shouldNotThrowForDefaultNullSegmentBytes() { + Map configs = Map.of( + "remote.storage.enable", "true", + "retention.bytes", "1084741824", + "local.retention.bytes", "1073741824", + "max.message.bytes", "1048576" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + assertDoesNotThrow(validator::retentionAndDeletionMemoryConfigurationBasedConstraintsValidation); + } + + @Test + void shouldThrowForDefaultNullSegmentBytes() { + Map configs = Map.of( + "remote.storage.enable", "true", + "retention.bytes", "1084741824", + "local.retention.bytes", "1060741824", + "max.message.bytes", "1048576" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, 
configs); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::retentionAndDeletionMemoryConfigurationBasedConstraintsValidation + ); + assertEquals("Invalid configuration: local.retention.bytes (1060741824) " + + "should be greater than or equal to segment.bytes (1073741824)", + exception.getMessage()); + } + + @Test + void shouldNotThrowForDefaultNullMaxMessageBytes() { + Map configs = Map.of( + "remote.storage.enable", "true", + "retention.bytes", "1073741824", + "local.retention.bytes", "1073741824", + "segment.bytes", "10485760" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + assertDoesNotThrow(validator::retentionAndDeletionMemoryConfigurationBasedConstraintsValidation); + } + + @Test + void shouldThrowForDefaultNullMaxMessageBytes() { + Map configs = Map.of( + "remote.storage.enable", "true", + "retention.bytes", "1073741824", + "local.retention.bytes", "1073741824", + "segment.bytes", "1025760" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + validator::retentionAndDeletionMemoryConfigurationBasedConstraintsValidation + ); + assertEquals("Invalid configuration: segment.bytes (1025760) " + + "should be greater than or equal to max.message.bytes (1048588)", + exception.getMessage()); + } + + @Test + void shouldNotThrowForSentinelValues() { + Map configs = Map.of( + "remote.storage.enable", "true", + "retention.bytes", "-1", + "local.retention.bytes", "-2", + "segment.bytes", "10485760", + "max.message.bytes", "1048576" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + assertDoesNotThrow(validator::retentionAndDeletionMemoryConfigurationBasedConstraintsValidation); + } + + @Test + void shouldNotThrowForSingleConfigValue() { + Map configs = Map.of( + 
"retention.bytes", "1073741824" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + assertDoesNotThrow(validator::retentionAndDeletionMemoryConfigurationBasedConstraintsValidation); + } + + @Test + void shouldNotThrowForDoubleConfigValue() { + Map configs = Map.of( + "remote.storage.enable", "true", + "retention.bytes", "1073741824", + "local.retention.bytes", "1073741824" + ); + KafkaPropertiesConstraintsValidator validator = new KafkaPropertiesConstraintsValidator(1, configs); + assertDoesNotThrow(validator::retentionAndDeletionMemoryConfigurationBasedConstraintsValidation); + } + + } +}