Skip to content

Add configurable prefetch_count #20

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"github.vscode-github-actions"
],
"settings": {
"python.defaultInterpreterPath": "/workspaces/${localWorkspaceFolderBasename}/.venv/bin/python",
"yaml.schemas": {
"https://asyncapi.com/schema-store/3.0.0-without-$id.json": [
"file:///workspaces/asyncapi-python/examples/*.yaml"
Expand Down
49 changes: 7 additions & 42 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ asyncapi_python_service(
)
```

This will be generating python module named `asyncapi_app` on `codegen-export` and `export` goals.
This will be generating python module named `asyncapi_app` on `codegen-export`, and `export` goals.
This target can later be used as a dependency of `python_sources`.

```python
Expand All @@ -100,51 +100,16 @@ Note that this plugin does not do dependency injection, so asyncapi-python must
asyncapi-python[amqp]
```

### Deploying this plugin into pants monorepo

In order to deploy this plugin into your pants monorepo, create the following structure inside your plugins folder:

```bash
pants-plugins/
└── asyncapi_python_plugin
├── BUILD
├── __init__.py
├── register.py
└── requirements.txt
```

`requirements.txt` must contain:

```text
asyncapi-python
```

`register.py` should have:

```python
from asyncapi_python_pants.register import *
```

`BUILD` must include:

```python
python_sources(
dependencies=[":reqs"],
)

python_requirements(
name="reqs",
)
```

`__init__.py` can be empty, but it has to exist.

Finally, add `pants-plugins` to your `PYTHONPATH`, and add the created folder as a backend package:
### Deploying this plugin into your pants monorepo

```toml
# pants.toml
plugins = [
"asyncapi_python[codegen]==0.2.5", # Plugin version MUST match the version of your python clients
...
]
backend_packages = [
"asyncapi_python_plugin",
"asyncapi_python_pants",
...
]
pythonpath = ["%(buildroot)s/pants-plugins"]
Expand Down
11 changes: 6 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "asyncapi-python"
version = "0.2.4"
version = "0.2.5"
license = "Apache-2.0"
description = "Easily generate type-safe and async Python applications from AsyncAPI 3 specifications."
authors = ["Yaroslav Petrov <yaroslav.v.petrov@gmail.com>"]
Expand Down Expand Up @@ -52,9 +52,10 @@ pex = "*"
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"

[tool.poetry.requires-plugins]
poetry-plugin-export = ">=1.8"

[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "session"
asyncio_default_test_loop_scope = "session"
2 changes: 2 additions & 0 deletions src/asyncapi_python/amqp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
from .operation import Operation
from .utils import union_model
from .error import Rejection, RejectedError
from .params import AmqpParams

__all__ = [
"channel_pool",
"AmqpParams",
"AmqpPool",
"BaseApplication",
"Router",
Expand Down
3 changes: 3 additions & 0 deletions src/asyncapi_python/amqp/base_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from .endpoint import EndpointParams
from .connection import channel_pool
from .utils import encode_message, decode_message
from .params import AmqpParams
from typing import Generic, Optional, TypeVar


Expand Down Expand Up @@ -61,6 +62,7 @@ def __init__(
amqp_uri: str,
producer_factory: type[P],
consumer_factory: type[C],
amqp_params: AmqpParams,
):
self.__params = EndpointParams(
pool=channel_pool(amqp_uri),
Expand All @@ -69,6 +71,7 @@ def __init__(
register_correlation_id=self.__register_correlation_id,
stop_application=self.stop,
app_id=str(uuid4()),
amqp_params=amqp_params,
)
self.__reply_futures: dict[
str,
Expand Down
2 changes: 2 additions & 0 deletions src/asyncapi_python/amqp/endpoint/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from ..error import Rejection, RejectedError
from ..connection import AmqpPool
from ..operation import Operation
from ..params import AmqpParams
from aio_pika.abc import (
AbstractRobustChannel,
AbstractRobustQueue,
Expand Down Expand Up @@ -64,6 +65,7 @@ class EndpointParams:
]
app_id: str
stop_application: Callable[[], Awaitable[None]]
amqp_params: AmqpParams

@property
def reply_queue_name(self) -> str:
Expand Down
2 changes: 2 additions & 0 deletions src/asyncapi_python/amqp/endpoint/receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ async def start(self) -> None:
print("start", self._op)
if self._fn:
async with self._params.pool.acquire() as ch:
if prefetch_count := self._params.amqp_params.get("prefetch_count"):
await ch.set_qos(prefetch_count=prefetch_count)
q = self._queue = await self._declare(ch)
self._consumer_tag = await q.consume(self._consumer)
return
Expand Down
5 changes: 5 additions & 0 deletions src/asyncapi_python/amqp/params.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from typing import TypedDict


class AmqpParams(TypedDict, total=False):
prefetch_count: int
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
limitations under the License. #}
from .consumer import _Router_0 as Consumer
from .producer import _Router_0 as Producer
from asyncapi_python.amqp.base_application import BaseApplication
from asyncapi_python.amqp import BaseApplication, AmqpParams


class Application(BaseApplication[Producer, Consumer]):
def __init__(self, amqp_uri: str):
super().__init__(amqp_uri, Producer, Consumer)
def __init__(self, amqp_uri: str, amqp_params: AmqpParams = {}):
super().__init__(amqp_uri, Producer, Consumer, amqp_params)
1 change: 1 addition & 0 deletions tests/core/amqp/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@

@pytest_asyncio.fixture(scope="function")
async def amqp_pool(amqp_uri: str) -> AsyncGenerator[AmqpPool, None]:
channel_pool.cache_clear()
pool = channel_pool(amqp_uri)
yield pool
2 changes: 2 additions & 0 deletions tests/core/amqp/test_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ def params(
decode=decode_message,
app_id=app_id,
stop_application=lambda: exit(-1),
amqp_params={},
)


Expand All @@ -135,6 +136,7 @@ def params_2(amqp_pool, correlation_ids):
return params("app-2", amqp_pool, correlation_ids)


@pytest.mark.asyncio
async def test_queue(params_1: EndpointParams, operation: Operation):
producer: Sender[Log] = Sender(operation, params_1)
consumer: Receiver[Log] = Receiver(operation, params_1)
Expand Down
Loading