-
Notifications
You must be signed in to change notification settings - Fork 86
mq.receive.max.poll.time.ms, to limit the maximum time spent polling messages in a Kafka Connect task cycle #153
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've suggested a small refactoring that means we do the sum once up-front, and then just a straight comparison every time around the loop (rather than doing the sum in every loop). In practice, I suspect the performance impact of such a micro-optimisation will be minimal, so feel free to ignore.
Otherwise this all looks good to me, thanks!
} | ||
|
||
log.debug("Polling for records"); | ||
final long startTime = System.currentTimeMillis(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final long startTime = System.currentTimeMillis(); | |
final long pollEndTime = System.currentTimeMillis() + maxPollTime; |
message != null && | ||
localList.size() < numberOfMessagesToBePolled && | ||
!stopNow.get() && | ||
(maxPollTime <= 0 || (System.currentTimeMillis() - startTime) < maxPollTime) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(maxPollTime <= 0 || (System.currentTimeMillis() - startTime) < maxPollTime) | |
(maxPollTime <= 0 || (System.currentTimeMillis() < pollEndTime) |
380a950
to
bdce9b9
Compare
- Maximum time (in milliseconds) to spend polling messages before returning a batch to Kafka. Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
Signed-off-by: A S Adil Mohammad <asadilmohammad2020@gmail.com>
This pull request introduces a new configuration option,
mq.receive.max.poll.time.ms
, to limit the maximum time spent polling messages in a Kafka Connect task cycle. It also includes updates to the implementation and tests to support this feature.New Configuration Option:
mq.receive.max.poll.time.ms
to define the maximum time (in milliseconds) for polling messages during a Kafka Connect task cycle. If set to0
, polling continues until the batch size is met or no more messages are available. This configuration is documented in theREADME.md
and integrated into the connector's configuration definitions.Implementation Updates:
MQSourceTask
class to use the newmq.receive.max.poll.time.ms
configuration. The polling logic now respects the maximum poll time, ensuring the task terminates polling early if the time limit is reached.Testing Enhancements:
MQSourceTaskIT.java
to validate the behavior of themq.receive.max.poll.time.ms
configuration under different scenarios, such as terminating early, respecting batch size, and handling a value of0
.These changes improve the flexibility and control of the Kafka Connect source connector for IBM MQ, allowing users to fine-tune message polling behavior.# Description
Type of change
How Has This Been Tested?
Checklist