Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,35 @@


/**
* {@link EnableAutoConfiguration Auto-configuration} for Azure Event Hubs Kafka support.
* {@link EnableAutoConfiguration Auto-configuration} for Azure Event Hubs Kafka support with connection string authentication.
*
* <p><strong>DEPRECATED:</strong> This auto-configuration is deprecated since version 4.3.0. Please migrate to
* {@link AzureEventHubsKafkaOAuth2AutoConfiguration} which provides OAuth2-based authentication using Azure Identity.</p>
*
* <h2>Deprecation Notice</h2>
* <p>This configuration uses connection string-based authentication (SASL_PLAIN) which is being phased out in favor
* of more secure OAuth2 authentication. The OAuth2 approach provides:</p>
* <ul>
* <li>Better security through Azure Active Directory integration</li>
* <li>Support for managed identities</li>
* <li>No need to store connection strings in configuration</li>
* <li>Automatic token rotation</li>
* </ul>
*
* <h2>Migration Path</h2>
* <p>To migrate from connection string to OAuth2:</p>
* <pre>{@code
* // Old configuration (deprecated)
* spring.cloud.azure.eventhubs.connection-string=Endpoint=sb://...
*
* // New configuration (recommended)
* spring.kafka.bootstrap-servers=mynamespace.servicebus.windows.net:9093
* spring.cloud.azure.credential.managed-identity-enabled=true
* }</pre>
*
* @since 4.0.0
* @deprecated 4.3.0 in favor of {@link AzureEventHubsKafkaOAuth2AutoConfiguration}.
* @see AzureEventHubsKafkaOAuth2AutoConfiguration
*/
@Deprecated
@Configuration(proxyBeanMethods = false)
Expand All @@ -40,6 +65,13 @@ public class AzureEventHubsKafkaAutoConfiguration {

private static final Logger LOGGER = LoggerFactory.getLogger(AzureEventHubsKafkaAutoConfiguration.class);

/**
* Creates a connection string provider for Event Hubs Kafka when a connection string is configured.
*
* @param environment the Spring environment containing configuration properties
* @return a connection string provider initialized with the Event Hubs connection string
* @throws IllegalArgumentException if the connection string is invalid
*/
@Bean
@ConditionalOnExpression("'${spring.cloud.azure.eventhubs.connection-string:}' != ''")
@ConditionalOnMissingBean(value = AzureServiceType.EventHubs.class, parameterizedContainer = ServiceConnectionStringProvider.class)
Expand All @@ -56,6 +88,11 @@ StaticConnectionStringProvider<AzureServiceType.EventHubs> eventHubsKafkaConnect
return new StaticConnectionStringProvider<>(AzureServiceType.EVENT_HUBS, connectionString);
}

/**
* Creates a BeanPostProcessor that configures connection string-based authentication for KafkaProperties beans.
*
* @return the BeanPostProcessor for Kafka properties configuration
*/
@Bean
@ConditionalOnBean(value = AzureServiceType.EventHubs.class, parameterizedContainer = ServiceConnectionStringProvider.class)
static KafkaPropertiesBeanPostProcessor kafkaPropertiesBeanPostProcessor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Licensed under the MIT License.
package com.azure.spring.cloud.autoconfigure.implementation.eventhubs.kafka;

import com.azure.spring.cloud.core.implementation.connectionstring.EventHubsConnectionString;
import com.azure.spring.cloud.autoconfigure.implementation.kafka.ConnectionStringAuthenticationConfigurer;
import com.azure.spring.cloud.core.provider.connectionstring.ServiceConnectionStringProvider;
import com.azure.spring.cloud.core.service.AzureServiceType;
import org.slf4j.Logger;
Expand All @@ -17,49 +17,53 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;

import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
import static org.apache.kafka.common.security.auth.SecurityProtocol.SASL_SSL;
import com.azure.spring.cloud.core.implementation.connectionstring.EventHubsConnectionString;

/**
* {@link BeanPostProcessor} for {@link KafkaProperties} to configure connection string credentials.
*
* @deprecated This class is deprecated in favor of OAuth2-based authentication.
* Use {@code AzureEventHubsKafkaOAuth2AutoConfiguration} instead.
*/
@Deprecated
class KafkaPropertiesBeanPostProcessor implements BeanPostProcessor, ApplicationContextAware {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPropertiesBeanPostProcessor.class);
private static final String SASL_CONFIG_VALUE = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";%s";

private ApplicationContext applicationContext;

