From e9d5a7d653bbbd94c9407219802a0b467626cb5f Mon Sep 17 00:00:00 2001 From: Daniel Quiralte Ramos Date: Thu, 26 Jun 2025 16:20:13 +0200 Subject: [PATCH 1/6] Add all changes to connect with GCP Schema Registries --- .../kafbat/ui/config/ClustersProperties.java | 3 ++ .../serdes/builtin/sr/MessageFormatter.java | 32 +++++++++------ .../builtin/sr/SchemaRegistrySerde.java | 22 +++++++++-- .../ui/service/KafkaClusterFactory.java | 2 + .../ui/service/SchemaRegistryService.java | 11 +++++- .../kafbat/ui/util/WebClientConfigurator.java | 39 +++++++++++++++++++ .../builtin/sr/SchemaRegistrySerdeTest.java | 6 +-- .../main/resources/swagger/kafbat-ui-api.yaml | 2 + .../main/resources/swagger/kafka-sr-api.yaml | 9 ++++- 9 files changed, 103 insertions(+), 23 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java index 23be86369..733e66cb7 100644 --- a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java +++ b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java @@ -70,6 +70,9 @@ public static class Cluster { List<@Valid Masking> masking; AuditProperties audit; + + boolean gcpSchemaRegistry = false; + } @Data diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java index 5ab77ffeb..6d6d0eecd 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java @@ -11,6 +11,7 @@ import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer; import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer; import io.kafbat.ui.util.jsonschema.JsonAvroConversion; +import java.util.HashMap; import java.util.Map; import lombok.SneakyThrows; @@ -18,9 +19,10 @@ interface MessageFormatter { String format(String topic, byte[] value); - static Map createMap(SchemaRegistryClient schemaRegistryClient) { + static Map createMap(SchemaRegistryClient schemaRegistryClient, + boolean gcpSchemaRegistry) { return Map.of( - SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient), + SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient, gcpSchemaRegistry), SchemaType.JSON, new JsonSchemaMessageFormatter(schemaRegistryClient), SchemaType.PROTOBUF, new ProtobufMessageFormatter(schemaRegistryClient) ); @@ -29,17 +31,23 @@ SchemaType.PROTOBUF, new ProtobufMessageFormatter(schemaRegistryClient) class AvroMessageFormatter implements MessageFormatter { private final KafkaAvroDeserializer avroDeserializer; - AvroMessageFormatter(SchemaRegistryClient client) { + AvroMessageFormatter(SchemaRegistryClient client, boolean gcpSchemaRegistry) { this.avroDeserializer = new KafkaAvroDeserializer(client); - this.avroDeserializer.configure( - Map.of( - AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "wontbeused", - KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false, - KafkaAvroDeserializerConfig.SCHEMA_REFLECTION_CONFIG, false, - KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true - ), - false - ); + + final Map avroProps = new HashMap<>(); + avroProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "wontbeused"); + avroProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false); + avroProps.put(KafkaAvroDeserializerConfig.SCHEMA_REFLECTION_CONFIG, false); + avroProps.put(KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true); + + if (gcpSchemaRegistry) { + avroProps.put(KafkaAvroDeserializerConfig.BEARER_AUTH_CREDENTIALS_SOURCE, "CUSTOM"); + avroProps.put(KafkaAvroDeserializerConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS, + "class com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider"); + } + + this.avroDeserializer.configure(avroProps, false); + } @Override diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java index d6f7a3699..9357848c0 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java @@ -42,6 +42,9 @@ public class SchemaRegistrySerde implements BuiltInSerde { private static final byte SR_PAYLOAD_MAGIC_BYTE = 0x0; private static final int SR_PAYLOAD_PREFIX_LENGTH = 5; + private static final String CUSTOM_BEARER_AUTH_CREDENTIALS_SOURCE = "CUSTOM"; + private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS = + "com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider"; public static String name() { return "SchemaRegistry"; @@ -80,8 +83,10 @@ public void autoConfigure(PropertyResolver kafkaClusterProperties, kafkaClusterProperties.getProperty("schemaRegistrySsl.keystoreLocation", String.class).orElse(null), kafkaClusterProperties.getProperty("schemaRegistrySsl.keystorePassword", String.class).orElse(null), kafkaClusterProperties.getProperty("ssl.truststoreLocation", String.class).orElse(null), - kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null) + kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null), + kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false) ), + kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false), kafkaClusterProperties.getProperty("schemaRegistryKeySchemaNameTemplate", String.class).orElse("%s-key"), kafkaClusterProperties.getProperty("schemaRegistrySchemaNameTemplate", String.class).orElse("%s-value"), kafkaClusterProperties.getProperty("schemaRegistryCheckSchemaExistenceForDeserialize", Boolean.class) @@ -106,8 +111,10 @@ public void configure(PropertyResolver serdeProperties, serdeProperties.getProperty("keystoreLocation", String.class).orElse(null), serdeProperties.getProperty("keystorePassword", String.class).orElse(null), kafkaClusterProperties.getProperty("ssl.truststoreLocation", String.class).orElse(null), - kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null) + kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null), + kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false) ), + kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false), serdeProperties.getProperty("keySchemaNameTemplate", String.class).orElse("%s-key"), serdeProperties.getProperty("schemaNameTemplate", String.class).orElse("%s-value"), serdeProperties.getProperty("checkSchemaExistenceForDeserialize", Boolean.class) @@ -119,6 +126,7 @@ public void configure(PropertyResolver serdeProperties, void configure( List schemaRegistryUrls, SchemaRegistryClient schemaRegistryClient, + boolean gcpSchemaRegistry, String keySchemaNameTemplate, String valueSchemaNameTemplate, boolean checkTopicSchemaExistenceForDeserialize) { @@ -126,7 +134,7 @@ void configure( this.schemaRegistryClient = schemaRegistryClient; this.keySchemaNameTemplate = keySchemaNameTemplate; this.valueSchemaNameTemplate = valueSchemaNameTemplate; - this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient); + this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient, gcpSchemaRegistry); this.checkSchemaExistenceForDeserialize = checkTopicSchemaExistenceForDeserialize; } @@ -136,7 +144,8 @@ private static SchemaRegistryClient createSchemaRegistryClient(List urls @Nullable String keyStoreLocation, @Nullable String keyStorePassword, @Nullable String trustStoreLocation, - @Nullable String trustStorePassword) { + @Nullable String trustStorePassword, + boolean gcpSchemaRegistry) { Map configs = new HashMap<>(); if (username != null && password != null) { configs.put(BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); @@ -166,6 +175,11 @@ private static SchemaRegistryClient createSchemaRegistryClient(List urls keyStorePassword); } + if (gcpSchemaRegistry) { + configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CREDENTIALS_SOURCE, CUSTOM_BEARER_AUTH_CREDENTIALS_SOURCE); + configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS, GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS); + } + return new CachedSchemaRegistryClient( urls, 1_000, diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java index f8c528f90..f764a079f 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java @@ -167,11 +167,13 @@ private boolean schemaRegistryConfigured(ClustersProperties.Cluster clusterPrope } private ReactiveFailover schemaRegistryClient(ClustersProperties.Cluster clusterProperties) { + var auth = Optional.ofNullable(clusterProperties.getSchemaRegistryAuth()) .orElse(new ClustersProperties.SchemaRegistryAuth()); WebClient webClient = new WebClientConfigurator() .configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl()) .configureBasicAuth(auth.getUsername(), auth.getPassword()) + .configureGcpBearerAuth(clusterProperties.isGcpSchemaRegistry()) .configureBufferSize(webClientMaxBuffSize) .build(); return ReactiveFailover.create( diff --git a/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java b/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java index c725a787e..670bee546 100644 --- a/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java +++ b/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java @@ -148,14 +148,21 @@ public Mono getSchemaCompatibilityLevel(KafkaCluster cluster, String schemaName) { return api(cluster) .mono(c -> c.getSubjectCompatibilityLevel(schemaName, true)) - .map(CompatibilityConfig::getCompatibilityLevel) + .map(compatibilityConfig -> + cluster.getOriginalProperties().isGcpSchemaRegistry() + ? compatibilityConfig.getCompatibility() + : compatibilityConfig.getCompatibilityLevel()) .onErrorResume(error -> Mono.empty()); } public Mono getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) { return api(cluster) .mono(KafkaSrClientApi::getGlobalCompatibilityLevel) - .map(CompatibilityConfig::getCompatibilityLevel); + .map(compatibilityConfig -> + cluster.getOriginalProperties().isGcpSchemaRegistry() + ? compatibilityConfig.getCompatibility() + : compatibilityConfig.getCompatibilityLevel() + ); } private Mono getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster, diff --git a/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java b/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java index 170530be1..c989e0539 100644 --- a/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java +++ b/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java @@ -3,14 +3,17 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.google.auth.oauth2.GoogleCredentials; import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.exception.ValidationException; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import java.io.FileInputStream; +import java.io.IOException; import java.security.KeyStore; import java.time.Duration; +import java.util.Collections; import java.util.function.Consumer; import javax.annotation.Nullable; import javax.net.ssl.KeyManagerFactory; @@ -24,9 +27,13 @@ import org.springframework.http.codec.json.Jackson2JsonEncoder; import org.springframework.util.ResourceUtils; import org.springframework.util.unit.DataSize; +import org.springframework.web.reactive.function.client.ClientRequest; +import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; + public class WebClientConfigurator { private final WebClient.Builder builder = WebClient.builder(); @@ -45,6 +52,38 @@ private static ObjectMapper defaultOM() { .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } + public WebClientConfigurator configureGcpBearerAuth(boolean enabled) { + if (enabled) { + System.out.println("Configuring GCP Bearer Auth"); + builder.filter(createGcpBearerAuthFilter()); + } + return this; + } + + private ExchangeFilterFunction createGcpBearerAuthFilter() { + return (request, next) -> { + return Mono.fromCallable(() -> { + try { + // Get credentials using Application Default Credentials (from the GKE service account) + GoogleCredentials credentials = GoogleCredentials.getApplicationDefault() + .createScoped(Collections.singleton("https://www.googleapis.com/auth/cloud-platform")); + + credentials.refreshIfExpired(); + return credentials.getAccessToken().getTokenValue(); + } catch (IOException e) { + throw new RuntimeException("Failed to get GCP access token", e); + } + }) + .flatMap(token -> { + ClientRequest newRequest = ClientRequest.from(request) + // Add the Authorization header + .headers(headers -> headers.setBearerAuth(token)) + .build(); + return next.exchange(newRequest); + }); + }; + } + public WebClientConfigurator configureSsl(@Nullable ClustersProperties.TruststoreConfig truststoreConfig, @Nullable ClustersProperties.KeystoreConfig keystoreConfig) { if (truststoreConfig != null && !truststoreConfig.isVerifySsl()) { diff --git a/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java b/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java index d66a8d004..7fe37022d 100644 --- a/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java +++ b/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java @@ -35,7 +35,7 @@ class SchemaRegistrySerdeTest { @BeforeEach void init() { serde = new SchemaRegistrySerde(); - serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value", true); + serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", true); } @ParameterizedTest @@ -135,7 +135,7 @@ class SerdeWithDisabledSubjectExistenceCheck { @BeforeEach void init() { - serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value", false); + serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", false); } @Test @@ -151,7 +151,7 @@ class SerdeWithEnabledSubjectExistenceCheck { @BeforeEach void init() { - serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value", true); + serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", true); } @Test diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index 8769e6aa1..bb2c8ea97 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -4341,6 +4341,8 @@ components: type: string keystorePassword: type: string + gcpSchemaRegistry: + type: boolean ksqldbServer: type: string ksqldbServerSsl: diff --git a/contract/src/main/resources/swagger/kafka-sr-api.yaml b/contract/src/main/resources/swagger/kafka-sr-api.yaml index 2b082d689..b025b3050 100644 --- a/contract/src/main/resources/swagger/kafka-sr-api.yaml +++ b/contract/src/main/resources/swagger/kafka-sr-api.yaml @@ -381,8 +381,13 @@ components: properties: compatibilityLevel: $ref: '#/components/schemas/Compatibility' - required: - - compatibilityLevel + # GCP Managed Kafka Schema registries specific fields + alias: + type: string + compatibility: + $ref: '#/components/schemas/Compatibility' + normalize: + type: boolean CompatibilityLevelChange: type: object From 5392e77386661eafe400126daffe5481a760cb51 Mon Sep 17 00:00:00 2001 From: Daniel Quiralte Ramos Date: Mon, 7 Jul 2025 17:19:39 +0200 Subject: [PATCH 2/6] Expose BEARER_AUTH_CUSTOM_PROVIDER_CLASS parameter in config --- .../kafbat/ui/config/ClustersProperties.java | 3 +- .../serdes/builtin/sr/MessageFormatter.java | 11 ++++--- .../builtin/sr/SchemaRegistrySerde.java | 31 ++++++++++--------- .../ui/service/KafkaClusterFactory.java | 5 ++- .../ui/service/SchemaRegistryService.java | 28 +++++++++++------ .../kafbat/ui/util/WebClientConfigurator.java | 12 +++++-- .../builtin/sr/SchemaRegistrySerdeTest.java | 6 ++-- .../main/resources/swagger/kafbat-ui-api.yaml | 4 +-- 8 files changed, 61 insertions(+), 39 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java index 733e66cb7..310c7283d 100644 --- a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java +++ b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java @@ -71,8 +71,6 @@ public static class Cluster { AuditProperties audit; - boolean gcpSchemaRegistry = false; - } @Data @@ -116,6 +114,7 @@ public static class ConnectCluster { public static class SchemaRegistryAuth { String username; String password; + String bearerAuthCustomProviderClass; } @Data diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java index 6d6d0eecd..919ea6554 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java @@ -20,9 +20,10 @@ interface MessageFormatter { String format(String topic, byte[] value); static Map createMap(SchemaRegistryClient schemaRegistryClient, - boolean gcpSchemaRegistry) { + //boolean gcpSchemaRegistry) { + String bearerAuthCustomProviderClass) { return Map.of( - SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient, gcpSchemaRegistry), + SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient, bearerAuthCustomProviderClass), SchemaType.JSON, new JsonSchemaMessageFormatter(schemaRegistryClient), SchemaType.PROTOBUF, new ProtobufMessageFormatter(schemaRegistryClient) ); @@ -31,7 +32,7 @@ SchemaType.PROTOBUF, new ProtobufMessageFormatter(schemaRegistryClient) class AvroMessageFormatter implements MessageFormatter { private final KafkaAvroDeserializer avroDeserializer; - AvroMessageFormatter(SchemaRegistryClient client, boolean gcpSchemaRegistry) { + AvroMessageFormatter(SchemaRegistryClient client, String bearerAuthCustomProviderClass) { this.avroDeserializer = new KafkaAvroDeserializer(client); final Map avroProps = new HashMap<>(); @@ -40,10 +41,10 @@ class AvroMessageFormatter implements MessageFormatter { avroProps.put(KafkaAvroDeserializerConfig.SCHEMA_REFLECTION_CONFIG, false); avroProps.put(KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true); - if (gcpSchemaRegistry) { + if (bearerAuthCustomProviderClass != null && !bearerAuthCustomProviderClass.isBlank()) { avroProps.put(KafkaAvroDeserializerConfig.BEARER_AUTH_CREDENTIALS_SOURCE, "CUSTOM"); avroProps.put(KafkaAvroDeserializerConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS, - "class com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider"); + String.format("class %s", bearerAuthCustomProviderClass)); } this.avroDeserializer.configure(avroProps, false); diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java index 9357848c0..f4697d33b 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java @@ -43,8 +43,6 @@ public class SchemaRegistrySerde implements BuiltInSerde { private static final byte SR_PAYLOAD_MAGIC_BYTE = 0x0; private static final int SR_PAYLOAD_PREFIX_LENGTH = 5; private static final String CUSTOM_BEARER_AUTH_CREDENTIALS_SOURCE = "CUSTOM"; - private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS = - "com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider"; public static String name() { return "SchemaRegistry"; @@ -80,13 +78,15 @@ public void autoConfigure(PropertyResolver kafkaClusterProperties, urls, kafkaClusterProperties.getProperty("schemaRegistryAuth.username", String.class).orElse(null), kafkaClusterProperties.getProperty("schemaRegistryAuth.password", String.class).orElse(null), + kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class) + .orElse(null), kafkaClusterProperties.getProperty("schemaRegistrySsl.keystoreLocation", String.class).orElse(null), kafkaClusterProperties.getProperty("schemaRegistrySsl.keystorePassword", String.class).orElse(null), kafkaClusterProperties.getProperty("ssl.truststoreLocation", String.class).orElse(null), - kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null), - kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false) + kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null) ), - kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false), + kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class) + .orElse(null), kafkaClusterProperties.getProperty("schemaRegistryKeySchemaNameTemplate", String.class).orElse("%s-key"), kafkaClusterProperties.getProperty("schemaRegistrySchemaNameTemplate", String.class).orElse("%s-value"), kafkaClusterProperties.getProperty("schemaRegistryCheckSchemaExistenceForDeserialize", Boolean.class) @@ -108,13 +108,15 @@ public void configure(PropertyResolver serdeProperties, urls, serdeProperties.getProperty("username", String.class).orElse(null), serdeProperties.getProperty("password", String.class).orElse(null), + kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class) + .orElse(null), serdeProperties.getProperty("keystoreLocation", String.class).orElse(null), serdeProperties.getProperty("keystorePassword", String.class).orElse(null), kafkaClusterProperties.getProperty("ssl.truststoreLocation", String.class).orElse(null), - kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null), - kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false) + kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null) ), - kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false), + kafkaClusterProperties.getProperty("schemaRegistryAuth.bearerAuthCustomProviderClass", String.class) + .orElse(null), serdeProperties.getProperty("keySchemaNameTemplate", String.class).orElse("%s-key"), serdeProperties.getProperty("schemaNameTemplate", String.class).orElse("%s-value"), serdeProperties.getProperty("checkSchemaExistenceForDeserialize", Boolean.class) @@ -126,7 +128,7 @@ public void configure(PropertyResolver serdeProperties, void configure( List schemaRegistryUrls, SchemaRegistryClient schemaRegistryClient, - boolean gcpSchemaRegistry, + String bearerAuthCustomProviderClass, String keySchemaNameTemplate, String valueSchemaNameTemplate, boolean checkTopicSchemaExistenceForDeserialize) { @@ -134,18 +136,19 @@ void configure( this.schemaRegistryClient = schemaRegistryClient; this.keySchemaNameTemplate = keySchemaNameTemplate; this.valueSchemaNameTemplate = valueSchemaNameTemplate; - this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient, gcpSchemaRegistry); + this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient, bearerAuthCustomProviderClass); this.checkSchemaExistenceForDeserialize = checkTopicSchemaExistenceForDeserialize; } private static SchemaRegistryClient createSchemaRegistryClient(List urls, @Nullable String username, @Nullable String password, + @Nullable String bearerAuthCustomProviderClass, @Nullable String keyStoreLocation, @Nullable String keyStorePassword, @Nullable String trustStoreLocation, - @Nullable String trustStorePassword, - boolean gcpSchemaRegistry) { + @Nullable String trustStorePassword + ) { Map configs = new HashMap<>(); if (username != null && password != null) { configs.put(BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); @@ -175,9 +178,9 @@ private static SchemaRegistryClient createSchemaRegistryClient(List urls keyStorePassword); } - if (gcpSchemaRegistry) { + if (bearerAuthCustomProviderClass != null) { configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CREDENTIALS_SOURCE, CUSTOM_BEARER_AUTH_CREDENTIALS_SOURCE); - configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS, GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS); + configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS, bearerAuthCustomProviderClass); } return new CachedSchemaRegistryClient( diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java index f764a079f..27c900b48 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java @@ -168,12 +168,15 @@ private boolean schemaRegistryConfigured(ClustersProperties.Cluster clusterPrope private ReactiveFailover schemaRegistryClient(ClustersProperties.Cluster clusterProperties) { + + //System.out.println("Creating Schema Registry Client for cluster: " + clusterProperties.getName()); var auth = Optional.ofNullable(clusterProperties.getSchemaRegistryAuth()) .orElse(new ClustersProperties.SchemaRegistryAuth()); + //System.out.println("Auth details: " + auth.toString()); WebClient webClient = new WebClientConfigurator() .configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl()) .configureBasicAuth(auth.getUsername(), auth.getPassword()) - .configureGcpBearerAuth(clusterProperties.isGcpSchemaRegistry()) + .configureBearerTokenAuth(auth.getBearerAuthCustomProviderClass()) .configureBufferSize(webClientMaxBuffSize) .build(); return ReactiveFailover.create( diff --git a/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java b/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java index 670bee546..b60478a17 100644 --- a/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java +++ b/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java @@ -15,6 +15,7 @@ import io.kafbat.ui.sr.model.SchemaSubject; import io.kafbat.ui.util.ReactiveFailover; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; import lombok.AllArgsConstructor; import lombok.Getter; @@ -34,6 +35,9 @@ public class SchemaRegistryService { private static final String LATEST = "latest"; + private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS = + "com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider"; + @AllArgsConstructor public static class SubjectWithCompatibilityLevel { @Delegate @@ -148,21 +152,14 @@ public Mono getSchemaCompatibilityLevel(KafkaCluster cluster, String schemaName) { return api(cluster) .mono(c -> c.getSubjectCompatibilityLevel(schemaName, true)) - .map(compatibilityConfig -> - cluster.getOriginalProperties().isGcpSchemaRegistry() - ? compatibilityConfig.getCompatibility() - : compatibilityConfig.getCompatibilityLevel()) + .map(compatibilityConfig -> selectCompatibilityFormat(cluster, compatibilityConfig)) .onErrorResume(error -> Mono.empty()); } public Mono getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) { return api(cluster) .mono(KafkaSrClientApi::getGlobalCompatibilityLevel) - .map(compatibilityConfig -> - cluster.getOriginalProperties().isGcpSchemaRegistry() - ? compatibilityConfig.getCompatibility() - : compatibilityConfig.getCompatibilityLevel() - ); + .map(compatibilityConfig -> selectCompatibilityFormat(cluster, compatibilityConfig)); } private Mono getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster, @@ -176,4 +173,17 @@ public Mono checksSchemaCompatibility(KafkaCluster c NewSubject newSchemaSubject) { return api(cluster).mono(c -> c.checkSchemaCompatibility(schemaName, LATEST, true, newSchemaSubject)); } + + private Compatibility selectCompatibilityFormat(KafkaCluster cluster, CompatibilityConfig compatibilityConfig) { + if (cluster.getOriginalProperties().getSchemaRegistryAuth() != null + && Objects.equals(cluster.getOriginalProperties().getSchemaRegistryAuth().getBearerAuthCustomProviderClass(), + GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS)) { + return compatibilityConfig.getCompatibility(); + } else { + return compatibilityConfig.getCompatibilityLevel(); + } + } } + + + diff --git a/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java b/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java index c989e0539..9e53bf6fe 100644 --- a/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java +++ b/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java @@ -14,10 +14,12 @@ import java.security.KeyStore; import java.time.Duration; import java.util.Collections; +import java.util.Objects; import java.util.function.Consumer; import javax.annotation.Nullable; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.TrustManagerFactory; +import javax.validation.constraints.Null; import lombok.SneakyThrows; import org.openapitools.jackson.nullable.JsonNullableModule; import org.springframework.http.MediaType; @@ -36,6 +38,9 @@ public class WebClientConfigurator { + private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS = + "com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider"; + private final WebClient.Builder builder = WebClient.builder(); private HttpClient httpClient = HttpClient .create() @@ -52,9 +57,10 @@ private static ObjectMapper defaultOM() { .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } - public WebClientConfigurator configureGcpBearerAuth(boolean enabled) { - if (enabled) { - System.out.println("Configuring GCP Bearer Auth"); + public WebClientConfigurator configureBearerTokenAuth(@Nullable String bearerAuthCustomProviderClass) { + //System.out.println("Configuring GCP Bearer Auth in web client"); + //System.out.println("Bearer Auth Custom Provider Class: " + bearerAuthCustomProviderClass); + if (Objects.equals(bearerAuthCustomProviderClass, GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS)) { builder.filter(createGcpBearerAuthFilter()); } return this; diff --git a/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java b/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java index 7fe37022d..9a4be577f 100644 --- a/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java +++ b/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java @@ -35,7 +35,7 @@ class SchemaRegistrySerdeTest { @BeforeEach void init() { serde = new SchemaRegistrySerde(); - serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", true); + serde.configure(List.of("wontbeused"), registryClient, null, "%s-key", "%s-value", true); } @ParameterizedTest @@ -135,7 +135,7 @@ class SerdeWithDisabledSubjectExistenceCheck { @BeforeEach void init() { - serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", false); + serde.configure(List.of("wontbeused"), registryClient, null, "%s-key", "%s-value", false); } @Test @@ -151,7 +151,7 @@ class SerdeWithEnabledSubjectExistenceCheck { @BeforeEach void init() { - serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", true); + serde.configure(List.of("wontbeused"), registryClient, null, "%s-key", "%s-value", true); } @Test diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index bb2c8ea97..1d92b4eff 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -4334,6 +4334,8 @@ components: type: string password: type: string + bearerAuthCustomProviderClass: + type: string schemaRegistrySsl: type: object properties: @@ -4341,8 +4343,6 @@ components: type: string keystorePassword: type: string - gcpSchemaRegistry: - type: boolean ksqldbServer: type: string ksqldbServerSsl: From 07261140f034f0aea572f927298785d75b2b8cda Mon Sep 17 00:00:00 2001 From: Daniel Quiralte Ramos Date: Tue, 8 Jul 2025 12:12:41 +0200 Subject: [PATCH 3/6] Clean code --- .../main/java/io/kafbat/ui/service/KafkaClusterFactory.java | 5 +---- .../main/java/io/kafbat/ui/util/WebClientConfigurator.java | 3 --- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java index 27c900b48..4df19e282 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java @@ -167,12 +167,9 @@ private boolean schemaRegistryConfigured(ClustersProperties.Cluster clusterPrope } private ReactiveFailover schemaRegistryClient(ClustersProperties.Cluster clusterProperties) { - - - //System.out.println("Creating Schema Registry Client for cluster: " + clusterProperties.getName()); + var auth = Optional.ofNullable(clusterProperties.getSchemaRegistryAuth()) .orElse(new ClustersProperties.SchemaRegistryAuth()); - //System.out.println("Auth details: " + auth.toString()); WebClient webClient = new WebClientConfigurator() .configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl()) .configureBasicAuth(auth.getUsername(), auth.getPassword()) diff --git a/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java b/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java index 9e53bf6fe..8e3dbb6de 100644 --- a/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java +++ b/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java @@ -19,7 +19,6 @@ import javax.annotation.Nullable; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.TrustManagerFactory; -import javax.validation.constraints.Null; import lombok.SneakyThrows; import org.openapitools.jackson.nullable.JsonNullableModule; import org.springframework.http.MediaType; @@ -58,8 +57,6 @@ private static ObjectMapper defaultOM() { } public WebClientConfigurator configureBearerTokenAuth(@Nullable String bearerAuthCustomProviderClass) { - //System.out.println("Configuring GCP Bearer Auth in web client"); - //System.out.println("Bearer Auth Custom Provider Class: " + bearerAuthCustomProviderClass); if (Objects.equals(bearerAuthCustomProviderClass, GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS)) { builder.filter(createGcpBearerAuthFilter()); } From aaa04b6bce77a94b28e06dcfee1faff8ccd71adc Mon Sep 17 00:00:00 2001 From: Daniel Quiralte Ramos Date: Tue, 8 Jul 2025 12:15:31 +0200 Subject: [PATCH 4/6] Clean code --- .../java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java index 919ea6554..c6e5a1d72 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java @@ -20,7 +20,6 @@ interface MessageFormatter { String format(String topic, byte[] value); static Map createMap(SchemaRegistryClient schemaRegistryClient, - //boolean gcpSchemaRegistry) { String bearerAuthCustomProviderClass) { return Map.of( SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient, bearerAuthCustomProviderClass), From 6ba0bc8d2c332ff2c251b987535fea5737c2f436 Mon Sep 17 00:00:00 2001 From: Daniel Quiralte Ramos Date: Tue, 8 Jul 2025 18:06:52 +0200 Subject: [PATCH 5/6] Move gcp tokens logic out of WebClientConfigurator --- .../ui/service/KafkaClusterFactory.java | 18 +++++--- .../kafbat/ui/util/WebClientConfigurator.java | 44 ++++--------------- .../ui/util/gcp/GcpBearerAuthFilter.java | 40 +++++++++++++++++ 3 files changed, 61 insertions(+), 41 deletions(-) create mode 100644 api/src/main/java/io/kafbat/ui/util/gcp/GcpBearerAuthFilter.java diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java index 4df19e282..12fc6a4ba 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java @@ -16,10 +16,12 @@ import io.kafbat.ui.util.KafkaServicesValidation; import io.kafbat.ui.util.ReactiveFailover; import io.kafbat.ui.util.WebClientConfigurator; +import io.kafbat.ui.util.gcp.GcpBearerAuthFilter; import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Properties; import java.util.stream.Stream; @@ -167,15 +169,21 @@ private boolean schemaRegistryConfigured(ClustersProperties.Cluster clusterPrope } private ReactiveFailover schemaRegistryClient(ClustersProperties.Cluster clusterProperties) { - + var auth = Optional.ofNullable(clusterProperties.getSchemaRegistryAuth()) .orElse(new ClustersProperties.SchemaRegistryAuth()); - WebClient webClient = new WebClientConfigurator() + WebClientConfigurator webClientConfigurator = new WebClientConfigurator() .configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl()) .configureBasicAuth(auth.getUsername(), auth.getPassword()) - .configureBearerTokenAuth(auth.getBearerAuthCustomProviderClass()) - .configureBufferSize(webClientMaxBuffSize) - .build(); + .configureBufferSize(webClientMaxBuffSize); + + if (auth.getBearerAuthCustomProviderClass() != null && Objects.equals(auth.getBearerAuthCustomProviderClass(), + GcpBearerAuthFilter.getGcpBearerAuthCustomProviderClass())) { + webClientConfigurator.filter(new GcpBearerAuthFilter()); + } + + WebClient webClient = webClientConfigurator.build(); + return ReactiveFailover.create( parseUrlList(clusterProperties.getSchemaRegistry()), url -> new KafkaSrClientApi(new ApiClient(webClient, null, null).setBasePath(url)), diff --git a/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java b/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java index 8e3dbb6de..c4405e89f 100644 --- a/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java +++ b/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java @@ -3,18 +3,16 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import com.google.auth.oauth2.GoogleCredentials; import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.exception.ValidationException; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import java.io.FileInputStream; -import java.io.IOException; import java.security.KeyStore; import java.time.Duration; -import java.util.Collections; -import java.util.Objects; +import java.util.ArrayList; +import java.util.List; import java.util.function.Consumer; import javax.annotation.Nullable; import javax.net.ssl.KeyManagerFactory; @@ -28,23 +26,20 @@ import org.springframework.http.codec.json.Jackson2JsonEncoder; import org.springframework.util.ResourceUtils; import org.springframework.util.unit.DataSize; -import org.springframework.web.reactive.function.client.ClientRequest; import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.WebClient; -import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; public class WebClientConfigurator { - private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS = - "com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider"; - private final WebClient.Builder builder = WebClient.builder(); private HttpClient httpClient = HttpClient .create() .proxyWithSystemProperties(); + private final List filters = new ArrayList<>(); + public WebClientConfigurator() { configureObjectMapper(defaultOM()); } @@ -56,37 +51,13 @@ private static ObjectMapper defaultOM() { .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } - public WebClientConfigurator configureBearerTokenAuth(@Nullable String bearerAuthCustomProviderClass) { - if (Objects.equals(bearerAuthCustomProviderClass, GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS)) { - builder.filter(createGcpBearerAuthFilter()); + public WebClientConfigurator filter(ExchangeFilterFunction filter) { + if (filter != null) { + this.filters.add(filter); } return this; } - private ExchangeFilterFunction createGcpBearerAuthFilter() { - return (request, next) -> { - return Mono.fromCallable(() -> { - try { - // Get credentials using Application Default Credentials (from the GKE service account) - GoogleCredentials credentials = GoogleCredentials.getApplicationDefault() - .createScoped(Collections.singleton("https://www.googleapis.com/auth/cloud-platform")); - - credentials.refreshIfExpired(); - return credentials.getAccessToken().getTokenValue(); - } catch (IOException e) { - throw new RuntimeException("Failed to get GCP access token", e); - } - }) - .flatMap(token -> { - ClientRequest newRequest = ClientRequest.from(request) - // Add the Authorization header - .headers(headers -> headers.setBearerAuth(token)) - .build(); - return next.exchange(newRequest); - }); - }; - } - public WebClientConfigurator configureSsl(@Nullable ClustersProperties.TruststoreConfig truststoreConfig, @Nullable ClustersProperties.KeystoreConfig keystoreConfig) { if (truststoreConfig != null && !truststoreConfig.isVerifySsl()) { @@ -193,6 +164,7 @@ public WebClientConfigurator configureResponseTimeout(Duration responseTimeout) } public WebClient build() { + builder.filters(filterList -> filterList.addAll(this.filters)); return builder.clientConnector(new ReactorClientHttpConnector(httpClient)).build(); } } diff --git a/api/src/main/java/io/kafbat/ui/util/gcp/GcpBearerAuthFilter.java b/api/src/main/java/io/kafbat/ui/util/gcp/GcpBearerAuthFilter.java new file mode 100644 index 000000000..3faae1d76 --- /dev/null +++ b/api/src/main/java/io/kafbat/ui/util/gcp/GcpBearerAuthFilter.java @@ -0,0 +1,40 @@ +package io.kafbat.ui.util.gcp; + +import com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider; +import org.jetbrains.annotations.NotNull; +import org.springframework.web.reactive.function.client.ClientRequest; +import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.ExchangeFilterFunction; +import org.springframework.web.reactive.function.client.ExchangeFunction; +import reactor.core.publisher.Mono; + +public class GcpBearerAuthFilter implements ExchangeFilterFunction { + + private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS = + GcpBearerAuthCredentialProvider.class.getName(); + + private final GcpBearerAuthCredentialProvider credentialProvider; + + public GcpBearerAuthFilter() { + this.credentialProvider = new GcpBearerAuthCredentialProvider(); + } + + @NotNull + @Override + public Mono filter(ClientRequest request, ExchangeFunction next) { + // This Mono ensures token fetching happens for EACH request + return Mono.fromCallable(() -> this.credentialProvider.getBearerToken(null)) + .flatMap(token -> { + // Create a new request with the Authorization header + ClientRequest newRequest = ClientRequest.from(request) + .headers(headers -> headers.setBearerAuth(token)) + .build(); + // Pass the new request to the next filter in the chain + return next.exchange(newRequest); + }); + } + + public static String getGcpBearerAuthCustomProviderClass() { + return GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS; + } +} From f2e8a53ac753dbf2265905c5c5a48928a13f032b Mon Sep 17 00:00:00 2001 From: Daniel Quiralte Ramos Date: Wed, 9 Jul 2025 18:30:13 +0200 Subject: [PATCH 6/6] Add new schema compatibilityConfigGCP --- .../ui/service/SchemaRegistryService.java | 21 ++++++++-------- .../main/resources/swagger/kafka-sr-api.yaml | 25 +++++++++++++------ 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java b/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java index b60478a17..01426f07f 100644 --- a/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java +++ b/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java @@ -9,11 +9,12 @@ import io.kafbat.ui.sr.api.KafkaSrClientApi; import io.kafbat.ui.sr.model.Compatibility; import io.kafbat.ui.sr.model.CompatibilityCheckResponse; -import io.kafbat.ui.sr.model.CompatibilityConfig; import io.kafbat.ui.sr.model.CompatibilityLevelChange; +import io.kafbat.ui.sr.model.GetGlobalCompatibilityLevel200Response; import io.kafbat.ui.sr.model.NewSubject; import io.kafbat.ui.sr.model.SchemaSubject; import io.kafbat.ui.util.ReactiveFailover; +import io.kafbat.ui.util.gcp.GcpBearerAuthFilter; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; @@ -35,9 +36,6 @@ public class SchemaRegistryService { private static final String LATEST = "latest"; - private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS = - "com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider"; - @AllArgsConstructor public static class SubjectWithCompatibilityLevel { @Delegate @@ -152,14 +150,16 @@ public Mono getSchemaCompatibilityLevel(KafkaCluster cluster, String schemaName) { return api(cluster) .mono(c -> c.getSubjectCompatibilityLevel(schemaName, true)) - .map(compatibilityConfig -> selectCompatibilityFormat(cluster, compatibilityConfig)) + .map(compatibilityResponse -> + normalizeCompatibilityResponse(cluster, compatibilityResponse)) .onErrorResume(error -> Mono.empty()); } public Mono getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) { return api(cluster) .mono(KafkaSrClientApi::getGlobalCompatibilityLevel) - .map(compatibilityConfig -> selectCompatibilityFormat(cluster, compatibilityConfig)); + .map(compatibilityResponse -> + normalizeCompatibilityResponse(cluster, compatibilityResponse)); } private Mono getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster, @@ -174,13 +174,14 @@ public Mono checksSchemaCompatibility(KafkaCluster c return api(cluster).mono(c -> c.checkSchemaCompatibility(schemaName, LATEST, true, newSchemaSubject)); } - private Compatibility selectCompatibilityFormat(KafkaCluster cluster, CompatibilityConfig compatibilityConfig) { + private Compatibility normalizeCompatibilityResponse(KafkaCluster cluster, + GetGlobalCompatibilityLevel200Response compatibilityResponse) { if (cluster.getOriginalProperties().getSchemaRegistryAuth() != null && Objects.equals(cluster.getOriginalProperties().getSchemaRegistryAuth().getBearerAuthCustomProviderClass(), - GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS)) { - return compatibilityConfig.getCompatibility(); + GcpBearerAuthFilter.getGcpBearerAuthCustomProviderClass())) { + return compatibilityResponse.getCompatibility(); } else { - return compatibilityConfig.getCompatibilityLevel(); + return compatibilityResponse.getCompatibilityLevel(); } } } diff --git a/contract/src/main/resources/swagger/kafka-sr-api.yaml b/contract/src/main/resources/swagger/kafka-sr-api.yaml index b025b3050..c2b888e2f 100644 --- a/contract/src/main/resources/swagger/kafka-sr-api.yaml +++ b/contract/src/main/resources/swagger/kafka-sr-api.yaml @@ -176,7 +176,9 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/CompatibilityConfig' + oneOf: + - $ref: '#/components/schemas/CompatibilityConfig' + - $ref: '#/components/schemas/CompatibilityConfigGcp' 404: description: Not found put: @@ -220,7 +222,9 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/CompatibilityConfig' + oneOf: + - $ref: '#/components/schemas/CompatibilityConfig' + - $ref: '#/components/schemas/CompatibilityConfigGcp' 404: description: Not found put: @@ -381,13 +385,20 @@ components: properties: compatibilityLevel: $ref: '#/components/schemas/Compatibility' - # GCP Managed Kafka Schema registries specific fields + required: + - compatibilityLevel + + CompatibilityConfigGcp: + type: object + properties: alias: - type: string + type: string compatibility: - $ref: '#/components/schemas/Compatibility' + $ref: '#/components/schemas/Compatibility' normalize: - type: boolean + type: boolean + required: + - compatibility CompatibilityLevelChange: type: object @@ -397,7 +408,6 @@ components: required: - compatibility - Compatibility: type: string enum: @@ -409,7 +419,6 @@ components: - FULL_TRANSITIVE - NONE - CompatibilityCheckResponse: type: object properties: