mergedProperties) {
if (needConfigureSaslOAuth(mergedProperties)) {
JaasResolver resolver = new JaasResolver();
diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaBinderOAuth2AutoConfiguration.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaBinderOAuth2AutoConfiguration.java
index 316017a9d715..9565cb928541 100644
--- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaBinderOAuth2AutoConfiguration.java
+++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaBinderOAuth2AutoConfiguration.java
@@ -11,18 +11,52 @@
import org.springframework.context.annotation.Configuration;
/**
- * {@link EnableAutoConfiguration Auto-configuration} for Azure Event Hubs Kafka Azure Identity support on Spring Cloud Stream framework.
+ * {@link EnableAutoConfiguration Auto-configuration} for Azure Event Hubs Kafka with Spring Cloud Stream Binder.
*
- * To trigger the {@link BindingServicePropertiesBeanPostProcessor} when kafka binder is being used, it enables {@link AzureEventHubsKafkaOAuth2AutoConfiguration}
- * for Spring Cloud Stream Kafka Binder context which is to support Azure Identity-based OAuth2 authentication.
+ * This auto-configuration extends OAuth2 support to Spring Cloud Stream Kafka Binder, enabling passwordless
+ * authentication for stream-based applications using Azure Event Hubs.
+ *
+ * Purpose
+ * When Spring Cloud Stream Kafka Binder is detected on the classpath, this configuration ensures that
+ * OAuth2 authentication is properly configured for all Kafka binder instances. It works by:
+ *
+ * - Detecting Spring Cloud Stream Kafka Binder on the classpath
+ * - Registering {@link BindingServicePropertiesBeanPostProcessor} to process binder configurations
+ * - Injecting OAuth2 configuration classes into the binder's application context
+ *
+ *
+ * Configuration Requirements
+ * This auto-configuration activates when:
+ *
+ * - {@code spring-cloud-stream-binder-kafka} is on the classpath
+ * - {@code spring.cloud.azure.eventhubs.kafka.enabled} is true (default)
+ *
+ *
+ * Example Configuration
+ * {@code
+ * spring.kafka.bootstrap-servers=mynamespace.servicebus.windows.net:9093
+ * spring.cloud.azure.credential.managed-identity-enabled=true
+ * spring.cloud.stream.bindings.input.destination=my-event-hub
+ * }
*
* @since 4.4.0
+ * @see AzureEventHubsKafkaOAuth2AutoConfiguration
+ * @see BindingServicePropertiesBeanPostProcessor
+ * @see AzureKafkaSpringCloudStreamConfiguration
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaBinderConfiguration.class)
@ConditionalOnProperty(value = "spring.cloud.azure.eventhubs.kafka.enabled", havingValue = "true", matchIfMissing = true)
public class AzureEventHubsKafkaBinderOAuth2AutoConfiguration {
+ /**
+ * Creates a BeanPostProcessor that configures OAuth2 authentication for Spring Cloud Stream Kafka binders.
+ *
+ * This processor modifies {@link BindingServiceProperties} to inject OAuth2 configuration into
+ * Kafka binder contexts, ensuring passwordless authentication works seamlessly with Spring Cloud Stream.
+ *
+ * @return the BeanPostProcessor for binder configuration
+ */
@Bean
static BeanPostProcessor bindingServicePropertiesBeanPostProcessor() {
return new BindingServicePropertiesBeanPostProcessor();
diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaOAuth2AutoConfiguration.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaOAuth2AutoConfiguration.java
index cd86a70cc667..cbf1bb387f02 100644
--- a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaOAuth2AutoConfiguration.java
+++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/AzureEventHubsKafkaOAuth2AutoConfiguration.java
@@ -13,16 +13,48 @@
import static com.azure.spring.cloud.autoconfigure.implementation.context.AzureContextUtils.PASSWORDLESS_KAFKA_PROPERTIES_BEAN_POST_PROCESSOR_BEAN_NAME;
/**
- * {@link EnableAutoConfiguration Auto-configuration} for Azure Event Hubs Kafka support. Provide Azure Identity-based
- * OAuth2 authentication for Event Hubs for Kafka on the basis of Spring Boot Autoconfiguration.
+ * {@link EnableAutoConfiguration Auto-configuration} for Azure Event Hubs Kafka support with OAuth2 authentication.
+ *
+ * This auto-configuration provides Azure Identity-based OAuth2 (OAUTHBEARER) authentication for Azure Event Hubs
+ * for Kafka. It automatically configures Kafka properties to use Azure Active Directory credentials instead of
+ * connection strings.
+ *
+ * Features
+ *
+ * - Automatic OAuth2 configuration for Kafka clients
+ * - Support for all Azure Identity credential types (Managed Identity, Service Principal, etc.)
+ * - Works with Spring Boot's standard Kafka configuration
+ * - No need to manually configure SASL/OAUTHBEARER settings
+ *
+ *
+ * Configuration Requirements
+ * To use this auto-configuration, ensure:
+ *
+ * - Kafka client libraries are on the classpath
+ * - Bootstrap servers point to Event Hubs namespace (*.servicebus.windows.net:9093)
+ * - Azure Identity credentials are properly configured
+ *
+ *
+ * Example Configuration
+ * {@code
+ * spring.kafka.bootstrap-servers=mynamespace.servicebus.windows.net:9093
+ * spring.cloud.azure.credential.managed-identity-enabled=true
+ * }
*
* @since 4.3.0
+ * @see KafkaPropertiesBeanPostProcessor
+ * @see OAuth2AuthenticationConfigurer
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(KafkaTemplate.class)
@ConditionalOnProperty(value = "spring.cloud.azure.eventhubs.kafka.enabled", havingValue = "true", matchIfMissing = true)
public class AzureEventHubsKafkaOAuth2AutoConfiguration {
+ /**
+ * Creates a BeanPostProcessor that configures OAuth2 authentication for KafkaProperties beans.
+ *
+ * @return the BeanPostProcessor for Kafka properties configuration
+ */
@Bean(PASSWORDLESS_KAFKA_PROPERTIES_BEAN_POST_PROCESSOR_BEAN_NAME)
static BeanPostProcessor kafkaPropertiesBeanPostProcessor() {
return new KafkaPropertiesBeanPostProcessor();
diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/ConnectionStringAuthenticationConfigurer.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/ConnectionStringAuthenticationConfigurer.java
new file mode 100644
index 000000000000..ac7019afe40e
--- /dev/null
+++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/ConnectionStringAuthenticationConfigurer.java
@@ -0,0 +1,92 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.azure.spring.cloud.autoconfigure.implementation.kafka;
+
+import com.azure.spring.cloud.core.provider.connectionstring.ServiceConnectionStringProvider;
+import com.azure.spring.cloud.core.service.AzureServiceType;
+import org.slf4j.Logger;
+
+import java.util.Map;
+
+import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
+import static org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL;
+
+/**
+ * Configures connection string-based (SASL_PLAIN) authentication for Kafka using Event Hubs connection strings.
+ *
+ * This configurer handles the deprecated connection string authentication method. It extracts the connection
+ * string from the {@link ServiceConnectionStringProvider} and configures SASL_PLAIN authentication.
+ *
+ *
+ * @deprecated This authentication method is deprecated in favor of OAuth2 authentication.
+ * Use {@link OAuth2AuthenticationConfigurer} instead.
+ */
+@Deprecated
+public class ConnectionStringAuthenticationConfigurer extends AbstractKafkaAuthenticationConfigurer {
+
+ private static final String SASL_MECHANISM_PLAIN = "PLAIN";
+ private static final String SASL_JAAS_CONFIG_TEMPLATE =
+ "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";%s";
+
+ private final ServiceConnectionStringProvider connectionStringProvider;
+
+ public ConnectionStringAuthenticationConfigurer(
+ ServiceConnectionStringProvider connectionStringProvider,
+ Logger logger) {
+ super(logger);
+ this.connectionStringProvider = connectionStringProvider;
+ }
+
+ @Override
+ protected boolean meetAuthenticationConditions(Map sourceProperties) {
+ // Connection string authentication requires a connection string provider
+ if (connectionStringProvider == null) {
+ return false;
+ }
+
+ String securityProtocol = getSecurityProtocol(sourceProperties);
+ String saslMechanism = getSaslMechanism(sourceProperties);
+
+ // Connection string auth works with SASL_SSL protocol and PLAIN mechanism
+ // or when these are not configured (we'll set them)
+ boolean protocolMatch = meetSaslProtocolConditions(securityProtocol);
+ boolean mechanismMatch = saslMechanism == null || SASL_MECHANISM_PLAIN.equalsIgnoreCase(saslMechanism);
+
+ if (protocolMatch && mechanismMatch) {
+ return true;
+ }
+
+ logger.debug("Connection string authentication cannot be applied. Security protocol: {}, SASL mechanism: {}",
+ securityProtocol, saslMechanism);
+ return false;
+ }
+
+ @Override
+ public void configure(Map mergedProperties, Map rawProperties) {
+ String connectionString = connectionStringProvider.getConnectionString();
+
+ // Configure SASL_PLAIN authentication with connection string
+ rawProperties.put(SECURITY_PROTOCOL_CONFIG, SASL_SSL.name());
+ rawProperties.put(SASL_MECHANISM, SASL_MECHANISM_PLAIN);
+ rawProperties.put(SASL_JAAS_CONFIG,
+ String.format(SASL_JAAS_CONFIG_TEMPLATE, connectionString, System.getProperty("line.separator")));
+
+ logConfiguration();
+ }
+
+ private void logConfiguration() {
+ logger.warn("Autoconfiguration for Event Hubs for Kafka on connection string/Azure Resource Manager"
+ + " has been deprecated, please migrate to OAuth2 authentication with Azure Identity credentials."
+ + " To leverage the OAuth2 authentication, you can delete all your Event Hubs for Kafka credential "
+ + "configurations, and configure Kafka bootstrap servers instead, which can be set as "
+ + "spring.kafka.bootstrap-servers=EventHubsNamespacesFQDN:9093.");
+ logger.debug("Connection string authentication property {} will be configured as {}.",
+ SECURITY_PROTOCOL_CONFIG, SASL_SSL.name());
+ logger.debug("Connection string authentication property {} will be configured as {}.",
+ SASL_MECHANISM, SASL_MECHANISM_PLAIN);
+ logger.debug("Connection string authentication property {} will be configured (value not logged for security).",
+ SASL_JAAS_CONFIG);
+ }
+}
diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaAuthenticationConfigurer.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaAuthenticationConfigurer.java
new file mode 100644
index 000000000000..93d1a1c7b2a5
--- /dev/null
+++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/KafkaAuthenticationConfigurer.java
@@ -0,0 +1,28 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.azure.spring.cloud.autoconfigure.implementation.kafka;
+
+import java.util.Map;
+
+/**
+ * Strategy interface for configuring Kafka authentication properties.
+ * Implementations handle different authentication mechanisms (OAuth2, connection string, etc.).
+ */
+interface KafkaAuthenticationConfigurer {
+
+ /**
+ * Determines if this configurer can handle the given Kafka properties.
+ *
+ * @param mergedProperties the merged Kafka properties
+ * @return true if this configurer can configure authentication for these properties
+ */
+ boolean canConfigure(Map mergedProperties);
+
+ /**
+ * Configure authentication properties on the raw Kafka properties map.
+ *
+ * @param mergedProperties the merged Kafka properties (read-only, used for decision making)
+ * @param rawProperties the raw Kafka properties map to modify with authentication config
+ */
+ void configure(Map mergedProperties, Map rawProperties);
+}
diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/OAuth2AuthenticationConfigurer.java b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/OAuth2AuthenticationConfigurer.java
new file mode 100644
index 000000000000..6b3ac2f6c8f6
--- /dev/null
+++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/OAuth2AuthenticationConfigurer.java
@@ -0,0 +1,129 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+package com.azure.spring.cloud.autoconfigure.implementation.kafka;
+
+import com.azure.spring.cloud.core.properties.AzureProperties;
+import com.azure.spring.cloud.core.implementation.properties.PropertyMapper;
+import com.azure.spring.cloud.service.implementation.jaas.Jaas;
+import com.azure.spring.cloud.service.implementation.jaas.JaasResolver;
+import com.azure.spring.cloud.service.implementation.kafka.AzureKafkaPropertiesUtils;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.azure.spring.cloud.autoconfigure.implementation.kafka.AbstractKafkaPropertiesBeanPostProcessor.AZURE_CONFIGURED_JAAS_OPTIONS_KEY;
+import static com.azure.spring.cloud.autoconfigure.implementation.kafka.AbstractKafkaPropertiesBeanPostProcessor.AZURE_CONFIGURED_JAAS_OPTIONS_VALUE;
+import static com.azure.spring.cloud.autoconfigure.implementation.kafka.AbstractKafkaPropertiesBeanPostProcessor.SASL_LOGIN_CALLBACK_HANDLER_CLASS_OAUTH;
+import static com.azure.spring.cloud.autoconfigure.implementation.kafka.AbstractKafkaPropertiesBeanPostProcessor.SASL_MECHANISM_OAUTH;
+import static com.azure.spring.cloud.autoconfigure.implementation.kafka.AbstractKafkaPropertiesBeanPostProcessor.SECURITY_PROTOCOL_CONFIG_SASL;
+import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
+
+/**
+ * Configures OAuth2 (OAUTHBEARER) authentication for Kafka using Azure Identity credentials.
+ * This configurer handles Azure Event Hubs for Kafka scenarios with Microsoft Entra ID authentication.
+ */
+class OAuth2AuthenticationConfigurer extends AbstractKafkaAuthenticationConfigurer {
+
+ private static final PropertyMapper PROPERTY_MAPPER = new PropertyMapper();
+ private static final Map KAFKA_OAUTH_CONFIGS = Map.of(
+ SECURITY_PROTOCOL_CONFIG, SECURITY_PROTOCOL_CONFIG_SASL,
+ SASL_MECHANISM, SASL_MECHANISM_OAUTH,
+ SASL_LOGIN_CALLBACK_HANDLER_CLASS, SASL_LOGIN_CALLBACK_HANDLER_CLASS_OAUTH
+ );
+
+ private final AzureProperties azureProperties;
+
+ OAuth2AuthenticationConfigurer(AzureProperties azureProperties, Logger logger) {
+ super(logger);
+ this.azureProperties = azureProperties;
+ }
+
+ @Override
+ public void configure(Map mergedProperties, Map rawProperties) {
+ JaasResolver resolver = new JaasResolver();
+ Jaas jaas = resolver.resolve((String) mergedProperties.get(SASL_JAAS_CONFIG))
+ .orElse(new Jaas(OAuthBearerLoginModule.class.getName()));
+
+ setAzurePropertiesToJaasOptionsIfAbsent(azureProperties, jaas);
+ setKafkaPropertiesToJaasOptions(mergedProperties, jaas);
+ jaas.getOptions().put(AZURE_CONFIGURED_JAAS_OPTIONS_KEY, AZURE_CONFIGURED_JAAS_OPTIONS_VALUE);
+
+ rawProperties.putAll(KAFKA_OAUTH_CONFIGS);
+ rawProperties.put(SASL_JAAS_CONFIG, jaas.toString());
+
+ logConfiguration();
+ }
+
+ @Override
+ protected boolean meetAuthenticationConditions(Map sourceProperties) {
+ String securityProtocol = getSecurityProtocol(sourceProperties);
+ String saslMechanism = getSaslMechanism(sourceProperties);
+ String jaasConfig = getJaasConfig(sourceProperties);
+
+ if (meetSaslProtocolConditions(securityProtocol)
+ && meetSaslOAuth2MechanismConditions(saslMechanism)
+ && meetJaasConditions(jaasConfig)) {
+ return true;
+ }
+
+ logger.info("Currently {} authentication mechanism is used, recommend to use Spring Cloud Azure "
+ + "auto-configuration for Kafka OAUTHBEARER authentication which supports various Azure Identity "
+ + "credentials. To leverage the auto-configuration for OAuth2, you can just remove all your security, "
+ + "sasl and credential configurations of Kafka and Event Hubs. And configure Kafka bootstrap servers "
+ + "instead, which can be set as spring.kafka.boostrap-servers=EventHubsNamespacesFQDN:9093.",
+ saslMechanism);
+ return false;
+ }
+
+ private boolean meetSaslOAuth2MechanismConditions(String saslMechanism) {
+ return saslMechanism == null || SASL_MECHANISM_OAUTH.equalsIgnoreCase(saslMechanism);
+ }
+
+ private boolean meetJaasConditions(String jaasConfig) {
+ if (jaasConfig == null) {
+ return true;
+ }
+ JaasResolver resolver = new JaasResolver();
+ return resolver.resolve(jaasConfig)
+ .map(jaas -> AZURE_CONFIGURED_JAAS_OPTIONS_VALUE.equals(
+ jaas.getOptions().get(AZURE_CONFIGURED_JAAS_OPTIONS_KEY)))
+ .orElse(false);
+ }
+
+ private void setKafkaPropertiesToJaasOptions(Map properties, Jaas jaas) {
+ AzureKafkaPropertiesUtils.AzureKafkaPasswordlessPropertiesMapping.getPropertyKeys()
+ .forEach(k -> PROPERTY_MAPPER.from(properties.get(k)).to(p -> jaas.getOptions().put(k, (String) p)));
+ }
+
+ private void setAzurePropertiesToJaasOptionsIfAbsent(AzureProperties azureProperties, Jaas jaas) {
+ convertAzurePropertiesToMap(azureProperties)
+ .forEach((k, v) -> jaas.getOptions().putIfAbsent(k, v));
+ }
+
+ private Map convertAzurePropertiesToMap(AzureProperties properties) {
+ Map configs = new HashMap<>();
+ for (AzureKafkaPropertiesUtils.AzureKafkaPasswordlessPropertiesMapping m
+ : AzureKafkaPropertiesUtils.AzureKafkaPasswordlessPropertiesMapping.values()) {
+ PROPERTY_MAPPER.from(m.getter().apply(properties)).to(p -> configs.put(m.propertyKey(), p));
+ }
+ return configs;
+ }
+
+ private void logConfiguration() {
+ logger.info("Spring Cloud Azure auto-configuration for Kafka OAUTHBEARER authentication will be loaded to "
+ + "configure your Kafka security and sasl properties to support Azure Identity credentials.");
+ logger.debug("OAUTHBEARER authentication property {} will be configured as {} to support Azure Identity credentials.",
+ SECURITY_PROTOCOL_CONFIG, SECURITY_PROTOCOL_CONFIG_SASL);
+ logger.debug("OAUTHBEARER authentication property {} will be configured as {} to support Azure Identity credentials.",
+ SASL_MECHANISM, SASL_MECHANISM_OAUTH);
+ logger.debug("OAUTHBEARER authentication property {} will be configured as {} to support Azure Identity credentials.",
+ SASL_JAAS_CONFIG, "***the value involves credentials and will not be logged***");
+ logger.debug("OAUTHBEARER authentication property {} will be configured as {} to support Azure Identity credentials.",
+ SASL_LOGIN_CALLBACK_HANDLER_CLASS, SASL_LOGIN_CALLBACK_HANDLER_CLASS_OAUTH);
+ }
+}
diff --git a/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/README.md b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/README.md
new file mode 100644
index 000000000000..5df26e2195e9
--- /dev/null
+++ b/sdk/spring/spring-cloud-azure-autoconfigure/src/main/java/com/azure/spring/cloud/autoconfigure/implementation/kafka/README.md
@@ -0,0 +1,196 @@
+# Azure Event Hubs for Kafka - Spring Cloud Azure Auto-Configuration
+
+This package provides auto-configuration for Azure Event Hubs for Kafka, supporting multiple authentication methods.
+
+## Architecture
+
+The Kafka authentication support follows a **Strategy Pattern** combined with **Template Method Pattern** to handle different authentication mechanisms:
+
+### Class Hierarchy
+
+```
+KafkaAuthenticationConfigurer (interface)
+ ↑
+AbstractKafkaAuthenticationConfigurer (template base class)
+ ↑ ↑
+OAuth2AuthenticationConfigurer ConnectionStringAuthenticationConfigurer
+```
+
+### Components
+
+1. **`KafkaAuthenticationConfigurer`** - Strategy interface for authentication configuration
+2. **`AbstractKafkaAuthenticationConfigurer`** - Template base class with common validation logic
+3. **`OAuth2AuthenticationConfigurer`** - Implements OAuth2/OAUTHBEARER authentication using Azure Identity
+4. **`ConnectionStringAuthenticationConfigurer`** - Implements connection string authentication (deprecated)
+5. **`AbstractKafkaPropertiesBeanPostProcessor`** - Base class for processing Kafka properties
+6. **`KafkaPropertiesBeanPostProcessor`** - Processes standard Spring Kafka properties
+7. **`KafkaBinderConfigurationPropertiesBeanPostProcessor`** - Processes Spring Cloud Stream Kafka binder properties
+
+### Template Method Pattern
+
+The `AbstractKafkaAuthenticationConfigurer` provides common functionality:
+- **Bootstrap server validation** - Checks if server points to Event Hubs (*.servicebus.windows.net:9093)
+- **SASL protocol checking** - Validates security protocol configuration
+- **Property extraction** - Helper methods to get security properties
+
+Subclasses implement specific authentication logic:
+- **`meetAuthenticationConditions()`** - Check if this auth type can be applied
+- **`configure()`** - Apply the authentication configuration
+
+## Supported Authentication Methods
+
+### 1. OAuth2/OAUTHBEARER (Recommended)
+
+**Package**: `com.azure.spring.cloud.autoconfigure.implementation.kafka`
+
+This is the recommended authentication method that uses Azure Identity credentials (Managed Identity, Service Principal, etc.) to authenticate with Azure Event Hubs.
+
+**Auto-Configuration Class**: `AzureEventHubsKafkaOAuth2AutoConfiguration`
+
+**How it works**:
+- Automatically configures SASL_SSL security protocol
+- Sets OAUTHBEARER as the SASL mechanism
+- Configures `KafkaOAuth2AuthenticateCallbackHandler` for token acquisition
+- Supports all Azure Identity credential types
+
+**Configuration Example**:
+```properties
+spring.kafka.bootstrap-servers=.servicebus.windows.net:9093
+spring.cloud.azure.credential.managed-identity-enabled=true
+```
+
+### 2. Connection String (Deprecated)
+
+**Package**: `com.azure.spring.cloud.autoconfigure.implementation.eventhubs.kafka`
+
+This method uses Event Hubs connection strings with SASL_PLAIN mechanism. It is deprecated in favor of OAuth2.
+
+**Auto-Configuration Class**: `AzureEventHubsKafkaAutoConfiguration` (deprecated since 4.3.0)
+
+**How it works**:
+- Extracts connection string from properties
+- Configures SASL_SSL with PLAIN mechanism
+- Sets up username/password authentication
+- Uses `ConnectionStringAuthenticationConfigurer` strategy
+
+**Configuration Example** (deprecated):
+```properties
+spring.cloud.azure.eventhubs.connection-string=
+```
+
+## Configuration Hierarchy
+
+The auto-configuration applies in the following order:
+
+1. **OAuth2 Configuration** (`AzureEventHubsKafkaOAuth2AutoConfiguration`)
+ - Enabled by default (`spring.cloud.azure.eventhubs.kafka.enabled=true`)
+ - Applies to standard Kafka properties via `KafkaPropertiesBeanPostProcessor`
+
+2. **Spring Cloud Stream Binder Support** (`AzureEventHubsKafkaBinderOAuth2AutoConfiguration`)
+ - Enabled when Spring Cloud Stream Kafka binder is on classpath
+ - Applies OAuth2 configuration to binder properties
+
+3. **Connection String Configuration** (`AzureEventHubsKafkaAutoConfiguration`)
+ - Deprecated - use OAuth2 instead
+ - Only applies when connection string is explicitly configured
+
+## Migration Guide
+
+### Migrating from Connection String to OAuth2
+
+**Before** (Connection String):
+```properties
+spring.cloud.azure.eventhubs.connection-string=Endpoint=sb://...
+```
+
+**After** (OAuth2 with Managed Identity):
+```properties
+spring.kafka.bootstrap-servers=.servicebus.windows.net:9093
+spring.cloud.azure.credential.managed-identity-enabled=true
+```
+
+**After** (OAuth2 with Service Principal):
+```properties
+spring.kafka.bootstrap-servers=.servicebus.windows.net:9093
+spring.cloud.azure.credential.client-id=
+spring.cloud.azure.credential.client-secret=
+spring.cloud.azure.profile.tenant-id=
+```
+
+## Extension Points
+
+To add a new authentication method, extend the `AbstractKafkaAuthenticationConfigurer`:
+
+1. **Extend `AbstractKafkaAuthenticationConfigurer`** - Inherit common validation logic
+2. **Implement `meetAuthenticationConditions()`** - Check if this auth type applies
+3. **Implement `configure()`** - Apply authentication configuration
+4. **Register in auto-configuration** - Wire up in appropriate auto-configuration class
+
+### Example: Custom Authentication
+
+```java
+public class CustomAuthenticationConfigurer extends AbstractKafkaAuthenticationConfigurer {
+
+ public CustomAuthenticationConfigurer(Logger logger) {
+ super(logger);
+ }
+
+ @Override
+ protected boolean meetAuthenticationConditions(Map sourceProperties) {
+ // Check if this authentication method should be used
+ // You can use inherited methods: getSecurityProtocol(), getSaslMechanism(), etc.
+ String mechanism = getSaslMechanism(sourceProperties);
+ return "CUSTOM".equals(mechanism);
+ }
+
+ @Override
+ public void configure(Map mergedProperties, Map rawProperties) {
+ // Configure the authentication properties
+ rawProperties.put(SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
+ rawProperties.put(SASL_MECHANISM, "CUSTOM");
+ rawProperties.put(SASL_JAAS_CONFIG, buildCustomJaasConfig());
+ }
+}
+```
+
+### Inherited Template Methods
+
+When extending `AbstractKafkaAuthenticationConfigurer`, you get:
+
+**Validation Methods:**
+- `meetBootstrapServerConditions()` - Validates Event Hubs bootstrap server
+- `meetSaslProtocolConditions()` - Checks SASL_SSL protocol
+- `extractBootstrapServerList()` - Parses bootstrap server configuration
+
+**Property Getters:**
+- `getSecurityProtocol()` - Gets security protocol from properties
+- `getSaslMechanism()` - Gets SASL mechanism from properties
+- `getJaasConfig()` - Gets JAAS config from properties
+
+## Implementation Notes
+
+### BeanPostProcessor Flow
+
+1. `postProcessBeforeInitialization()` is called for each bean
+2. Checks if bean needs processing (`needsPostProcess()`)
+3. For each client type (producer, consumer, admin):
+ - Gets merged properties (all config sources combined)
+ - Gets raw properties (Map to modify)
+ - Creates appropriate `KafkaAuthenticationConfigurer`
+ - Calls `canConfigure()` to check if authentication should be applied
+ - If yes, calls `configure()` to set authentication properties
+ - Clears Azure-specific properties from raw map
+
+### User-Agent Configuration
+
+The implementation automatically configures a Spring Cloud Azure user-agent for Kafka clients to help with diagnostics and tracking.
+
+## Testing
+
+All authentication configurers should be tested with:
+- Various bootstrap server configurations
+- Different security protocol settings
+- Multiple SASL mechanism combinations
+- Edge cases (null values, invalid formats, etc.)
+
+See `AbstractKafkaPropertiesBeanPostProcessorTest` for test patterns.