@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof KafkaProperties) {
//TODO(yiliu6): link to OAuth2 reference doc here
LOGGER.warn("Autoconfiguration for Event Hubs for Kafka on connection string/Azure Resource Manager"
+ " has been deprecated, please migrate to AzureEventHubsKafkaOAuth2AutoConfiguration for OAuth2 authentication with Azure Identity credentials."
+ " To leverage the OAuth2 authentication, you can delete all your Event Hubs for Kafka credential configurations, and configure Kafka bootstrap servers"
+ " instead, which can be set as spring.kafka.boostrap-servers=EventHubsNamespacesFQDN:9093.");

KafkaProperties kafkaProperties = (KafkaProperties) bean;
ResolvableType provider = ResolvableType.forClassWithGenerics(ServiceConnectionStringProvider.class, AzureServiceType.EventHubs.class);
ObjectProvider<ServiceConnectionStringProvider<AzureServiceType.EventHubs>> beanProvider = applicationContext.getBeanProvider(provider);
if (bean instanceof KafkaProperties kafkaProperties) {
ResolvableType provider = ResolvableType.forClassWithGenerics(
ServiceConnectionStringProvider.class, AzureServiceType.EventHubs.class);
ObjectProvider<ServiceConnectionStringProvider<AzureServiceType.EventHubs>> beanProvider =
applicationContext.getBeanProvider(provider);

ServiceConnectionStringProvider<AzureServiceType.EventHubs> connectionStringProvider = beanProvider.getIfAvailable();
ServiceConnectionStringProvider<AzureServiceType.EventHubs> connectionStringProvider =
beanProvider.getIfAvailable();

if (connectionStringProvider == null) {
LOGGER.debug("Cannot find a bean of type ServiceConnectionStringProvider<AzureServiceType.EventHubs>, "
+ "Spring Cloud Azure will skip performing JAAS enhancements on the KafkaProperties bean.");
+ "Spring Cloud Azure will skip performing connection string configuration on the KafkaProperties bean.");
return bean;
}

// Set bootstrap servers from connection string
String connectionString = connectionStringProvider.getConnectionString();
String bootstrapServer = new EventHubsConnectionString(connectionString).getFullyQualifiedNamespace() + ":9093";
kafkaProperties.setBootstrapServers(new ArrayList<>(Collections.singletonList(bootstrapServer)));
kafkaProperties.getProperties().put(SECURITY_PROTOCOL_CONFIG, SASL_SSL.name());
kafkaProperties.getProperties().put(SASL_MECHANISM, "PLAIN");
kafkaProperties.getProperties().put(SASL_JAAS_CONFIG, String.format(SASL_CONFIG_VALUE,
connectionString, System.getProperty("line.separator")));

// Use the ConnectionStringAuthenticationConfigurer to configure authentication
ConnectionStringAuthenticationConfigurer configurer =
new ConnectionStringAuthenticationConfigurer(connectionStringProvider, LOGGER);

Map<String, Object> mergedProperties = kafkaProperties.buildProducerProperties(null);
if (configurer.canConfigure(mergedProperties)) {
configurer.configure(mergedProperties, kafkaProperties.getProperties());
}
}
return bean;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.spring.cloud.autoconfigure.implementation.kafka;

import org.slf4j.Logger;
import org.springframework.util.StringUtils;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;

