Skip to content

Commit d5e73aa

Browse files
NIFI-14568 Fixed Property Verification in Kafka Connection Service
- Corrected properties construction for Kafka Admin Client in verify method
1 parent dd8b87f commit d5e73aa

File tree

1 file changed

+6
-6
lines changed

1 file changed

+6
-6
lines changed

nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -250,15 +250,14 @@ public class Kafka3ConnectionService extends AbstractControllerService implement
250250
private static final String CONNECTION_STEP = "Kafka Broker Connection";
251251
private static final String TOPIC_LISTING_STEP = "Kafka Topic Listing";
252252

253-
private volatile Properties clientProperties;
254253
private volatile ServiceConfiguration serviceConfiguration;
255254
private volatile Properties producerProperties;
256255
private volatile Properties consumerProperties;
257256
private volatile String uri;
258257

259258
@OnEnabled
260259
public void onEnabled(final ConfigurationContext configurationContext) {
261-
clientProperties = getClientProperties(configurationContext);
260+
final Properties clientProperties = getClientProperties(configurationContext);
262261
serviceConfiguration = getServiceConfiguration(configurationContext);
263262
producerProperties = getProducerProperties(configurationContext, clientProperties);
264263
consumerProperties = getConsumerProperties(configurationContext, clientProperties);
@@ -312,8 +311,7 @@ public KafkaConsumerService getConsumerService(final PollingContext pollingConte
312311
final ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
313312
final Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties, deserializer, deserializer);
314313

315-
final Kafka3ConsumerService consumerService = new Kafka3ConsumerService(getLogger(), consumer, subscription);
316-
return consumerService;
314+
return new Kafka3ConsumerService(getLogger(), consumer, subscription);
317315
}
318316

319317
private Subscription createSubscription(final PollingContext pollingContext) {
@@ -360,9 +358,11 @@ public String getBrokerUri() {
360358
public List<ConfigVerificationResult> verify(final ConfigurationContext configurationContext, final ComponentLog verificationLogger, final Map<String, String> variables) {
361359
final List<ConfigVerificationResult> results = new ArrayList<>();
362360

361+
// Build Admin Client Properties based on configured values and defaults from Consumer Properties
363362
final Properties clientProperties = getClientProperties(configurationContext);
364-
clientProperties.putAll(variables);
365-
try (final Admin admin = Admin.create(clientProperties)) {
363+
final Properties consumerProperties = getConsumerProperties(configurationContext, clientProperties);
364+
consumerProperties.putAll(variables);
365+
try (final Admin admin = Admin.create(consumerProperties)) {
366366
final ListTopicsResult listTopicsResult = admin.listTopics();
367367

368368
final KafkaFuture<Collection<TopicListing>> requestedListings = listTopicsResult.listings();

0 commit comments

Comments
 (0)