Skip to content

Commit 5a19518

Browse files
author
“aashikam”
committed
Add error when server is not available
1 parent 91f785f commit 5a19518

File tree

3 files changed

+38
-9
lines changed

3 files changed

+38
-9
lines changed

native/src/main/java/io/ballerina/stdlib/kafka/impl/KafkaRecordConsumer.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818

1919
package io.ballerina.stdlib.kafka.impl;
2020

21-
import io.ballerina.runtime.api.values.BError;
2221
import io.ballerina.stdlib.kafka.api.KafkaListener;
2322
import io.ballerina.stdlib.kafka.utils.KafkaConstants;
23+
import io.ballerina.stdlib.kafka.utils.KafkaUtils;
2424
import org.apache.kafka.clients.consumer.ConsumerConfig;
2525
import org.apache.kafka.clients.consumer.ConsumerRecords;
2626
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -104,18 +104,20 @@ private void poll() {
104104
+ this.consumerId + " has received " + recordsRetrieved.count() + " records.");
105105
}
106106
processRetrievedRecords(recordsRetrieved);
107-
} catch (KafkaException | IllegalStateException | IllegalArgumentException e) {
107+
} catch (Exception e) {
108108
this.kafkaListener.onError(e);
109109
// When un-recoverable exception is thrown we stop scheduling task to the executor.
110110
// Later at stopConsume() on KafkaRecordConsumer we close the consumer.
111111
this.pollTaskFuture.cancel(false);
112-
} catch (BError e) {
113-
this.kafkaListener.onError(e);
114112
}
115113
}
116114

117115
private void processRetrievedRecords(ConsumerRecords consumerRecords) {
118-
if (Objects.nonNull(consumerRecords) && !consumerRecords.isEmpty()) {
116+
if (Objects.isNull(consumerRecords)) {
117+
return;
118+
}
119+
120+
if (!consumerRecords.isEmpty()) {
119121
Semaphore sem = new Semaphore(0);
120122
KafkaPollCycleFutureListener pollCycleListener = new KafkaPollCycleFutureListener(sem, serviceId);
121123
this.kafkaListener.onRecordsReceived(consumerRecords, kafkaConsumer, groupId, pollCycleListener);
@@ -128,6 +130,13 @@ private void processRetrievedRecords(ConsumerRecords consumerRecords) {
128130
this.kafkaListener.onError(e);
129131
this.pollTaskFuture.cancel(false);
130132
}
133+
} else {
134+
// Check if there are active connections when we receive empty records
135+
double connectionCount = KafkaUtils.getActiveConnectionCount(kafkaConsumer);
136+
if (connectionCount == 0) {
137+
Exception serverDownException = new Exception("Server might not be available");
138+
this.kafkaListener.onError(serverDownException);
139+
}
131140
}
132141
}
133142

native/src/main/java/io/ballerina/stdlib/kafka/nativeimpl/consumer/Poll.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,12 @@ public static Object poll(Environment env, BObject consumerObject, BDecimal time
8282
kafkaConsumer, autoSeek);
8383
}
8484
balFuture.complete(consumerRecords);
85-
} catch (IllegalStateException | IllegalArgumentException | KafkaException e) {
86-
KafkaMetricsUtil.reportConsumerError(consumerObject, KafkaObservabilityConstants.ERROR_TYPE_POLL);
87-
balFuture.complete(createKafkaError("Failed to poll from the Kafka server: " + e.getMessage()));
8885
} catch (BError e) {
8986
KafkaMetricsUtil.reportConsumerError(consumerObject, KafkaObservabilityConstants.ERROR_TYPE_POLL);
9087
balFuture.complete(e);
88+
} catch (Exception e) {
89+
KafkaMetricsUtil.reportConsumerError(consumerObject, KafkaObservabilityConstants.ERROR_TYPE_POLL);
90+
balFuture.complete(createKafkaError("Failed to poll from the Kafka server: " + e.getMessage()));
9191
}
9292
});
9393
return ModuleUtils.getResult(balFuture);
@@ -118,7 +118,7 @@ public static Object pollPayload(Environment env, BObject consumerObject, BDecim
118118
} catch (BError bError) {
119119
KafkaMetricsUtil.reportConsumerError(consumerObject, KafkaObservabilityConstants.ERROR_TYPE_POLL);
120120
balFuture.complete(bError);
121-
} catch (IllegalStateException | IllegalArgumentException | KafkaException e) {
121+
} catch (Exception e) {
122122
KafkaMetricsUtil.reportConsumerError(consumerObject, KafkaObservabilityConstants.ERROR_TYPE_POLL);
123123
balFuture.complete(createKafkaError("Failed to poll from the Kafka server: " + e.getMessage()));
124124
}

native/src/main/java/io/ballerina/stdlib/kafka/utils/KafkaUtils.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@
5757
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
5858
import org.apache.kafka.clients.producer.KafkaProducer;
5959
import org.apache.kafka.clients.producer.ProducerConfig;
60+
import org.apache.kafka.common.Metric;
61+
import org.apache.kafka.common.MetricName;
6062
import org.apache.kafka.common.TopicPartition;
6163
import org.apache.kafka.common.config.SaslConfigs;
6264
import org.apache.kafka.common.config.SslConfigs;
@@ -1150,4 +1152,22 @@ public static boolean getAutoCommitConfig(BObject bObject) {
11501152
public static boolean getAutoSeekOnErrorConfig(BObject bObject) {
11511153
return (boolean) bObject.getMapValue(CONSUMER_CONFIG_FIELD_NAME).get(CONSUMER_ENABLE_AUTO_SEEK_CONFIG);
11521154
}
1155+
1156+
/**
1157+
* Retrieves the active connection count from Kafka consumer metrics.
1158+
*
1159+
* @param kafkaConsumer the Kafka consumer instance
1160+
* @return the number of active connections, or 0 if metric not available
1161+
*/
1162+
public static double getActiveConnectionCount(KafkaConsumer kafkaConsumer) {
1163+
Map<MetricName, ? extends Metric> metrics = kafkaConsumer.metrics();
1164+
for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
1165+
MetricName metricName = entry.getKey();
1166+
if ("connection-count".equals(metricName.name()) &&
1167+
"consumer-metrics".equals(metricName.group())) {
1168+
return (double) entry.getValue().metricValue();
1169+
}
1170+
}
1171+
return 0;
1172+
}
11531173
}

0 commit comments

Comments
 (0)