diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/AzureEventHubsKafkaAutoConfiguration.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/AzureEventHubsKafkaAutoConfiguration.java index ed73cf8d4337..da485f15cd00 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/AzureEventHubsKafkaAutoConfiguration.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/AzureEventHubsKafkaAutoConfiguration.java @@ -26,10 +26,35 @@ /** - * {@link EnableAutoConfiguration Auto-configuration} for Azure Event Hubs Kafka support. + * {@link EnableAutoConfiguration Auto-configuration} for Azure Event Hubs Kafka support with connection string authentication. + * + *

DEPRECATED: This auto-configuration is deprecated since version 4.3.0. Please migrate to + * {@link AzureEventHubsKafkaOAuth2AutoConfiguration} which provides OAuth2-based authentication using Azure Identity.

+ * + *

Deprecation Notice

+ *

This configuration uses connection string-based authentication (SASL_PLAIN) which is being phased out in favor + * of more secure OAuth2 authentication. The OAuth2 approach provides:

+ * + * + *

Migration Path

+ *

To migrate from connection string to OAuth2:

+ *
{@code
+ * // Old configuration (deprecated)
+ * spring.cloud.azure.eventhubs.connection-string=Endpoint=sb://...
+ *
+ * // New configuration (recommended)
+ * spring.kafka.bootstrap-servers=mynamespace.servicebus.windows.net:9093
+ * spring.cloud.azure.credential.managed-identity-enabled=true
+ * }
* * @since 4.0.0 * @deprecated 4.3.0 in favor of {@link AzureEventHubsKafkaOAuth2AutoConfiguration}. + * @see AzureEventHubsKafkaOAuth2AutoConfiguration */ @Deprecated @Configuration(proxyBeanMethods = false) @@ -40,6 +65,13 @@ public class AzureEventHubsKafkaAutoConfiguration { private static final Logger LOGGER = LoggerFactory.getLogger(AzureEventHubsKafkaAutoConfiguration.class); + /** + * Creates a connection string provider for Event Hubs Kafka when a connection string is configured. + * + * @param environment the Spring environment containing configuration properties + * @return a connection string provider initialized with the Event Hubs connection string + * @throws IllegalArgumentException if the connection string is invalid + */ @Bean @ConditionalOnExpression("'${spring.cloud.azure.eventhubs.connection-string:}' != ''") @ConditionalOnMissingBean(value = AzureServiceType.EventHubs.class, parameterizedContainer = ServiceConnectionStringProvider.class) @@ -56,6 +88,11 @@ StaticConnectionStringProvider eventHubsKafkaConnect return new StaticConnectionStringProvider<>(AzureServiceType.EVENT_HUBS, connectionString); } + /** + * Creates a BeanPostProcessor that configures connection string-based authentication for KafkaProperties beans. + * + * @return the BeanPostProcessor for Kafka properties configuration + */ @Bean @ConditionalOnBean(value = AzureServiceType.EventHubs.class, parameterizedContainer = ServiceConnectionStringProvider.class) static KafkaPropertiesBeanPostProcessor kafkaPropertiesBeanPostProcessor() { diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/KafkaPropertiesBeanPostProcessor.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/KafkaPropertiesBeanPostProcessor.java index b8e0e0fef93e..924e540e1af0 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/KafkaPropertiesBeanPostProcessor.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/eventhubs/kafka/KafkaPropertiesBeanPostProcessor.java @@ -2,7 +2,7 @@ // Licensed under the MIT License. package com.azure.spring.cloud.autoconfigure.implementation.eventhubs.kafka; -import com.azure.spring.cloud.core.implementation.connectionstring.EventHubsConnectionString; +import com.azure.spring.cloud.autoconfigure.implementation.kafka.ConnectionStringAuthenticationConfigurer; import com.azure.spring.cloud.core.provider.connectionstring.ServiceConnectionStringProvider; import com.azure.spring.cloud.core.service.AzureServiceType; import org.slf4j.Logger; @@ -17,49 +17,53 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.Map; -import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; -import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG; -import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM; -import static org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL; +import com.azure.spring.cloud.core.implementation.connectionstring.EventHubsConnectionString; /** * {@link BeanPostProcessor} for {@link KafkaProperties} to configure connection string credentials. + * + * @deprecated This class is deprecated in favor of OAuth2-based authentication. + * Use {@code AzureEventHubsKafkaOAuth2AutoConfiguration} instead. */ +@Deprecated class KafkaPropertiesBeanPostProcessor implements BeanPostProcessor, ApplicationContextAware { private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPropertiesBeanPostProcessor.class); - private static final String SASL_CONFIG_VALUE = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";%s"; private ApplicationContext applicationContext; @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { - if (bean instanceof KafkaProperties) { - //TODO(yiliu6): link to OAuth2 reference doc here - LOGGER.warn("Autoconfiguration for Event Hubs for Kafka on connection string/Azure Resource Manager" - + " has been deprecated, please migrate to AzureEventHubsKafkaOAuth2AutoConfiguration for OAuth2 authentication with Azure Identity credentials." - + " To leverage the OAuth2 authentication, you can delete all your Event Hubs for Kafka credential configurations, and configure Kafka bootstrap servers" - + " instead, which can be set as spring.kafka.boostrap-servers=EventHubsNamespacesFQDN:9093."); - - KafkaProperties kafkaProperties = (KafkaProperties) bean; - ResolvableType provider = ResolvableType.forClassWithGenerics(ServiceConnectionStringProvider.class, AzureServiceType.EventHubs.class); - ObjectProvider> beanProvider = applicationContext.getBeanProvider(provider); + if (bean instanceof KafkaProperties kafkaProperties) { + ResolvableType provider = ResolvableType.forClassWithGenerics( + ServiceConnectionStringProvider.class, AzureServiceType.EventHubs.class); + ObjectProvider> beanProvider = + applicationContext.getBeanProvider(provider); - ServiceConnectionStringProvider connectionStringProvider = beanProvider.getIfAvailable(); + ServiceConnectionStringProvider connectionStringProvider = + beanProvider.getIfAvailable(); + if (connectionStringProvider == null) { LOGGER.debug("Cannot find a bean of type ServiceConnectionStringProvider, " - + "Spring Cloud Azure will skip performing JAAS enhancements on the KafkaProperties bean."); + + "Spring Cloud Azure will skip performing connection string configuration on the KafkaProperties bean."); return bean; } + // Set bootstrap servers from connection string String connectionString = connectionStringProvider.getConnectionString(); String bootstrapServer = new EventHubsConnectionString(connectionString).getFullyQualifiedNamespace() + ":9093"; kafkaProperties.setBootstrapServers(new ArrayList<>(Collections.singletonList(bootstrapServer))); - kafkaProperties.getProperties().put(SECURITY_PROTOCOL_CONFIG, SASL_SSL.name()); - kafkaProperties.getProperties().put(SASL_MECHANISM, "PLAIN"); - kafkaProperties.getProperties().put(SASL_JAAS_CONFIG, String.format(SASL_CONFIG_VALUE, - connectionString, System.getProperty("line.separator"))); + + // Use the ConnectionStringAuthenticationConfigurer to configure authentication + ConnectionStringAuthenticationConfigurer configurer = + new ConnectionStringAuthenticationConfigurer(connectionStringProvider, LOGGER); + + Map mergedProperties = kafkaProperties.buildProducerProperties(null); + if (configurer.canConfigure(mergedProperties)) { + configurer.configure(mergedProperties, kafkaProperties.getProperties()); + } } return bean; } diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AbstractKafkaAuthenticationConfigurer.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AbstractKafkaAuthenticationConfigurer.java new file mode 100644 index 000000000000..b35505b2e55f --- /dev/null +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AbstractKafkaAuthenticationConfigurer.java @@ -0,0 +1,132 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.spring.cloud.autoconfigure.implementation.kafka; + +import org.slf4j.Logger; +import org.springframework.util.StringUtils; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; +import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG; +import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM; + +/** + * Abstract base class for Kafka authentication configurers that provides common functionality + * for checking bootstrap servers and SASL configuration conditions. + *

+ * This class implements the Template Method pattern, where subclasses provide specific + * authentication mechanism implementations while common validation logic is shared. + *

+ */ +abstract class AbstractKafkaAuthenticationConfigurer implements KafkaAuthenticationConfigurer { + + protected final Logger logger; + + protected AbstractKafkaAuthenticationConfigurer(Logger logger) { + this.logger = logger; + } + + @Override + public boolean canConfigure(Map mergedProperties) { + return meetBootstrapServerConditions(mergedProperties) + && meetAuthenticationConditions(mergedProperties); + } + + /** + * Checks if the bootstrap server configuration meets the requirements for Azure Event Hubs. + * The bootstrap server must point to an Event Hubs namespace (*.servicebus.windows.net:9093). + * + * @param sourceProperties the Kafka properties to check + * @return true if bootstrap server configuration is valid for Azure Event Hubs + */ + protected boolean meetBootstrapServerConditions(Map sourceProperties) { + Object bootstrapServers = sourceProperties.get(BOOTSTRAP_SERVERS_CONFIG); + List serverList = extractBootstrapServerList(bootstrapServers); + + if (serverList == null) { + logger.debug("Kafka bootstrap server configuration doesn't meet Azure Event Hubs requirements."); + return false; + } + + return serverList.size() == 1 && serverList.get(0).endsWith(":9093"); + } + + /** + * Extracts bootstrap server list from the configuration value. + * Handles both String and Iterable configurations. + * + * @param bootstrapServers the bootstrap servers configuration value + * @return list of server addresses, or null if invalid format + */ + protected List extractBootstrapServerList(Object bootstrapServers) { + if (bootstrapServers instanceof String) { + return Arrays.asList(StringUtils.delimitedListToStringArray((String) bootstrapServers, ",")); + } else if (bootstrapServers instanceof Iterable) { + List serverList = new java.util.ArrayList<>(); + for (Object obj : (Iterable) bootstrapServers) { + if (obj instanceof String) { + serverList.add((String) obj); + } else { + return null; + } + } + return serverList; + } + return null; + } + + /** + * Checks if the SASL protocol is set to SASL_SSL or not configured. + * + * @param securityProtocol the security protocol configuration value + * @return true if security protocol is compatible + */ + protected boolean meetSaslProtocolConditions(String securityProtocol) { + return securityProtocol == null + || AbstractKafkaPropertiesBeanPostProcessor.SECURITY_PROTOCOL_CONFIG_SASL.equalsIgnoreCase(securityProtocol); + } + + /** + * Template method for subclasses to implement specific authentication mechanism checks. + * This method should verify that the Kafka properties are compatible with the specific + * authentication type (OAuth2, connection string, etc.). + * + * @param sourceProperties the Kafka properties to check + * @return true if this authentication mechanism can be applied + */ + protected abstract boolean meetAuthenticationConditions(Map sourceProperties); + + /** + * Gets the security protocol value from properties. + * + * @param sourceProperties the Kafka properties + * @return the security protocol value or null + */ + protected String getSecurityProtocol(Map sourceProperties) { + return (String) sourceProperties.get(SECURITY_PROTOCOL_CONFIG); + } + + /** + * Gets the SASL mechanism value from properties. + * + * @param sourceProperties the Kafka properties + * @return the SASL mechanism value or null + */ + protected String getSaslMechanism(Map sourceProperties) { + return (String) sourceProperties.get(SASL_MECHANISM); + } + + /** + * Gets the SASL JAAS config value from properties. + * + * @param sourceProperties the Kafka properties + * @return the SASL JAAS config value or null + */ + protected String getJaasConfig(Map sourceProperties) { + return (String) sourceProperties.get(SASL_JAAS_CONFIG); + } +} diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AbstractKafkaPropertiesBeanPostProcessor.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AbstractKafkaPropertiesBeanPostProcessor.java index 2dfb2f6ab0ce..694f08a79c3f 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AbstractKafkaPropertiesBeanPostProcessor.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AbstractKafkaPropertiesBeanPostProcessor.java @@ -43,6 +43,47 @@ import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.OAUTHBEARER_MECHANISM; import static org.springframework.util.StringUtils.delimitedListToStringArray; +/** + * Abstract base class for Kafka properties bean post-processors that configure Azure authentication. + * + *

This class provides a framework for automatically configuring authentication properties for various + * Kafka client types (producers, consumers, and admins). It implements a strategy pattern to allow different + * authentication methods to be plugged in.

+ * + *

Architecture

+ *

The post-processor works in three phases:

+ *
    + *
  1. Detection: Identifies beans that need Kafka authentication configuration
  2. + *
  3. Configuration: Applies authentication settings using a {@link KafkaAuthenticationConfigurer}
  4. + *
  5. Cleanup: Removes Azure-specific properties that shouldn't be passed to Kafka clients
  6. + *
+ * + *

Supported Client Types

+ *

This processor handles authentication for:

+ *
    + *
  • Kafka Producers
  • + *
  • Kafka Consumers
  • + *
  • Kafka Admin Clients
  • + *
+ * + *

Subclass Implementation

+ *

Subclasses must implement methods to:

+ *
    + *
  • Extract merged properties (all configuration sources combined)
  • + *
  • Access raw property maps (for modification)
  • + *
  • Determine which beans need processing
  • + *
+ * + *

Authentication Configuration

+ *

The class uses {@link KafkaAuthenticationConfigurer} instances to apply authentication settings. + * The default implementation uses {@link OAuth2AuthenticationConfigurer} for OAuth2/OAUTHBEARER authentication.

+ * + * @param the type of Kafka properties bean to process + * @see KafkaAuthenticationConfigurer + * @see OAuth2AuthenticationConfigurer + * @see KafkaPropertiesBeanPostProcessor + * @see KafkaBinderConfigurationPropertiesBeanPostProcessor + */ abstract class AbstractKafkaPropertiesBeanPostProcessor implements BeanPostProcessor, ApplicationContextAware { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractKafkaPropertiesBeanPostProcessor.class); @@ -190,15 +231,25 @@ protected Map invokeBuildKafkaProperties(KafkaProperties kafkaPr * @param rawPropertiesMap the raw Kafka properties Map to configure JAAS to and remove Azure Properties from */ private void replaceAzurePropertiesWithJaas(Map mergedProperties, Map rawPropertiesMap) { - resolveJaasForAzure(mergedProperties) - .ifPresent(jaas -> { - configJaasToKafkaRawProperties(jaas, rawPropertiesMap); - logConfigureOAuthProperties(); - configureKafkaUserAgent(); - }); + // Use strategy pattern to configure authentication + KafkaAuthenticationConfigurer configurer = createAuthenticationConfigurer(); + if (configurer.canConfigure(mergedProperties)) { + configurer.configure(mergedProperties, rawPropertiesMap); + configureKafkaUserAgent(); + } clearAzureProperties(rawPropertiesMap); } + /** + * Creates the appropriate authentication configurer based on available Azure properties. + * Currently supports OAuth2 (OAUTHBEARER) authentication with Azure Identity. + * + * @return the authentication configurer to use + */ + private KafkaAuthenticationConfigurer createAuthenticationConfigurer() { + return new OAuth2AuthenticationConfigurer(azureGlobalProperties, getLogger()); + } + private Optional resolveJaasForAzure(Map mergedProperties) { if (needConfigureSaslOAuth(mergedProperties)) { JaasResolver resolver = new JaasResolver(); diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaBinderOAuth2AutoConfiguration.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaBinderOAuth2AutoConfiguration.java index 316017a9d715..9565cb928541 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaBinderOAuth2AutoConfiguration.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaBinderOAuth2AutoConfiguration.java @@ -11,18 +11,52 @@ import org.springframework.context.annotation.Configuration; /** - * {@link EnableAutoConfiguration Auto-configuration} for Azure Event Hubs Kafka Azure Identity support on Spring Cloud Stream framework. + * {@link EnableAutoConfiguration Auto-configuration} for Azure Event Hubs Kafka with Spring Cloud Stream Binder. * - * To trigger the {@link BindingServicePropertiesBeanPostProcessor} when kafka binder is being used, it enables {@link AzureEventHubsKafkaOAuth2AutoConfiguration} - * for Spring Cloud Stream Kafka Binder context which is to support Azure Identity-based OAuth2 authentication. + *

This auto-configuration extends OAuth2 support to Spring Cloud Stream Kafka Binder, enabling passwordless + * authentication for stream-based applications using Azure Event Hubs.

+ * + *

Purpose

+ *

When Spring Cloud Stream Kafka Binder is detected on the classpath, this configuration ensures that + * OAuth2 authentication is properly configured for all Kafka binder instances. It works by:

+ *
    + *
  • Detecting Spring Cloud Stream Kafka Binder on the classpath
  • + *
  • Registering {@link BindingServicePropertiesBeanPostProcessor} to process binder configurations
  • + *
  • Injecting OAuth2 configuration classes into the binder's application context
  • + *
+ * + *

Configuration Requirements

+ *

This auto-configuration activates when:

+ *
    + *
  • {@code spring-cloud-stream-binder-kafka} is on the classpath
  • + *
  • {@code spring.cloud.azure.eventhubs.kafka.enabled} is true (default)
  • + *
+ * + *

Example Configuration

+ *
{@code
+ * spring.kafka.bootstrap-servers=mynamespace.servicebus.windows.net:9093
+ * spring.cloud.azure.credential.managed-identity-enabled=true
+ * spring.cloud.stream.bindings.input.destination=my-event-hub
+ * }
* * @since 4.4.0 + * @see AzureEventHubsKafkaOAuth2AutoConfiguration + * @see BindingServicePropertiesBeanPostProcessor + * @see AzureKafkaSpringCloudStreamConfiguration */ @Configuration(proxyBeanMethods = false) @ConditionalOnClass(KafkaBinderConfiguration.class) @ConditionalOnProperty(value = "spring.cloud.azure.eventhubs.kafka.enabled", havingValue = "true", matchIfMissing = true) public class AzureEventHubsKafkaBinderOAuth2AutoConfiguration { + /** + * Creates a BeanPostProcessor that configures OAuth2 authentication for Spring Cloud Stream Kafka binders. + * + *

This processor modifies {@link BindingServiceProperties} to inject OAuth2 configuration into + * Kafka binder contexts, ensuring passwordless authentication works seamlessly with Spring Cloud Stream.

+ * + * @return the BeanPostProcessor for binder configuration + */ @Bean static BeanPostProcessor bindingServicePropertiesBeanPostProcessor() { return new BindingServicePropertiesBeanPostProcessor(); diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaOAuth2AutoConfiguration.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaOAuth2AutoConfiguration.java index cd86a70cc667..cbf1bb387f02 100644 --- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaOAuth2AutoConfiguration.java +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaOAuth2AutoConfiguration.java @@ -13,16 +13,48 @@ import static com.azure.spring.cloud.autoconfigure.implementation.context.AzureContextUtils.PASSWORDLESS_KAFKA_PROPERTIES_BEAN_POST_PROCESSOR_BEAN_NAME; /** - * {@link EnableAutoConfiguration Auto-configuration} for Azure Event Hubs Kafka support. Provide Azure Identity-based - * OAuth2 authentication for Event Hubs for Kafka on the basis of Spring Boot Autoconfiguration. + * {@link EnableAutoConfiguration Auto-configuration} for Azure Event Hubs Kafka support with OAuth2 authentication. + * + *

This auto-configuration provides Azure Identity-based OAuth2 (OAUTHBEARER) authentication for Azure Event Hubs + * for Kafka. It automatically configures Kafka properties to use Azure Active Directory credentials instead of + * connection strings.

+ * + *

Features

+ *
    + *
  • Automatic OAuth2 configuration for Kafka clients
  • + *
  • Support for all Azure Identity credential types (Managed Identity, Service Principal, etc.)
  • + *
  • Works with Spring Boot's standard Kafka configuration
  • + *
  • No need to manually configure SASL/OAUTHBEARER settings
  • + *
+ * + *

Configuration Requirements

+ *

To use this auto-configuration, ensure:

+ *
    + *
  • Kafka client libraries are on the classpath
  • + *
  • Bootstrap servers point to Event Hubs namespace (*.servicebus.windows.net:9093)
  • + *
  • Azure Identity credentials are properly configured
  • + *
+ * + *

Example Configuration

+ *
{@code
+ * spring.kafka.bootstrap-servers=mynamespace.servicebus.windows.net:9093
+ * spring.cloud.azure.credential.managed-identity-enabled=true
+ * }
* * @since 4.3.0 + * @see KafkaPropertiesBeanPostProcessor + * @see OAuth2AuthenticationConfigurer */ @Configuration(proxyBeanMethods = false) @ConditionalOnClass(KafkaTemplate.class) @ConditionalOnProperty(value = "spring.cloud.azure.eventhubs.kafka.enabled", havingValue = "true", matchIfMissing = true) public class AzureEventHubsKafkaOAuth2AutoConfiguration { + /** + * Creates a BeanPostProcessor that configures OAuth2 authentication for KafkaProperties beans. + * + * @return the BeanPostProcessor for Kafka properties configuration + */ @Bean(PASSWORDLESS_KAFKA_PROPERTIES_BEAN_POST_PROCESSOR_BEAN_NAME) static BeanPostProcessor kafkaPropertiesBeanPostProcessor() { return new KafkaPropertiesBeanPostProcessor(); diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/ConnectionStringAuthenticationConfigurer.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/ConnectionStringAuthenticationConfigurer.java new file mode 100644 index 000000000000..ac7019afe40e --- /dev/null +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/ConnectionStringAuthenticationConfigurer.java @@ -0,0 +1,92 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.spring.cloud.autoconfigure.implementation.kafka; + +import com.azure.spring.cloud.core.provider.connectionstring.ServiceConnectionStringProvider; +import com.azure.spring.cloud.core.service.AzureServiceType; +import org.slf4j.Logger; + +import java.util.Map; + +import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; +import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG; +import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM; +import static org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL; + +/** + * Configures connection string-based (SASL_PLAIN) authentication for Kafka using Event Hubs connection strings. + *

+ * This configurer handles the deprecated connection string authentication method. It extracts the connection + * string from the {@link ServiceConnectionStringProvider} and configures SASL_PLAIN authentication. + *

+ * + * @deprecated This authentication method is deprecated in favor of OAuth2 authentication. + * Use {@link OAuth2AuthenticationConfigurer} instead. + */ +@Deprecated +public class ConnectionStringAuthenticationConfigurer extends AbstractKafkaAuthenticationConfigurer { + + private static final String SASL_MECHANISM_PLAIN = "PLAIN"; + private static final String SASL_JAAS_CONFIG_TEMPLATE = + "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";%s"; + + private final ServiceConnectionStringProvider connectionStringProvider; + + public ConnectionStringAuthenticationConfigurer( + ServiceConnectionStringProvider connectionStringProvider, + Logger logger) { + super(logger); + this.connectionStringProvider = connectionStringProvider; + } + + @Override + protected boolean meetAuthenticationConditions(Map sourceProperties) { + // Connection string authentication requires a connection string provider + if (connectionStringProvider == null) { + return false; + } + + String securityProtocol = getSecurityProtocol(sourceProperties); + String saslMechanism = getSaslMechanism(sourceProperties); + + // Connection string auth works with SASL_SSL protocol and PLAIN mechanism + // or when these are not configured (we'll set them) + boolean protocolMatch = meetSaslProtocolConditions(securityProtocol); + boolean mechanismMatch = saslMechanism == null || SASL_MECHANISM_PLAIN.equalsIgnoreCase(saslMechanism); + + if (protocolMatch && mechanismMatch) { + return true; + } + + logger.debug("Connection string authentication cannot be applied. Security protocol: {}, SASL mechanism: {}", + securityProtocol, saslMechanism); + return false; + } + + @Override + public void configure(Map mergedProperties, Map rawProperties) { + String connectionString = connectionStringProvider.getConnectionString(); + + // Configure SASL_PLAIN authentication with connection string + rawProperties.put(SECURITY_PROTOCOL_CONFIG, SASL_SSL.name()); + rawProperties.put(SASL_MECHANISM, SASL_MECHANISM_PLAIN); + rawProperties.put(SASL_JAAS_CONFIG, + String.format(SASL_JAAS_CONFIG_TEMPLATE, connectionString, System.getProperty("line.separator"))); + + logConfiguration(); + } + + private void logConfiguration() { + logger.warn("Autoconfiguration for Event Hubs for Kafka on connection string/Azure Resource Manager" + + " has been deprecated, please migrate to OAuth2 authentication with Azure Identity credentials." + + " To leverage the OAuth2 authentication, you can delete all your Event Hubs for Kafka credential " + + "configurations, and configure Kafka bootstrap servers instead, which can be set as " + + "spring.kafka.bootstrap-servers=EventHubsNamespacesFQDN:9093."); + logger.debug("Connection string authentication property {} will be configured as {}.", + SECURITY_PROTOCOL_CONFIG, SASL_SSL.name()); + logger.debug("Connection string authentication property {} will be configured as {}.", + SASL_MECHANISM, SASL_MECHANISM_PLAIN); + logger.debug("Connection string authentication property {} will be configured (value not logged for security).", + SASL_JAAS_CONFIG); + } +} diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaAuthenticationConfigurer.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaAuthenticationConfigurer.java new file mode 100644 index 000000000000..93d1a1c7b2a5 --- /dev/null +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaAuthenticationConfigurer.java @@ -0,0 +1,28 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.spring.cloud.autoconfigure.implementation.kafka; + +import java.util.Map; + +/** + * Strategy interface for configuring Kafka authentication properties. + * Implementations handle different authentication mechanisms (OAuth2, connection string, etc.). + */ +interface KafkaAuthenticationConfigurer { + + /** + * Determines if this configurer can handle the given Kafka properties. + * + * @param mergedProperties the merged Kafka properties + * @return true if this configurer can configure authentication for these properties + */ + boolean canConfigure(Map mergedProperties); + + /** + * Configure authentication properties on the raw Kafka properties map. + * + * @param mergedProperties the merged Kafka properties (read-only, used for decision making) + * @param rawProperties the raw Kafka properties map to modify with authentication config + */ + void configure(Map mergedProperties, Map rawProperties); +} diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/OAuth2AuthenticationConfigurer.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/OAuth2AuthenticationConfigurer.java new file mode 100644 index 000000000000..6b3ac2f6c8f6 --- /dev/null +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/OAuth2AuthenticationConfigurer.java @@ -0,0 +1,129 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.spring.cloud.autoconfigure.implementation.kafka; + +import com.azure.spring.cloud.core.properties.AzureProperties; +import com.azure.spring.cloud.core.implementation.properties.PropertyMapper; +import com.azure.spring.cloud.service.implementation.jaas.Jaas; +import com.azure.spring.cloud.service.implementation.jaas.JaasResolver; +import com.azure.spring.cloud.service.implementation.kafka.AzureKafkaPropertiesUtils; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; +import org.slf4j.Logger; + +import java.util.HashMap; +import java.util.Map; + +import static com.azure.spring.cloud.autoconfigure.implementation.kafka.AbstractKafkaPropertiesBeanPostProcessor.AZURE_CONFIGURED_JAAS_OPTIONS_KEY; +import static com.azure.spring.cloud.autoconfigure.implementation.kafka.AbstractKafkaPropertiesBeanPostProcessor.AZURE_CONFIGURED_JAAS_OPTIONS_VALUE; +import static com.azure.spring.cloud.autoconfigure.implementation.kafka.AbstractKafkaPropertiesBeanPostProcessor.SASL_LOGIN_CALLBACK_HANDLER_CLASS_OAUTH; +import static com.azure.spring.cloud.autoconfigure.implementation.kafka.AbstractKafkaPropertiesBeanPostProcessor.SASL_MECHANISM_OAUTH; +import static com.azure.spring.cloud.autoconfigure.implementation.kafka.AbstractKafkaPropertiesBeanPostProcessor.SECURITY_PROTOCOL_CONFIG_SASL; +import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG; +import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG; +import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS; +import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM; + +/** + * Configures OAuth2 (OAUTHBEARER) authentication for Kafka using Azure Identity credentials. + * This configurer handles Azure Event Hubs for Kafka scenarios with Microsoft Entra ID authentication. + */ +class OAuth2AuthenticationConfigurer extends AbstractKafkaAuthenticationConfigurer { + + private static final PropertyMapper PROPERTY_MAPPER = new PropertyMapper(); + private static final Map KAFKA_OAUTH_CONFIGS = Map.of( + SECURITY_PROTOCOL_CONFIG, SECURITY_PROTOCOL_CONFIG_SASL, + SASL_MECHANISM, SASL_MECHANISM_OAUTH, + SASL_LOGIN_CALLBACK_HANDLER_CLASS, SASL_LOGIN_CALLBACK_HANDLER_CLASS_OAUTH + ); + + private final AzureProperties azureProperties; + + OAuth2AuthenticationConfigurer(AzureProperties azureProperties, Logger logger) { + super(logger); + this.azureProperties = azureProperties; + } + + @Override + public void configure(Map mergedProperties, Map rawProperties) { + JaasResolver resolver = new JaasResolver(); + Jaas jaas = resolver.resolve((String) mergedProperties.get(SASL_JAAS_CONFIG)) + .orElse(new Jaas(OAuthBearerLoginModule.class.getName())); + + setAzurePropertiesToJaasOptionsIfAbsent(azureProperties, jaas); + setKafkaPropertiesToJaasOptions(mergedProperties, jaas); + jaas.getOptions().put(AZURE_CONFIGURED_JAAS_OPTIONS_KEY, AZURE_CONFIGURED_JAAS_OPTIONS_VALUE); + + rawProperties.putAll(KAFKA_OAUTH_CONFIGS); + rawProperties.put(SASL_JAAS_CONFIG, jaas.toString()); + + logConfiguration(); + } + + @Override + protected boolean meetAuthenticationConditions(Map sourceProperties) { + String securityProtocol = getSecurityProtocol(sourceProperties); + String saslMechanism = getSaslMechanism(sourceProperties); + String jaasConfig = getJaasConfig(sourceProperties); + + if (meetSaslProtocolConditions(securityProtocol) + && meetSaslOAuth2MechanismConditions(saslMechanism) + && meetJaasConditions(jaasConfig)) { + return true; + } + + logger.info("Currently {} authentication mechanism is used, recommend to use Spring Cloud Azure " + + "auto-configuration for Kafka OAUTHBEARER authentication which supports various Azure Identity " + + "credentials. To leverage the auto-configuration for OAuth2, you can just remove all your security, " + + "sasl and credential configurations of Kafka and Event Hubs. And configure Kafka bootstrap servers " + + "instead, which can be set as spring.kafka.boostrap-servers=EventHubsNamespacesFQDN:9093.", + saslMechanism); + return false; + } + + private boolean meetSaslOAuth2MechanismConditions(String saslMechanism) { + return saslMechanism == null || SASL_MECHANISM_OAUTH.equalsIgnoreCase(saslMechanism); + } + + private boolean meetJaasConditions(String jaasConfig) { + if (jaasConfig == null) { + return true; + } + JaasResolver resolver = new JaasResolver(); + return resolver.resolve(jaasConfig) + .map(jaas -> AZURE_CONFIGURED_JAAS_OPTIONS_VALUE.equals( + jaas.getOptions().get(AZURE_CONFIGURED_JAAS_OPTIONS_KEY))) + .orElse(false); + } + + private void setKafkaPropertiesToJaasOptions(Map properties, Jaas jaas) { + AzureKafkaPropertiesUtils.AzureKafkaPasswordlessPropertiesMapping.getPropertyKeys() + .forEach(k -> PROPERTY_MAPPER.from(properties.get(k)).to(p -> jaas.getOptions().put(k, (String) p))); + } + + private void setAzurePropertiesToJaasOptionsIfAbsent(AzureProperties azureProperties, Jaas jaas) { + convertAzurePropertiesToMap(azureProperties) + .forEach((k, v) -> jaas.getOptions().putIfAbsent(k, v)); + } + + private Map convertAzurePropertiesToMap(AzureProperties properties) { + Map configs = new HashMap<>(); + for (AzureKafkaPropertiesUtils.AzureKafkaPasswordlessPropertiesMapping m + : AzureKafkaPropertiesUtils.AzureKafkaPasswordlessPropertiesMapping.values()) { + PROPERTY_MAPPER.from(m.getter().apply(properties)).to(p -> configs.put(m.propertyKey(), p)); + } + return configs; + } + + private void logConfiguration() { + logger.info("Spring Cloud Azure auto-configuration for Kafka OAUTHBEARER authentication will be loaded to " + + "configure your Kafka security and sasl properties to support Azure Identity credentials."); + logger.debug("OAUTHBEARER authentication property {} will be configured as {} to support Azure Identity credentials.", + SECURITY_PROTOCOL_CONFIG, SECURITY_PROTOCOL_CONFIG_SASL); + logger.debug("OAUTHBEARER authentication property {} will be configured as {} to support Azure Identity credentials.", + SASL_MECHANISM, SASL_MECHANISM_OAUTH); + logger.debug("OAUTHBEARER authentication property {} will be configured as {} to support Azure Identity credentials.", + SASL_JAAS_CONFIG, "***the value involves credentials and will not be logged***"); + logger.debug("OAUTHBEARER authentication property {} will be configured as {} to support Azure Identity credentials.", + SASL_LOGIN_CALLBACK_HANDLER_CLASS, SASL_LOGIN_CALLBACK_HANDLER_CLASS_OAUTH); + } +} diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/README.md b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/README.md new file mode 100644 index 000000000000..5df26e2195e9 --- /dev/null +++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/README.md @@ -0,0 +1,196 @@ +# Azure Event Hubs for Kafka - Spring Cloud Azure Auto-Configuration + +This package provides auto-configuration for Azure Event Hubs for Kafka, supporting multiple authentication methods. + +## Architecture + +The Kafka authentication support follows a **Strategy Pattern** combined with **Template Method Pattern** to handle different authentication mechanisms: + +### Class Hierarchy + +``` +KafkaAuthenticationConfigurer (interface) + ↑ +AbstractKafkaAuthenticationConfigurer (template base class) + ↑ ↑ +OAuth2AuthenticationConfigurer ConnectionStringAuthenticationConfigurer +``` + +### Components + +1. **`KafkaAuthenticationConfigurer`** - Strategy interface for authentication configuration +2. **`AbstractKafkaAuthenticationConfigurer`** - Template base class with common validation logic +3. **`OAuth2AuthenticationConfigurer`** - Implements OAuth2/OAUTHBEARER authentication using Azure Identity +4. **`ConnectionStringAuthenticationConfigurer`** - Implements connection string authentication (deprecated) +5. **`AbstractKafkaPropertiesBeanPostProcessor`** - Base class for processing Kafka properties +6. **`KafkaPropertiesBeanPostProcessor`** - Processes standard Spring Kafka properties +7. **`KafkaBinderConfigurationPropertiesBeanPostProcessor`** - Processes Spring Cloud Stream Kafka binder properties + +### Template Method Pattern + +The `AbstractKafkaAuthenticationConfigurer` provides common functionality: +- **Bootstrap server validation** - Checks if server points to Event Hubs (*.servicebus.windows.net:9093) +- **SASL protocol checking** - Validates security protocol configuration +- **Property extraction** - Helper methods to get security properties + +Subclasses implement specific authentication logic: +- **`meetAuthenticationConditions()`** - Check if this auth type can be applied +- **`configure()`** - Apply the authentication configuration + +## Supported Authentication Methods + +### 1. OAuth2/OAUTHBEARER (Recommended) + +**Package**: `com.azure.spring.cloud.autoconfigure.implementation.kafka` + +This is the recommended authentication method that uses Azure Identity credentials (Managed Identity, Service Principal, etc.) to authenticate with Azure Event Hubs. + +**Auto-Configuration Class**: `AzureEventHubsKafkaOAuth2AutoConfiguration` + +**How it works**: +- Automatically configures SASL_SSL security protocol +- Sets OAUTHBEARER as the SASL mechanism +- Configures `KafkaOAuth2AuthenticateCallbackHandler` for token acquisition +- Supports all Azure Identity credential types + +**Configuration Example**: +```properties +spring.kafka.bootstrap-servers=.servicebus.windows.net:9093 +spring.cloud.azure.credential.managed-identity-enabled=true +``` + +### 2. Connection String (Deprecated) + +**Package**: `com.azure.spring.cloud.autoconfigure.implementation.eventhubs.kafka` + +This method uses Event Hubs connection strings with SASL_PLAIN mechanism. It is deprecated in favor of OAuth2. + +**Auto-Configuration Class**: `AzureEventHubsKafkaAutoConfiguration` (deprecated since 4.3.0) + +**How it works**: +- Extracts connection string from properties +- Configures SASL_SSL with PLAIN mechanism +- Sets up username/password authentication +- Uses `ConnectionStringAuthenticationConfigurer` strategy + +**Configuration Example** (deprecated): +```properties +spring.cloud.azure.eventhubs.connection-string= +``` + +## Configuration Hierarchy + +The auto-configuration applies in the following order: + +1. **OAuth2 Configuration** (`AzureEventHubsKafkaOAuth2AutoConfiguration`) + - Enabled by default (`spring.cloud.azure.eventhubs.kafka.enabled=true`) + - Applies to standard Kafka properties via `KafkaPropertiesBeanPostProcessor` + +2. **Spring Cloud Stream Binder Support** (`AzureEventHubsKafkaBinderOAuth2AutoConfiguration`) + - Enabled when Spring Cloud Stream Kafka binder is on classpath + - Applies OAuth2 configuration to binder properties + +3. **Connection String Configuration** (`AzureEventHubsKafkaAutoConfiguration`) + - Deprecated - use OAuth2 instead + - Only applies when connection string is explicitly configured + +## Migration Guide + +### Migrating from Connection String to OAuth2 + +**Before** (Connection String): +```properties +spring.cloud.azure.eventhubs.connection-string=Endpoint=sb://... +``` + +**After** (OAuth2 with Managed Identity): +```properties +spring.kafka.bootstrap-servers=.servicebus.windows.net:9093 +spring.cloud.azure.credential.managed-identity-enabled=true +``` + +**After** (OAuth2 with Service Principal): +```properties +spring.kafka.bootstrap-servers=.servicebus.windows.net:9093 +spring.cloud.azure.credential.client-id= +spring.cloud.azure.credential.client-secret= +spring.cloud.azure.profile.tenant-id= +``` + +## Extension Points + +To add a new authentication method, extend the `AbstractKafkaAuthenticationConfigurer`: + +1. **Extend `AbstractKafkaAuthenticationConfigurer`** - Inherit common validation logic +2. **Implement `meetAuthenticationConditions()`** - Check if this auth type applies +3. **Implement `configure()`** - Apply authentication configuration +4. **Register in auto-configuration** - Wire up in appropriate auto-configuration class + +### Example: Custom Authentication + +```java +public class CustomAuthenticationConfigurer extends AbstractKafkaAuthenticationConfigurer { + + public CustomAuthenticationConfigurer(Logger logger) { + super(logger); + } + + @Override + protected boolean meetAuthenticationConditions(Map sourceProperties) { + // Check if this authentication method should be used + // You can use inherited methods: getSecurityProtocol(), getSaslMechanism(), etc. + String mechanism = getSaslMechanism(sourceProperties); + return "CUSTOM".equals(mechanism); + } + + @Override + public void configure(Map mergedProperties, Map rawProperties) { + // Configure the authentication properties + rawProperties.put(SECURITY_PROTOCOL_CONFIG, "SASL_SSL"); + rawProperties.put(SASL_MECHANISM, "CUSTOM"); + rawProperties.put(SASL_JAAS_CONFIG, buildCustomJaasConfig()); + } +} +``` + +### Inherited Template Methods + +When extending `AbstractKafkaAuthenticationConfigurer`, you get: + +**Validation Methods:** +- `meetBootstrapServerConditions()` - Validates Event Hubs bootstrap server +- `meetSaslProtocolConditions()` - Checks SASL_SSL protocol +- `extractBootstrapServerList()` - Parses bootstrap server configuration + +**Property Getters:** +- `getSecurityProtocol()` - Gets security protocol from properties +- `getSaslMechanism()` - Gets SASL mechanism from properties +- `getJaasConfig()` - Gets JAAS config from properties + +## Implementation Notes + +### BeanPostProcessor Flow + +1. `postProcessBeforeInitialization()` is called for each bean +2. Checks if bean needs processing (`needsPostProcess()`) +3. For each client type (producer, consumer, admin): + - Gets merged properties (all config sources combined) + - Gets raw properties (Map to modify) + - Creates appropriate `KafkaAuthenticationConfigurer` + - Calls `canConfigure()` to check if authentication should be applied + - If yes, calls `configure()` to set authentication properties + - Clears Azure-specific properties from raw map + +### User-Agent Configuration + +The implementation automatically configures a Spring Cloud Azure user-agent for Kafka clients to help with diagnostics and tracking. + +## Testing + +All authentication configurers should be tested with: +- Various bootstrap server configurations +- Different security protocol settings +- Multiple SASL mechanism combinations +- Edge cases (null values, invalid formats, etc.) + +See `AbstractKafkaPropertiesBeanPostProcessorTest` for test patterns.