Skip to content

Commit 8b37f20

Browse files
authored
Add configurable prefetch_count (#20)
* Add basic implementation of amqp_params * Fix typing * Update docs and tests * Fix tests
1 parent 717e80b commit 8b37f20

File tree

11 files changed

+34
-50
lines changed

11 files changed

+34
-50
lines changed

.devcontainer/devcontainer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
"github.vscode-github-actions"
2626
],
2727
"settings": {
28+
"python.defaultInterpreterPath": "/workspaces/${localWorkspaceFolderBasename}/.venv/bin/python",
2829
"yaml.schemas": {
2930
"https://asyncapi.com/schema-store/3.0.0-without-$id.json": [
3031
"file:///workspaces/asyncapi-python/examples/*.yaml"

README.md

Lines changed: 7 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ asyncapi_python_service(
7676
)
7777
```
7878

79-
This will be generating python module named `asyncapi_app` on `codegen-export` and `export` goals.
79+
This will be generating python module named `asyncapi_app` on `codegen-export`, and `export` goals.
8080
This target can later be used as a dependency of `python_sources`.
8181

8282
```python
@@ -100,51 +100,16 @@ Note that this plugin does not do dependency injection, so asyncapi-python must
100100
asyncapi-python[amqp]
101101
```
102102

103-
### Deploying this plugin into pants monorepo
104-
105-
In order to deploy this plugin into your pants monorepo, create the following structure inside your plugins folder:
106-
107-
```bash
108-
pants-plugins/
109-
└── asyncapi_python_plugin
110-
├── BUILD
111-
├── __init__.py
112-
├── register.py
113-
└── requirements.txt
114-
```
115-
116-
`requirements.txt` must contain:
117-
118-
```text
119-
asyncapi-python
120-
```
121-
122-
`register.py` should have:
123-
124-
```python
125-
from asyncapi_python_pants.register import *
126-
```
127-
128-
`BUILD` must include:
129-
130-
```python
131-
python_sources(
132-
dependencies=[":reqs"],
133-
)
134-
135-
python_requirements(
136-
name="reqs",
137-
)
138-
```
139-
140-
`__init__.py` can be empty, but it has to exist.
141-
142-
Finally, add `pants-plugins` to your `PYTHONPATH`, and add the created folder as a backend package:
103+
### Deploying this plugin into your pants monorepo
143104

144105
```toml
145106
# pants.toml
107+
plugins = [
108+
"asyncapi_python[codegen]==0.2.5", # Plugin version MUST match the version of your python clients
109+
...
110+
]
146111
backend_packages = [
147-
"asyncapi_python_plugin",
112+
"asyncapi_python_pants",
148113
...
149114
]
150115
pythonpath = ["%(buildroot)s/pants-plugins"]

pyproject.toml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "asyncapi-python"
3-
version = "0.2.4"
3+
version = "0.2.5"
44
license = "Apache-2.0"
55
description = "Easily generate type-safe and async Python applications from AsyncAPI 3 specifications."
66
authors = ["Yaroslav Petrov <yaroslav.v.petrov@gmail.com>"]
@@ -52,9 +52,10 @@ pex = "*"
5252
requires = ["poetry-core"]
5353
build-backend = "poetry.core.masonry.api"
5454

55-
[tool.pytest.ini_options]
56-
asyncio_mode = "auto"
57-
asyncio_default_fixture_loop_scope = "function"
58-
5955
[tool.poetry.requires-plugins]
6056
poetry-plugin-export = ">=1.8"
57+
58+
[tool.pytest.ini_options]
59+
asyncio_mode = "auto"
60+
asyncio_default_fixture_loop_scope = "session"
61+
asyncio_default_test_loop_scope = "session"

src/asyncapi_python/amqp/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
from .operation import Operation
2020
from .utils import union_model
2121
from .error import Rejection, RejectedError
22+
from .params import AmqpParams
2223

2324
__all__ = [
2425
"channel_pool",
26+
"AmqpParams",
2527
"AmqpPool",
2628
"BaseApplication",
2729
"Router",

src/asyncapi_python/amqp/base_application.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from .endpoint import EndpointParams
2424
from .connection import channel_pool
2525
from .utils import encode_message, decode_message
26+
from .params import AmqpParams
2627
from typing import Generic, Optional, TypeVar
2728

2829

@@ -61,6 +62,7 @@ def __init__(
6162
amqp_uri: str,
6263
producer_factory: type[P],
6364
consumer_factory: type[C],
65+
amqp_params: AmqpParams,
6466
):
6567
self.__params = EndpointParams(
6668
pool=channel_pool(amqp_uri),
@@ -69,6 +71,7 @@ def __init__(
6971
register_correlation_id=self.__register_correlation_id,
7072
stop_application=self.stop,
7173
app_id=str(uuid4()),
74+
amqp_params=amqp_params,
7275
)
7376
self.__reply_futures: dict[
7477
str,

src/asyncapi_python/amqp/endpoint/base.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from ..error import Rejection, RejectedError
3232
from ..connection import AmqpPool
3333
from ..operation import Operation
34+
from ..params import AmqpParams
3435
from aio_pika.abc import (
3536
AbstractRobustChannel,
3637
AbstractRobustQueue,
@@ -64,6 +65,7 @@ class EndpointParams:
6465
]
6566
app_id: str
6667
stop_application: Callable[[], Awaitable[None]]
68+
amqp_params: AmqpParams
6769

6870
@property
6971
def reply_queue_name(self) -> str:

src/asyncapi_python/amqp/endpoint/receiver.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ async def start(self) -> None:
4545
print("start", self._op)
4646
if self._fn:
4747
async with self._params.pool.acquire() as ch:
48+
if prefetch_count := self._params.amqp_params.get("prefetch_count"):
49+
await ch.set_qos(prefetch_count=prefetch_count)
4850
q = self._queue = await self._declare(ch)
4951
self._consumer_tag = await q.consume(self._consumer)
5052
return

src/asyncapi_python/amqp/params.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from typing import TypedDict
2+
3+
4+
class AmqpParams(TypedDict, total=False):
5+
prefetch_count: int

src/asyncapi_python_codegen/generators/amqp/templates/application.py.j2

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
limitations under the License. #}
1414
from .consumer import _Router_0 as Consumer
1515
from .producer import _Router_0 as Producer
16-
from asyncapi_python.amqp.base_application import BaseApplication
16+
from asyncapi_python.amqp import BaseApplication, AmqpParams
1717

1818

1919
class Application(BaseApplication[Producer, Consumer]):
20-
def __init__(self, amqp_uri: str):
21-
super().__init__(amqp_uri, Producer, Consumer)
20+
def __init__(self, amqp_uri: str, amqp_params: AmqpParams = {}):
21+
super().__init__(amqp_uri, Producer, Consumer, amqp_params)

tests/core/amqp/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,6 @@
2020

2121
@pytest_asyncio.fixture(scope="function")
2222
async def amqp_pool(amqp_uri: str) -> AsyncGenerator[AmqpPool, None]:
23+
channel_pool.cache_clear()
2324
pool = channel_pool(amqp_uri)
2425
yield pool

0 commit comments

Comments
 (0)