From 0f2ee3e1b5de850f4d4d679d190b0e79bbb14844 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9Caashikam=E2=80=9D?= <“arshika.nm”@gmail.com> Date: Mon, 3 Nov 2025 10:10:22 +0530 Subject: [PATCH] Add error when server is not available --- .../kafka/impl/KafkaRecordConsumer.java | 20 +++++++++++++------ .../kafka/nativeimpl/consumer/Poll.java | 9 ++++----- .../stdlib/kafka/utils/KafkaUtils.java | 20 +++++++++++++++++++ 3 files changed, 38 insertions(+), 11 deletions(-) diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaRecordConsumer.java b/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaRecordConsumer.java index 25762420..4ec8766c 100644 --- a/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaRecordConsumer.java +++ b/native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaRecordConsumer.java @@ -18,13 +18,12 @@ package io.ballerina.stdlib.kafka.impl; -import io.ballerina.runtime.api.values.BError; import io.ballerina.stdlib.kafka.api.KafkaListener; import io.ballerina.stdlib.kafka.utils.KafkaConstants; +import io.ballerina.stdlib.kafka.utils.KafkaUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.WakeupException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,18 +103,20 @@ private void poll() { + this.consumerId + " has received " + recordsRetrieved.count() + " records."); } processRetrievedRecords(recordsRetrieved); - } catch (KafkaException | IllegalStateException | IllegalArgumentException e) { + } catch (Exception e) { this.kafkaListener.onError(e); // When un-recoverable exception is thrown we stop scheduling task to the executor. // Later at stopConsume() on KafkaRecordConsumer we close the consumer. this.pollTaskFuture.cancel(false); - } catch (BError e) { - this.kafkaListener.onError(e); } } private void processRetrievedRecords(ConsumerRecords consumerRecords) { - if (Objects.nonNull(consumerRecords) && !consumerRecords.isEmpty()) { + if (Objects.isNull(consumerRecords)) { + return; + } + + if (!consumerRecords.isEmpty()) { Semaphore sem = new Semaphore(0); KafkaPollCycleFutureListener pollCycleListener = new KafkaPollCycleFutureListener(sem, serviceId); this.kafkaListener.onRecordsReceived(consumerRecords, kafkaConsumer, groupId, pollCycleListener); @@ -128,6 +129,13 @@ private void processRetrievedRecords(ConsumerRecords consumerRecords) { this.kafkaListener.onError(e); this.pollTaskFuture.cancel(false); } + } else { + // Check if there are active connections when we receive empty records + double connectionCount = KafkaUtils.getActiveConnectionCount(kafkaConsumer); + if (connectionCount == 0) { + Exception serverDownException = new Exception("Server might not be available"); + this.kafkaListener.onError(serverDownException); + } } } diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/consumer/Poll.java b/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/consumer/Poll.java index f3c125c3..ae181348 100644 --- a/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/consumer/Poll.java +++ b/native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/consumer/Poll.java @@ -35,7 +35,6 @@ import io.ballerina.stdlib.kafka.utils.ModuleUtils; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.KafkaException; import java.time.Duration; import java.util.concurrent.CompletableFuture; @@ -82,12 +81,12 @@ public static Object poll(Environment env, BObject consumerObject, BDecimal time kafkaConsumer, autoSeek); } balFuture.complete(consumerRecords); - } catch (IllegalStateException | IllegalArgumentException | KafkaException e) { - KafkaMetricsUtil.reportConsumerError(consumerObject, KafkaObservabilityConstants.ERROR_TYPE_POLL); - balFuture.complete(createKafkaError("Failed to poll from the Kafka server: " + e.getMessage())); } catch (BError e) { KafkaMetricsUtil.reportConsumerError(consumerObject, KafkaObservabilityConstants.ERROR_TYPE_POLL); balFuture.complete(e); + } catch (Exception e) { + KafkaMetricsUtil.reportConsumerError(consumerObject, KafkaObservabilityConstants.ERROR_TYPE_POLL); + balFuture.complete(createKafkaError("Failed to poll from the Kafka server: " + e.getMessage())); } }); return ModuleUtils.getResult(balFuture); @@ -118,7 +117,7 @@ public static Object pollPayload(Environment env, BObject consumerObject, BDecim } catch (BError bError) { KafkaMetricsUtil.reportConsumerError(consumerObject, KafkaObservabilityConstants.ERROR_TYPE_POLL); balFuture.complete(bError); - } catch (IllegalStateException | IllegalArgumentException | KafkaException e) { + } catch (Exception e) { KafkaMetricsUtil.reportConsumerError(consumerObject, KafkaObservabilityConstants.ERROR_TYPE_POLL); balFuture.complete(createKafkaError("Failed to poll from the Kafka server: " + e.getMessage())); } diff --git a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java index e163007c..026fefa7 100644 --- a/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java +++ b/native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java @@ -57,6 +57,8 @@ import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.SslConfigs; @@ -1150,4 +1152,22 @@ public static boolean getAutoCommitConfig(BObject bObject) { public static boolean getAutoSeekOnErrorConfig(BObject bObject) { return (boolean) bObject.getMapValue(CONSUMER_CONFIG_FIELD_NAME).get(CONSUMER_ENABLE_AUTO_SEEK_CONFIG); } + + /** + * Retrieves the active connection count from Kafka consumer metrics. + * + * @param kafkaConsumer the Kafka consumer instance + * @return the number of active connections, or 0 if metric not available + */ + public static double getActiveConnectionCount(KafkaConsumer kafkaConsumer) { + Map metrics = kafkaConsumer.metrics(); + for (Map.Entry entry : metrics.entrySet()) { + MetricName metricName = entry.getKey(); + if ("connection-count".equals(metricName.name()) && + "consumer-metrics".equals(metricName.group())) { + return (double) entry.getValue().metricValue(); + } + } + return 0; + } }