Skip to content
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added .cache/jb/UpdateWork.dat
Binary file not shown.
1 change: 1 addition & 0 deletions .cache/jb/version.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
(ssh://git@git.jetbrains.team/llvm/llvm-project.git f4157ca9dd49181f6d35eaf6d324ffa84a40f01b based on LLVM 31f1590e4fb324c43dc36199587c453e27b6f6fa revision)
1 change: 1 addition & 0 deletions develop/install_manifest.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/home/nl/PycharmProjects/pulsar-client-python/lib_pulsar.so
354 changes: 349 additions & 5 deletions pulsar/asyncio.py

Large diffs are not rendered by default.

24 changes: 24 additions & 0 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <pybind11/functional.h>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <cmath>

namespace py = pybind11;

Expand All @@ -41,19 +42,39 @@ Consumer Client_subscribe(Client& client, const std::string& topic, const std::s
[&](SubscribeCallback callback) { client.subscribeAsync(topic, subscriptionName, conf, callback); });
}

void Client_subscribeAsync(Client& client, const std::string& topic, const std::string& subscriptionName,
const ConsumerConfiguration& conf, SubscribeCallback callback) {
py::gil_scoped_release release;
client.subscribeAsync(topic, subscriptionName, conf, callback);
}

Consumer Client_subscribe_topics(Client& client, const std::vector<std::string>& topics,
const std::string& subscriptionName, const ConsumerConfiguration& conf) {
return waitForAsyncValue<Consumer>(
[&](SubscribeCallback callback) { client.subscribeAsync(topics, subscriptionName, conf, callback); });
}

void Client_subscribe_topicsAsync(Client& client, const std::vector<std::string>& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf, SubscribeCallback callback){
client.subscribeAsync(topics, subscriptionName, conf, [callback](Result result, pulsar::Consumer consumer){
py::gil_scoped_acquire acquire;
callback(result, consumer);
});
}

Consumer Client_subscribe_pattern(Client& client, const std::string& topic_pattern,
const std::string& subscriptionName, const ConsumerConfiguration& conf) {
return waitForAsyncValue<Consumer>([&](SubscribeCallback callback) {
client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, callback);
});
}

void Client_subscribe_patternAsync(Client& client, const std::string& topic_pattern, const std::string& subscriptionName, const ConsumerConfiguration& conf, SubscribeCallback callback){
client.subscribeWithRegexAsync(topic_pattern, subscriptionName, conf, [callback](Result result, Consumer consumer){
py::gil_scoped_acquire acquire;
callback(result, consumer);
});
}

