-
Notifications
You must be signed in to change notification settings - Fork 48
Implementation of the asyncio consumer #228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 10 commits
5bbddb5
c3b6614
58b5eb5
d4aab5c
cef9a76
656a287
1c56c59
1f96710
1909e40
84f1396
b17606c
031cf47
e736852
b230e84
93e3cb3
d1485b3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| (ssh://git@git.jetbrains.team/llvm/llvm-project.git f4157ca9dd49181f6d35eaf6d324ffa84a40f01b based on LLVM 31f1590e4fb324c43dc36199587c453e27b6f6fa revision) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| /home/nl/PycharmProjects/pulsar-client-python/lib_pulsar.so | ||
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,7 +4,7 @@ | |
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
BewareMyPower marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| * "License"); you may not use this file except | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
@@ -16,22 +16,39 @@ | |
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
|
|
||
| #include "utils.h" | ||
|
|
||
| #include <pulsar/Consumer.h> | ||
| #include <pulsar/ConsumerConfiguration.h> | ||
| #include <pulsar/Result.h> | ||
| #include <pybind11/functional.h> | ||
| #include <pybind11/pybind11.h> | ||
| #include <pybind11/stl.h> | ||
| #include <memory> | ||
|
|
||
| namespace py = pybind11; | ||
|
|
||
| void Consumer_unsubscribe(Consumer& consumer) { | ||
| waitForAsyncResult([&consumer](ResultCallback callback) { consumer.unsubscribeAsync(callback); }); | ||
| } | ||
|
|
||
| void Consumer_unsubscribeAsync(Consumer& consumer, ResultCallback callback) { | ||
| consumer.unsubscribeAsync([callback] (Result result) { | ||
| py::gil_scoped_acquire acquire; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This GIL acquire is not necessary |
||
| callback(result); | ||
| }); | ||
| } | ||
|
|
||
| Message Consumer_receive(Consumer& consumer) { | ||
| return waitForAsyncValue<Message>([&](ReceiveCallback callback) { consumer.receiveAsync(callback); }); | ||
| } | ||
|
|
||
| void Consumer_receiveAsync(Consumer& consumer, ReceiveCallback callback) { | ||
| py::gil_scoped_acquire acquire; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It should release the GIL rather than acquire the GIL |
||
| consumer.receiveAsync(callback); | ||
| } | ||
|
|
||
| Message Consumer_receive_timeout(Consumer& consumer, int timeoutMs) { | ||
| Message msg; | ||
| Result res; | ||
|
|
@@ -42,6 +59,7 @@ Message Consumer_receive_timeout(Consumer& consumer, int timeoutMs) { | |
| return msg; | ||
| } | ||
|
|
||
| // TODO: implement async variant | ||
| Messages Consumer_batch_receive(Consumer& consumer) { | ||
| Messages msgs; | ||
| Result res; | ||
|
|
@@ -50,14 +68,39 @@ Messages Consumer_batch_receive(Consumer& consumer) { | |
| return msgs; | ||
| } | ||
|
|
||
| void Consumer_batch_receive_async(Consumer& consumer, BatchReceiveCallback callback){ | ||
| consumer.batchReceiveAsync([callback](pulsar::Result result, pulsar::Messages messages){ | ||
| py::gil_scoped_acquire acquire; | ||
| callback(result, messages); | ||
| }); | ||
| } | ||
|
|
||
| void Consumer_acknowledge(Consumer& consumer, const Message& msg) { | ||
| waitForAsyncResult([&](ResultCallback callback) { consumer.acknowledgeAsync(msg, callback); }); | ||
| } | ||
|
|
||
| void Consumer_acknowledgeAsync(Consumer& consumer, const Message& msg, py::object callback){ | ||
| auto py_callback = std::make_shared<py::object>(callback); | ||
|
|
||
| consumer.acknowledgeAsync(msg, [py_callback](pulsar::Result result){ | ||
| py::gil_scoped_acquire acquire; | ||
| (*py_callback)(result, py::none()); | ||
| }); | ||
| } | ||
|
|
||
| void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) { | ||
| waitForAsyncResult([&](ResultCallback callback) { consumer.acknowledgeAsync(msgId, callback); }); | ||
| } | ||
|
|
||
| void Consumer_acknowledge_message_id_Async(Consumer& consumer, const MessageId& msgId, py::object callback){ | ||
| auto py_callback = std::make_shared<py::object>(callback); | ||
|
|
||
| consumer.acknowledgeAsync(msgId, [py_callback](pulsar::Result result){ | ||
| py::gil_scoped_acquire acquire; | ||
| (*py_callback)(result, py::none()); | ||
| }); | ||
| } | ||
|
|
||
| void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) { | ||
| Py_BEGIN_ALLOW_THREADS consumer.negativeAcknowledge(msg); | ||
| Py_END_ALLOW_THREADS | ||
|
|
@@ -72,6 +115,16 @@ void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) { | |
| waitForAsyncResult([&](ResultCallback callback) { consumer.acknowledgeCumulativeAsync(msg, callback); }); | ||
| } | ||
|
|
||
| void Consumer_acknowledge_cumulativeAsync(Consumer& consumer, const Message& msg, py::object callback){ | ||
| auto py_callback = std::make_shared<py::object>(callback); | ||
|
|
||
| consumer.acknowledgeCumulativeAsync(msg, [py_callback](pulsar::Result result){ | ||
| py::gil_scoped_acquire acquire; | ||
| (*py_callback)(result); | ||
| }); | ||
| } | ||
|
|
||
| // TODO: implement async variant | ||
| void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const MessageId& msgId) { | ||
| waitForAsyncResult( | ||
| [&](ResultCallback callback) { consumer.acknowledgeCumulativeAsync(msgId, callback); }); | ||
|
|
@@ -81,14 +134,28 @@ void Consumer_close(Consumer& consumer) { | |
| waitForAsyncResult([&consumer](ResultCallback callback) { consumer.closeAsync(callback); }); | ||
| } | ||
|
|
||
| void Consumer_closeAsync(Consumer& consumer, ResultCallback callback){ | ||
| py::gil_scoped_acquire acquire; | ||
| consumer.closeAsync(callback); | ||
| } | ||
|
|
||
| void Consumer_pauseMessageListener(Consumer& consumer) { CHECK_RESULT(consumer.pauseMessageListener()); } | ||
|
|
||
| void Consumer_resumeMessageListener(Consumer& consumer) { CHECK_RESULT(consumer.resumeMessageListener()); } | ||
|
|
||
| // TODO: implement async variant | ||
| void Consumer_seek(Consumer& consumer, const MessageId& msgId) { | ||
| waitForAsyncResult([msgId, &consumer](ResultCallback callback) { consumer.seekAsync(msgId, callback); }); | ||
| } | ||
|
|
||
| void Consumer_seekAsync(Consumer& consumer, const MessageId& msgId, ResultCallback callback){ | ||
| consumer.seekAsync(msgId, [callback](pulsar::Result result){ | ||
| py::gil_scoped_acquire acquire; | ||
| callback(result); | ||
| }); | ||
| } | ||
|
|
||
| // TODO: implement async variant | ||
| void Consumer_seek_timestamp(Consumer& consumer, uint64_t timestamp) { | ||
| waitForAsyncResult( | ||
| [timestamp, &consumer](ResultCallback callback) { consumer.seekAsync(timestamp, callback); }); | ||
|
|
@@ -114,21 +181,29 @@ void export_consumer(py::module_& m) { | |
| .def("subscription_name", &Consumer::getSubscriptionName, py::return_value_policy::copy) | ||
| .def("consumer_name", &Consumer::getConsumerName, py::return_value_policy::copy) | ||
| .def("unsubscribe", &Consumer_unsubscribe) | ||
| .def("unsubscribe_async", &Consumer_unsubscribeAsync) | ||
| .def("receive", &Consumer_receive) | ||
| .def("receive", &Consumer_receive_timeout) | ||
| .def("receive_async", &Consumer_receiveAsync) | ||
| .def("batch_receive", &Consumer_batch_receive) | ||
| .def("batch_receive_async", &Consumer_batch_receive_async) | ||
| .def("acknowledge", &Consumer_acknowledge) | ||
| .def("acknowledge", &Consumer_acknowledge_message_id) | ||
| .def("acknowledge_async", &Consumer_acknowledgeAsync) | ||
| .def("acknowledge_async", &Consumer_acknowledge_message_id_Async) | ||
| .def("acknowledge_cumulative", &Consumer_acknowledge_cumulative) | ||
| .def("acknowledge_cumulative", &Consumer_acknowledge_cumulative_message_id) | ||
| .def("acknowledge_cumulative_async", &Consumer_acknowledge_cumulativeAsync) | ||
| .def("negative_acknowledge", &Consumer_negative_acknowledge) | ||
| .def("negative_acknowledge", &Consumer_negative_acknowledge_message_id) | ||
| .def("close", &Consumer_close) | ||
| .def("close_async", &Consumer_closeAsync) | ||
| .def("pause_message_listener", &Consumer_pauseMessageListener) | ||
| .def("resume_message_listener", &Consumer_resumeMessageListener) | ||
| .def("redeliver_unacknowledged_messages", &Consumer::redeliverUnacknowledgedMessages) | ||
| .def("seek", &Consumer_seek) | ||
| .def("seek", &Consumer_seek_timestamp) | ||
| .def("seek_async", Consumer_seekAsync) | ||
| .def("is_connected", &Consumer_is_connected) | ||
| .def("get_last_message_id", &Consumer_get_last_message_id); | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.