/**
* Abstract base class for Kafka authentication configurers that provides common functionality
* for checking bootstrap servers and SASL configuration conditions.
* <p>
* This class implements the Template Method pattern, where subclasses provide specific
* authentication mechanism implementations while common validation logic is shared.
* </p>
*/
abstract class AbstractKafkaAuthenticationConfigurer implements KafkaAuthenticationConfigurer {

protected final Logger logger;

protected AbstractKafkaAuthenticationConfigurer(Logger logger) {
this.logger = logger;
}

@Override
public boolean canConfigure(Map<String, Object> mergedProperties) {
return meetBootstrapServerConditions(mergedProperties)
&& meetAuthenticationConditions(mergedProperties);
}

/**
* Checks if the bootstrap server configuration meets the requirements for Azure Event Hubs.
* The bootstrap server must point to an Event Hubs namespace (*.servicebus.windows.net:9093).
*
* @param sourceProperties the Kafka properties to check
* @return true if bootstrap server configuration is valid for Azure Event Hubs
*/
protected boolean meetBootstrapServerConditions(Map<String, Object> sourceProperties) {
Object bootstrapServers = sourceProperties.get(BOOTSTRAP_SERVERS_CONFIG);
List<String> serverList = extractBootstrapServerList(bootstrapServers);

if (serverList == null) {
logger.debug("Kafka bootstrap server configuration doesn't meet Azure Event Hubs requirements.");
return false;
}

return serverList.size() == 1 && serverList.get(0).endsWith(":9093");
}

/**
* Extracts bootstrap server list from the configuration value.
* Handles both String and Iterable configurations.
*
* @param bootstrapServers the bootstrap servers configuration value
* @return list of server addresses, or null if invalid format
*/
protected List<String> extractBootstrapServerList(Object bootstrapServers) {
if (bootstrapServers instanceof String) {
return Arrays.asList(StringUtils.delimitedListToStringArray((String) bootstrapServers, ","));
} else if (bootstrapServers instanceof Iterable<?>) {
List<String> serverList = new java.util.ArrayList<>();
for (Object obj : (Iterable<?>) bootstrapServers) {
if (obj instanceof String) {
serverList.add((String) obj);
} else {
return null;
}
}
return serverList;
}
return null;
}

/**
* Checks if the SASL protocol is set to SASL_SSL or not configured.
*
* @param securityProtocol the security protocol configuration value
* @return true if security protocol is compatible
*/
protected boolean meetSaslProtocolConditions(String securityProtocol) {
return securityProtocol == null
|| AbstractKafkaPropertiesBeanPostProcessor.SECURITY_PROTOCOL_CONFIG_SASL.equalsIgnoreCase(securityProtocol);
}

/**
* Template method for subclasses to implement specific authentication mechanism checks.
* This method should verify that the Kafka properties are compatible with the specific
* authentication type (OAuth2, connection string, etc.).
*
* @param sourceProperties the Kafka properties to check
* @return true if this authentication mechanism can be applied
*/
protected abstract boolean meetAuthenticationConditions(Map<String, Object> sourceProperties);

/**
* Gets the security protocol value from properties.
*
* @param sourceProperties the Kafka properties
* @return the security protocol value or null
*/
protected String getSecurityProtocol(Map<String, Object> sourceProperties) {
return (String) sourceProperties.get(SECURITY_PROTOCOL_CONFIG);
}

/**
* Gets the SASL mechanism value from properties.
*
* @param sourceProperties the Kafka properties
* @return the SASL mechanism value or null
*/
protected String getSaslMechanism(Map<String, Object> sourceProperties) {
return (String) sourceProperties.get(SASL_MECHANISM);
}

/**
* Gets the SASL JAAS config value from properties.
*
* @param sourceProperties the Kafka properties
* @return the SASL JAAS config value or null
*/
protected String getJaasConfig(Map<String, Object> sourceProperties) {
return (String) sourceProperties.get(SASL_JAAS_CONFIG);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,47 @@
import static org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
import static org.springframework.util.StringUtils.delimitedListToStringArray;

/**
* Abstract base class for Kafka properties bean post-processors that configure Azure authentication.
*
* <p>This class provides a framework for automatically configuring authentication properties for various
* Kafka client types (producers, consumers, and admins). It implements a strategy pattern to allow different
* authentication methods to be plugged in.</p>
*
* <h2>Architecture</h2>
* <p>The post-processor works in three phases:</p>
* <ol>
* <li><strong>Detection</strong>: Identifies beans that need Kafka authentication configuration</li>
* <li><strong>Configuration</strong>: Applies authentication settings using a {@link KafkaAuthenticationConfigurer}</li>
* <li><strong>Cleanup</strong>: Removes Azure-specific properties that shouldn't be passed to Kafka clients</li>
* </ol>
*
* <h2>Supported Client Types</h2>
* <p>This processor handles authentication for:</p>
* <ul>
* <li>Kafka Producers</li>
* <li>Kafka Consumers</li>
* <li>Kafka Admin Clients</li>
* </ul>
*
* <h2>Subclass Implementation</h2>
* <p>Subclasses must implement methods to:</p>
* <ul>
* <li>Extract merged properties (all configuration sources combined)</li>
* <li>Access raw property maps (for modification)</li>
* <li>Determine which beans need processing</li>
* </ul>
*
* <h2>Authentication Configuration</h2>
* <p>The class uses {@link KafkaAuthenticationConfigurer} instances to apply authentication settings.
* The default implementation uses {@link OAuth2AuthenticationConfigurer} for OAuth2/OAUTHBEARER authentication.</p>
*
* @param <T> the type of Kafka properties bean to process
* @see KafkaAuthenticationConfigurer
* @see OAuth2AuthenticationConfigurer
* @see KafkaPropertiesBeanPostProcessor
* @see KafkaBinderConfigurationPropertiesBeanPostProcessor
*/
abstract class AbstractKafkaPropertiesBeanPostProcessor<T> implements BeanPostProcessor, ApplicationContextAware {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractKafkaPropertiesBeanPostProcessor.class);
Expand Down Expand Up @@ -190,15 +231,25 @@ protected Map<String, Object> invokeBuildKafkaProperties(KafkaProperties kafkaPr
* @param rawPropertiesMap the raw Kafka properties Map to configure JAAS to and remove Azure Properties from
*/
private void replaceAzurePropertiesWithJaas(Map<String, Object> mergedProperties, Map<String, String> rawPropertiesMap) {
resolveJaasForAzure(mergedProperties)
.ifPresent(jaas -> {
configJaasToKafkaRawProperties(jaas, rawPropertiesMap);
logConfigureOAuthProperties();
configureKafkaUserAgent();
});
// Use strategy pattern to configure authentication
KafkaAuthenticationConfigurer configurer = createAuthenticationConfigurer();
if (configurer.canConfigure(mergedProperties)) {
configurer.configure(mergedProperties, rawPropertiesMap);
configureKafkaUserAgent();
}
clearAzureProperties(rawPropertiesMap);
}

/**
* Creates the appropriate authentication configurer based on available Azure properties.
* Currently supports OAuth2 (OAUTHBEARER) authentication with Azure Identity.
*
* @return the authentication configurer to use
*/
private KafkaAuthenticationConfigurer createAuthenticationConfigurer() {
return new OAuth2AuthenticationConfigurer(azureGlobalProperties, getLogger());
}

private Optional<Jaas> resolveJaasForAzure(Map<String, Object> mergedProperties) {
if (needConfigureSaslOAuth(mergedProperties)) {
JaasResolver resolver = new JaasResolver();
Expand Down
Loading