Reader Client_createReader(Client& client, const std::string& topic, const MessageId& startMessageId,
const ReaderConfiguration& conf) {
return waitForAsyncValue<Reader>(
Expand Down Expand Up @@ -86,8 +107,11 @@ void export_client(py::module_& m) {
.def("create_producer", &Client_createProducer)
.def("create_producer_async", &Client_createProducerAsync)
.def("subscribe", &Client_subscribe)
.def("subscribe_async", &Client_subscribeAsync)
.def("subscribe_topics", &Client_subscribe_topics)
.def("subscribe_topics_async", &Client_subscribe_topicsAsync)
.def("subscribe_pattern", &Client_subscribe_pattern)
.def("subscribe_pattern_async", &Client_subscribe_patternAsync)
.def("create_reader", &Client_createReader)
.def("get_topic_partitions", &Client_getTopicPartitions)
.def("get_schema_info", &Client_getSchemaInfo)
Expand Down
77 changes: 76 additions & 1 deletion src/consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* "License"); you may not use this file except
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
Expand All @@ -16,22 +16,39 @@
* specific language governing permissions and limitations
* under the License.
*/

#include "utils.h"

#include <pulsar/Consumer.h>
#include <pulsar/ConsumerConfiguration.h>
#include <pulsar/Result.h>
#include <pybind11/functional.h>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <memory>

namespace py = pybind11;

void Consumer_unsubscribe(Consumer& consumer) {
waitForAsyncResult([&consumer](ResultCallback callback) { consumer.unsubscribeAsync(callback); });
}

void Consumer_unsubscribeAsync(Consumer& consumer, ResultCallback callback) {
consumer.unsubscribeAsync([callback] (Result result) {
py::gil_scoped_acquire acquire;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This GIL acquire is not necessary

callback(result);
});
}

Message Consumer_receive(Consumer& consumer) {
return waitForAsyncValue<Message>([&](ReceiveCallback callback) { consumer.receiveAsync(callback); });
}

void Consumer_receiveAsync(Consumer& consumer, ReceiveCallback callback) {
py::gil_scoped_acquire acquire;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should release the GIL rather than acquire the GIL

consumer.receiveAsync(callback);
}

Message Consumer_receive_timeout(Consumer& consumer, int timeoutMs) {
Message msg;
Result res;
Expand All @@ -42,6 +59,7 @@ Message Consumer_receive_timeout(Consumer& consumer, int timeoutMs) {
return msg;
}

// TODO: implement async variant
Messages Consumer_batch_receive(Consumer& consumer) {
Messages msgs;
Result res;
Expand All @@ -50,14 +68,39 @@ Messages Consumer_batch_receive(Consumer& consumer) {
return msgs;
}

void Consumer_batch_receive_async(Consumer& consumer, BatchReceiveCallback callback){
consumer.batchReceiveAsync([callback](pulsar::Result result, pulsar::Messages messages){
py::gil_scoped_acquire acquire;
callback(result, messages);
});
}

void Consumer_acknowledge(Consumer& consumer, const Message& msg) {
waitForAsyncResult([&](ResultCallback callback) { consumer.acknowledgeAsync(msg, callback); });
}

void Consumer_acknowledgeAsync(Consumer& consumer, const Message& msg, py::object callback){
auto py_callback = std::make_shared<py::object>(callback);

consumer.acknowledgeAsync(msg, [py_callback](pulsar::Result result){
py::gil_scoped_acquire acquire;
(*py_callback)(result, py::none());
});
}

void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
waitForAsyncResult([&](ResultCallback callback) { consumer.acknowledgeAsync(msgId, callback); });
}

void Consumer_acknowledge_message_id_Async(Consumer& consumer, const MessageId& msgId, py::object callback){
auto py_callback = std::make_shared<py::object>(callback);

consumer.acknowledgeAsync(msgId, [py_callback](pulsar::Result result){
py::gil_scoped_acquire acquire;
(*py_callback)(result, py::none());
});
}

void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) {
Py_BEGIN_ALLOW_THREADS consumer.negativeAcknowledge(msg);
Py_END_ALLOW_THREADS
Expand All @@ -72,6 +115,16 @@ void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) {
waitForAsyncResult([&](ResultCallback callback) { consumer.acknowledgeCumulativeAsync(msg, callback); });
}

void Consumer_acknowledge_cumulativeAsync(Consumer& consumer, const Message& msg, py::object callback){
auto py_callback = std::make_shared<py::object>(callback);

consumer.acknowledgeCumulativeAsync(msg, [py_callback](pulsar::Result result){
py::gil_scoped_acquire acquire;
(*py_callback)(result);
});
}

// TODO: implement async variant
void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const MessageId& msgId) {
waitForAsyncResult(
[&](ResultCallback callback) { consumer.acknowledgeCumulativeAsync(msgId, callback); });
Expand All @@ -81,14 +134,28 @@ void Consumer_close(Consumer& consumer) {
waitForAsyncResult([&consumer](ResultCallback callback) { consumer.closeAsync(callback); });
}

void Consumer_closeAsync(Consumer& consumer, ResultCallback callback){
py::gil_scoped_acquire acquire;
consumer.closeAsync(callback);
}

void Consumer_pauseMessageListener(Consumer& consumer) { CHECK_RESULT(consumer.pauseMessageListener()); }

void Consumer_resumeMessageListener(Consumer& consumer) { CHECK_RESULT(consumer.resumeMessageListener()); }

// TODO: implement async variant
void Consumer_seek(Consumer& consumer, const MessageId& msgId) {
waitForAsyncResult([msgId, &consumer](ResultCallback callback) { consumer.seekAsync(msgId, callback); });
}

void Consumer_seekAsync(Consumer& consumer, const MessageId& msgId, ResultCallback callback){
consumer.seekAsync(msgId, [callback](pulsar::Result result){
py::gil_scoped_acquire acquire;
callback(result);
});
}

// TODO: implement async variant
void Consumer_seek_timestamp(Consumer& consumer, uint64_t timestamp) {
waitForAsyncResult(
[timestamp, &consumer](ResultCallback callback) { consumer.seekAsync(timestamp, callback); });
Expand All @@ -114,21 +181,29 @@ void export_consumer(py::module_& m) {
.def("subscription_name", &Consumer::getSubscriptionName, py::return_value_policy::copy)
.def("consumer_name", &Consumer::getConsumerName, py::return_value_policy::copy)
.def("unsubscribe", &Consumer_unsubscribe)
.def("unsubscribe_async", &Consumer_unsubscribeAsync)
.def("receive", &Consumer_receive)
.def("receive", &Consumer_receive_timeout)
.def("receive_async", &Consumer_receiveAsync)
.def("batch_receive", &Consumer_batch_receive)
.def("batch_receive_async", &Consumer_batch_receive_async)
.def("acknowledge", &Consumer_acknowledge)
.def("acknowledge", &Consumer_acknowledge_message_id)
.def("acknowledge_async", &Consumer_acknowledgeAsync)
.def("acknowledge_async", &Consumer_acknowledge_message_id_Async)
.def("acknowledge_cumulative", &Consumer_acknowledge_cumulative)
.def("acknowledge_cumulative", &Consumer_acknowledge_cumulative_message_id)
.def("acknowledge_cumulative_async", &Consumer_acknowledge_cumulativeAsync)
.def("negative_acknowledge", &Consumer_negative_acknowledge)
.def("negative_acknowledge", &Consumer_negative_acknowledge_message_id)
.def("close", &Consumer_close)
.def("close_async", &Consumer_closeAsync)
.def("pause_message_listener", &Consumer_pauseMessageListener)
.def("resume_message_listener", &Consumer_resumeMessageListener)
.def("redeliver_unacknowledged_messages", &Consumer::redeliverUnacknowledgedMessages)
.def("seek", &Consumer_seek)
.def("seek", &Consumer_seek_timestamp)
.def("seek_async", Consumer_seekAsync)
.def("is_connected", &Consumer_is_connected)
.def("get_last_message_id", &Consumer_get_last_message_id);
}
1 change: 1 addition & 0 deletions src/producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ void Producer_sendAsync(Producer& producer, const Message& msg, SendCallback cal
}
}

// TODO: implement async variant
void Producer_flush(Producer& producer) {
waitForAsyncResult([&](ResultCallback callback) { producer.flushAsync(callback); });
}
Expand Down
59 changes: 56 additions & 3 deletions tests/asyncio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,24 @@
#

import asyncio
from typing import Iterable

from _pulsar import ConsumerType

import pulsar
from pulsar.asyncio import (
Client,
PulsarException,
Consumer
)
from unittest import (
main,
IsolatedAsyncioTestCase,
)

service_url = 'pulsar://localhost:6650'
# TODO: Write tests for everything else

service_url = 'pulsar://localhost'

class AsyncioTest(IsolatedAsyncioTestCase):

Expand All @@ -55,8 +62,6 @@ async def test_batch_send(self):
print(f'{i} was sent to {msg_id}')
self.assertIsInstance(msg_id, pulsar.MessageId)
self.assertEqual(msg_ids[i].ledger_id(), ledger_id)
self.assertEqual(msg_ids[i].entry_id(), entry_id)
self.assertEqual(msg_ids[i].batch_index(), i)

async def test_create_producer_failure(self):
try:
Expand All @@ -82,5 +87,53 @@ async def test_close_producer(self):
except PulsarException as e:
self.assertEqual(e.error(), pulsar.Result.AlreadyClosed)

async def test_subscribe(self):
consumer = await self._client.subscribe('awaitio-test-close-producer', 'test-subscription')
self.assertIsInstance(consumer, Consumer)

async def test_read_and_ack(self):
test_producer = await self._client.create_producer("awaitio-test-consumer-ack")
consumer = await self._client.subscribe('awaitio-test-consumer-ack', 'test-subscription')

await test_producer.send(b"test123")
msg = await consumer.receive()

self.assertEqual(msg.data(), b"test123")

await consumer.acknowledge(msg)

async def test_batch_read_and_ack(self):
test_producer = await self._client.create_producer("awaitio-test-consumer-ack-batch")
consumer = await self._client.subscribe('awaitio-test-consumer-ack-batch', 'test-subscription')

await test_producer.send(b"test123")
msgs = await consumer.batch_receive()

last = None
for msg in msgs:
last = msg

await consumer.acknowledge_cumulative(last)

self.assertIsInstance(msgs, Iterable)
for msg in msgs:
self.assertEqual(b"test123", msg.data())

async def test_consumer_close(self):
consumer = await self._client.subscribe('awaitio-test-consumer-close', 'test-subscription')
await consumer.close()

self.assertFalse(consumer.is_connected)

async def test_consumer_seek(self):
consumer = await self._client.subscribe('awaitio-test-consumer-close', 'test-subscription')
await consumer.seek(consumer.last_message_id)

async def test_consumer_unsubscribe(self):
consumer = await self._client.subscribe('awaitio-test-consumer-close', 'test-subscription')
await consumer.unsubscribe()



if __name__ == '__main__':
main()
Loading