Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,12 @@

package io.ballerina.stdlib.kafka.impl;

import io.ballerina.runtime.api.values.BError;
import io.ballerina.stdlib.kafka.api.KafkaListener;
import io.ballerina.stdlib.kafka.utils.KafkaConstants;
import io.ballerina.stdlib.kafka.utils.KafkaUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -104,18 +103,20 @@ private void poll() {
+ this.consumerId + " has received " + recordsRetrieved.count() + " records.");
}
processRetrievedRecords(recordsRetrieved);
} catch (KafkaException | IllegalStateException | IllegalArgumentException e) {
} catch (Exception e) {
this.kafkaListener.onError(e);
// When un-recoverable exception is thrown we stop scheduling task to the executor.
// Later at stopConsume() on KafkaRecordConsumer we close the consumer.
this.pollTaskFuture.cancel(false);
} catch (BError e) {
this.kafkaListener.onError(e);
}
}

private void processRetrievedRecords(ConsumerRecords consumerRecords) {
if (Objects.nonNull(consumerRecords) && !consumerRecords.isEmpty()) {
if (Objects.isNull(consumerRecords)) {
return;
}

if (!consumerRecords.isEmpty()) {
Semaphore sem = new Semaphore(0);
KafkaPollCycleFutureListener pollCycleListener = new KafkaPollCycleFutureListener(sem, serviceId);
this.kafkaListener.onRecordsReceived(consumerRecords, kafkaConsumer, groupId, pollCycleListener);
Expand All @@ -128,6 +129,13 @@ private void processRetrievedRecords(ConsumerRecords consumerRecords) {
this.kafkaListener.onError(e);
this.pollTaskFuture.cancel(false);
}
} else {
// Check if there are active connections when we receive empty records
double connectionCount = KafkaUtils.getActiveConnectionCount(kafkaConsumer);
if (connectionCount == 0) {
Exception serverDownException = new Exception("Server might not be available");
this.kafkaListener.onError(serverDownException);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import io.ballerina.stdlib.kafka.utils.ModuleUtils;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -82,12 +81,12 @@ public static Object poll(Environment env, BObject consumerObject, BDecimal time
kafkaConsumer, autoSeek);
}
balFuture.complete(consumerRecords);
} catch (IllegalStateException | IllegalArgumentException | KafkaException e) {
KafkaMetricsUtil.reportConsumerError(consumerObject, KafkaObservabilityConstants.ERROR_TYPE_POLL);
balFuture.complete(createKafkaError("Failed to poll from the Kafka server: " + e.getMessage()));
} catch (BError e) {
KafkaMetricsUtil.reportConsumerError(consumerObject, KafkaObservabilityConstants.ERROR_TYPE_POLL);
balFuture.complete(e);
} catch (Exception e) {
KafkaMetricsUtil.reportConsumerError(consumerObject, KafkaObservabilityConstants.ERROR_TYPE_POLL);
balFuture.complete(createKafkaError("Failed to poll from the Kafka server: " + e.getMessage()));
}
});
return ModuleUtils.getResult(balFuture);
Expand Down Expand Up @@ -118,7 +117,7 @@ public static Object pollPayload(Environment env, BObject consumerObject, BDecim
} catch (BError bError) {
KafkaMetricsUtil.reportConsumerError(consumerObject, KafkaObservabilityConstants.ERROR_TYPE_POLL);
balFuture.complete(bError);
} catch (IllegalStateException | IllegalArgumentException | KafkaException e) {
} catch (Exception e) {
KafkaMetricsUtil.reportConsumerError(consumerObject, KafkaObservabilityConstants.ERROR_TYPE_POLL);
balFuture.complete(createKafkaError("Failed to poll from the Kafka server: " + e.getMessage()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;
Expand Down Expand Up @@ -1150,4 +1152,22 @@ public static boolean getAutoCommitConfig(BObject bObject) {
/**
 * Reads the auto-seek-on-error flag from the consumer's configuration map.
 *
 * @param bObject the Ballerina consumer object carrying the configuration record
 * @return {@code true} if automatic seeking on validation errors is enabled
 */
public static boolean getAutoSeekOnErrorConfig(BObject bObject) {
    Object autoSeekFlag = bObject.getMapValue(CONSUMER_CONFIG_FIELD_NAME)
            .get(CONSUMER_ENABLE_AUTO_SEEK_CONFIG);
    return (boolean) autoSeekFlag;
}

/**
 * Retrieves the active connection count from Kafka consumer metrics.
 *
 * <p>Scans the consumer's metric registry for the {@code connection-count}
 * metric belonging to the {@code consumer-metrics} group and returns its
 * numeric value.
 *
 * @param kafkaConsumer the Kafka consumer instance whose metrics are inspected
 * @return the number of active connections, or 0 if the metric is not
 *         registered or does not currently hold a numeric value
 */
public static double getActiveConnectionCount(KafkaConsumer kafkaConsumer) {
    Map<MetricName, ? extends Metric> metrics = kafkaConsumer.metrics();
    for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
        MetricName metricName = entry.getKey();
        if ("connection-count".equals(metricName.name()) &&
                "consumer-metrics".equals(metricName.group())) {
            // Metric.metricValue() is typed Object; guard the conversion
            // instead of the previous unchecked (double) cast, which threw
            // ClassCastException for non-Double values and NPE for null.
            Object value = entry.getValue().metricValue();
            if (value instanceof Number) {
                return ((Number) value).doubleValue();
            }
            return 0;
        }
    }
    return 0;
}
}