Skip to content

Commit 0f2ee3e

Browse files
author
“aashikam”
committed
Add error when server is not available
1 parent 91f785f commit 0f2ee3e

File tree

3 files changed

+38
-11
lines changed

3 files changed

+38
-11
lines changed

native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaRecordConsumer.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,12 @@
1818

1919
package io.ballerina.stdlib.kafka.impl;
2020

21-
import io.ballerina.runtime.api.values.BError;
2221
import io.ballerina.stdlib.kafka.api.KafkaListener;
2322
import io.ballerina.stdlib.kafka.utils.KafkaConstants;
23+
import io.ballerina.stdlib.kafka.utils.KafkaUtils;
2424
import org.apache.kafka.clients.consumer.ConsumerConfig;
2525
import org.apache.kafka.clients.consumer.ConsumerRecords;
2626
import org.apache.kafka.clients.consumer.KafkaConsumer;
27-
import org.apache.kafka.common.KafkaException;
2827
import org.apache.kafka.common.errors.WakeupException;
2928
import org.slf4j.Logger;
3029
import org.slf4j.LoggerFactory;
@@ -104,18 +103,20 @@ private void poll() {
104103
+ this.consumerId + " has received " + recordsRetrieved.count() + " records.");
105104
}
106105
processRetrievedRecords(recordsRetrieved);
107-
} catch (KafkaException | IllegalStateException | IllegalArgumentException e) {
106+
} catch (Exception e) {
108107
this.kafkaListener.onError(e);
109108
// When un-recoverable exception is thrown we stop scheduling task to the executor.
110109
// Later at stopConsume() on KafkaRecordConsumer we close the consumer.
111110
this.pollTaskFuture.cancel(false);
112-
} catch (BError e) {
113-
this.kafkaListener.onError(e);
114111
}
115112
}
116113

117114
private void processRetrievedRecords(ConsumerRecords consumerRecords) {
118-
if (Objects.nonNull(consumerRecords) && !consumerRecords.isEmpty()) {
115+
if (Objects.isNull(consumerRecords)) {
116+
return;
117+
}
118+
119+
if (!consumerRecords.isEmpty()) {
119120
Semaphore sem = new Semaphore(0);
120121
KafkaPollCycleFutureListener pollCycleListener = new KafkaPollCycleFutureListener(sem, serviceId);
121122
this.kafkaListener.onRecordsReceived(consumerRecords, kafkaConsumer, groupId, pollCycleListener);
@@ -128,6 +129,13 @@ private void processRetrievedRecords(ConsumerRecords consumerRecords) {
128129
this.kafkaListener.onError(e);
129130
this.pollTaskFuture.cancel(false);
130131
}
132+
} else {
133+
// Check if there are active connections when we receive empty records
134+
double connectionCount = KafkaUtils.getActiveConnectionCount(kafkaConsumer);
135+
if (connectionCount == 0) {
136+
Exception serverDownException = new Exception("Server might not be available");
137+
this.kafkaListener.onError(serverDownException);
138+
}
131139
}
132140
}
133141

native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/consumer/Poll.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import io.ballerina.stdlib.kafka.utils.ModuleUtils;
3636
import org.apache.kafka.clients.consumer.ConsumerRecords;
3737
import org.apache.kafka.clients.consumer.KafkaConsumer;
38-
import org.apache.kafka.common.KafkaException;
3938

4039
import java.time.Duration;
4140
import java.util.concurrent.CompletableFuture;
@@ -82,12 +81,12 @@ public static Object poll(Environment env, BObject consumerObject, BDecimal time
8281
kafkaConsumer, autoSeek);
8382
}
8483
balFuture.complete(consumerRecords);
85-
} catch (IllegalStateException | IllegalArgumentException | KafkaException e) {
86-
KafkaMetricsUtil.reportConsumerError(consumerObject, KafkaObservabilityConstants.ERROR_TYPE_POLL);
87-
balFuture.complete(createKafkaError("Failed to poll from the Kafka server: " + e.getMessage()));
8884
} catch (BError e) {
8985
KafkaMetricsUtil.reportConsumerError(consumerObject, KafkaObservabilityConstants.ERROR_TYPE_POLL);
9086
balFuture.complete(e);
87+
} catch (Exception e) {
88+
KafkaMetricsUtil.reportConsumerError(consumerObject, KafkaObservabilityConstants.ERROR_TYPE_POLL);
89+
balFuture.complete(createKafkaError("Failed to poll from the Kafka server: " + e.getMessage()));
9190
}
9291
});
9392
return ModuleUtils.getResult(balFuture);
@@ -118,7 +117,7 @@ public static Object pollPayload(Environment env, BObject consumerObject, BDecim
118117
} catch (BError bError) {
119118
KafkaMetricsUtil.reportConsumerError(consumerObject, KafkaObservabilityConstants.ERROR_TYPE_POLL);
120119
balFuture.complete(bError);
121-
} catch (IllegalStateException | IllegalArgumentException | KafkaException e) {
120+
} catch (Exception e) {
122121
KafkaMetricsUtil.reportConsumerError(consumerObject, KafkaObservabilityConstants.ERROR_TYPE_POLL);
123122
balFuture.complete(createKafkaError("Failed to poll from the Kafka server: " + e.getMessage()));
124123
}

native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
5858
import org.apache.kafka.clients.producer.KafkaProducer;
5959
import org.apache.kafka.clients.producer.ProducerConfig;
60+
import org.apache.kafka.common.Metric;
61+
import org.apache.kafka.common.MetricName;
6062
import org.apache.kafka.common.TopicPartition;
6163
import org.apache.kafka.common.config.SaslConfigs;
6264
import org.apache.kafka.common.config.SslConfigs;
@@ -1150,4 +1152,22 @@ public static boolean getAutoCommitConfig(BObject bObject) {
11501152
public static boolean getAutoSeekOnErrorConfig(BObject bObject) {
11511153
return (boolean) bObject.getMapValue(CONSUMER_CONFIG_FIELD_NAME).get(CONSUMER_ENABLE_AUTO_SEEK_CONFIG);
11521154
}
1155+
1156+
/**
1157+
* Retrieves the active connection count from Kafka consumer metrics.
1158+
*
1159+
* @param kafkaConsumer the Kafka consumer instance
1160+
* @return the number of active connections, or 0 if metric not available
1161+
*/
1162+
public static double getActiveConnectionCount(KafkaConsumer kafkaConsumer) {
1163+
Map<MetricName, ? extends Metric> metrics = kafkaConsumer.metrics();
1164+
for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
1165+
MetricName metricName = entry.getKey();
1166+
if ("connection-count".equals(metricName.name()) &&
1167+
"consumer-metrics".equals(metricName.group())) {
1168+
return (double) entry.getValue().metricValue();
1169+
}
1170+
}
1171+
return 0;
1172+
}
11531173
}

0 commit comments

Comments
 (0)