Skip to content

Use FastStream 0.6.0 #127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 75 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
aad0c32
Fix FastStream & OTEL deprecations
vrslev Jul 18, 2025
8743e63
Update faststream dependency to 0.6 and configure uv to use git source
vrslev Jul 18, 2025
1ea2397
Refactor StompBroker to use configuration and specification classes
vrslev Jul 18, 2025
e28f36f
Update import to use faststream.message for StreamMessage and gen_cor_id
vrslev Jul 18, 2025
17e57e9
Add `TestStompBroker` to exports and note pending export update
vrslev Jul 18, 2025
af3aa2a
Update imports and class inheritance for better internal module align…
vrslev Jul 18, 2025
14e231e
Update imports to use internal module structure and remove deprecated…
vrslev Jul 18, 2025
73e5f03
Implement Stomp subscriber configuration and specification classes, a…
vrslev Jul 18, 2025
daaeb52
Refactor StompSubscriber to use config-based client and prefix handling
vrslev Jul 18, 2025
3acc9a8
Remove unused logging context and get_fmt method from StompBroker
vrslev Jul 18, 2025
c93fc47
Remove unused StompLogContext type and simplify get_log_context return
vrslev Jul 18, 2025
768d8a6
Refactor StompProducer to use StompPublishCommand and update method s…
vrslev Jul 18, 2025
7ebc804
Move StompBrokerConfig and add publisher configuration classes to imp…
vrslev Jul 18, 2025
a84d065
Refactor publisher classes to use new specification and usecase confi…
vrslev Jul 18, 2025
00fa5be
Refactor StompRegistrator and StompSubscriber to use configuration an…
vrslev Jul 18, 2025
3c6f816
Remove unused publisher kwargs and restructure publisher initializati…
vrslev Jul 18, 2025
e9edf8f
Update dependencies type hint from Depends to Dependant in StompRegis…
vrslev Jul 18, 2025
a14be1b
Refactor StompBroker to use stop instead of _close and remove depreca…
vrslev Jul 18, 2025
83c55ab
Improve StompBroker initialization by removing unused import and field
vrslev Jul 18, 2025
a31df7e
Refactor publish and request methods to use StompPublishCommand
vrslev Jul 18, 2025
6c634c9
Update telemetry and prometheus modules to use StompPublishCommand in…
vrslev Jul 18, 2025
531dd61
Update imports and adjust type parameters in StompRouter to use inter…
vrslev Jul 18, 2025
45bfd99
Add subscriber and publisher tracking with prefix support and context…
vrslev Jul 18, 2025
9dcf822
Implement custom fake publisher for STOMP with reply destination hand…
vrslev Jul 18, 2025
2c3614b
Refactor StompBroker to use updated internal broker and configuration…
vrslev Jul 18, 2025
333391d
Add TODO comments for interface improvements in Stomp router components
vrslev Jul 18, 2025
41a17ec
Enhance Stomp registrator and router with middleware support and para…
vrslev Jul 21, 2025
98d5beb
Implement custom logging for StompBroker using StompParamsStorage
vrslev Jul 21, 2025
2ce9d11
Fix logger and middleware access in StompBroker tests
vrslev Jul 21, 2025
d10a530
Fix return type in patch_command and update pyproject.toml ignore list
vrslev Jul 21, 2025
ebdc7eb
Refactor destination handling to use prefix-aware properties
vrslev Jul 21, 2025
e698417
Implement task exception handling for connection failures, convert ca…
vrslev Jul 21, 2025
4c1d454
Fix usage of producer and headers in publish methods by using config …
vrslev Jul 21, 2025
3ce3aab
Update
vrslev Jul 21, 2025
8cc5e06
Implement route publisher as actual interface
vrslev Jul 21, 2025
ec06377
Remove unused TYPE_CHECKING import and update connection logic to use…
vrslev Jul 21, 2025
74da06f
Add StompProducer to context and implement start method
vrslev Jul 21, 2025
234e7da
Add pytest-timeout to dev dependencies with version constraint
vrslev Jul 21, 2025
323aa4c
Implement subscriber cleanup in broker stop method and pin pytest-tim…
vrslev Jul 21, 2025
b637d42
Fix subscriber state on broker start by setting running status explic…
vrslev Jul 21, 2025
e3977cd
Update internal fields to exclude from repr and remove unnecessary no…
vrslev Jul 21, 2025
84e2b51
Replace direct assignment with method call to clear active connection…
vrslev Jul 21, 2025
0fa2d47
Add skip marker to test_router and update logging test to use broker …
vrslev Jul 21, 2025
cd4e975
Fix ack policy logic and simplify test schema setup
vrslev Jul 21, 2025
dcc97bc
Add example STOMP client and subscriber using FastStream and stompman
vrslev Jul 21, 2025
39b9fe4
Implement subscriber post-start logic to manage state properly
vrslev Jul 21, 2025
ccbe215
Update registrator to use self.config instead of broker_config and re…
vrslev Jul 21, 2025
fbf3fe0
Add type ignore comments to suppress outer_config type warnings in St…
vrslev Jul 21, 2025
f876331
Implement publish_batch method and add corresponding test for not imp…
vrslev Jul 21, 2025
815162a
Add return value and configure route publisher in test router
vrslev Jul 21, 2025
8e14575
Improve response publisher construction by removing redundant parenth…
vrslev Jul 21, 2025
82e208d
Implement async iterator and add tests for not implemented subscriber…
vrslev Jul 21, 2025
d3e36a1
Refactor message handling by moving StompStreamMessage to configs mod…
vrslev Jul 21, 2025
c44a8ca
Refactor configs into models module and remove redundant configs file
vrslev Jul 21, 2025
a7029db
Remove message.py file from faststream_stomp module
vrslev Jul 21, 2025
4075c78
Refactor imports for cleaner organization in broker module
vrslev Jul 21, 2025
4b580b8
Update imports to use consolidated faststream module for shared classes
vrslev Jul 21, 2025
ca1cd2d
Fix type annotation for publish_batch messages and remove quotes from…
vrslev Jul 21, 2025
f572ca4
Remove example file and fix subscription method calls
vrslev Jul 21, 2025
1d0ca54
Remove unnecessary type ignore from settings provider factory lambda
vrslev Jul 21, 2025
a683730
Reorder and clean up pyproject.toml ruff ignore list
vrslev Jul 21, 2025
dbf0aa9
Update module metadata with __all__ exports in telemetry and promethe…
vrslev Jul 21, 2025
3784ba8
Refactor to use BrokerConfigWithStompClient instead of StompBrokerConfig
vrslev Jul 21, 2025
4016ee5
Add `StompPublishCommand` to exports and refactor subscriber and publ…
vrslev Jul 21, 2025
c41b723
Add import for StompPublishCommand in __init__.py
vrslev Jul 21, 2025
9dce412
Remove unnecessary cast and add type ignore with comment for type com…
vrslev Jul 21, 2025
8f92406
Improve typing and fix comment spacing in StompPublisher initialization
vrslev Jul 21, 2025
6e90410
Fix return type annotations for get_one and __aiter__ methods to matc…
vrslev Jul 21, 2025
aa396f2
Add AsyncIterator return type to aiter method in StompSubscriber
vrslev Jul 21, 2025
e5f8ef5
Merge branch 'main' into faststream-0.6.0
vrslev Jul 21, 2025
3d90532
Update type annotations and faststream dependency to use specific tag…
vrslev Aug 4, 2025
9605e9b
Merge branch 'main' into faststream-0.6.0
vrslev Aug 4, 2025
ff16b5c
Replace anonymous subscriber functions with named handlers in integra…
vrslev Aug 4, 2025
7cc64f8
Merge branch 'main' into faststream-0.6.0
vrslev Aug 8, 2025
bf90e2b
Update faststream dependency version from 0.6.0rc0 to 0.6.0rc1 in sto…
vrslev Aug 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion packages/faststream-stomp/faststream_stomp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from faststream_stomp.broker import StompBroker
from faststream_stomp.message import StompStreamMessage
from faststream_stomp.models import StompPublishCommand, StompStreamMessage
from faststream_stomp.publisher import StompPublisher
from faststream_stomp.router import StompRoute, StompRoutePublisher, StompRouter
from faststream_stomp.subscriber import StompSubscriber
from faststream_stomp.testing import TestStompBroker

__all__ = [
"StompBroker",
"StompPublishCommand",
"StompPublisher",
"StompRoute",
"StompRoutePublisher",
Expand Down
191 changes: 119 additions & 72 deletions packages/faststream-stomp/faststream_stomp/broker.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,39 @@
import asyncio
import logging
import types
from collections.abc import Callable, Iterable, Mapping, Sequence
from typing import Any, Unpack
import typing
from collections.abc import Iterable, Sequence
from typing import Any

import anyio
import stompman
from fast_depends.dependencies import Depends
from faststream.asyncapi.schema import Tag, TagDict
from faststream.broker.core.usecase import BrokerUsecase
from faststream.broker.types import BrokerMiddleware, CustomCallable
from faststream.log.logging import get_broker_logger
from fast_depends.dependencies import Dependant
from faststream import ContextRepo, PublishType
from faststream._internal.basic_types import LoggerProto, SendableMessage
from faststream._internal.broker import BrokerUsecase
from faststream._internal.broker.registrator import Registrator
from faststream._internal.configs import BrokerConfig
from faststream._internal.constants import EMPTY
from faststream._internal.di import FastDependsConfig
from faststream._internal.logger import DefaultLoggerStorage, make_logger_state
from faststream._internal.logger.logging import get_broker_logger
from faststream._internal.types import BrokerMiddleware, CustomCallable
from faststream.security import BaseSecurity
from faststream.types import EMPTY, AnyDict, Decorator, LoggerProto, SendableMessage
from faststream.specification.schema import BrokerSpec
from faststream.specification.schema.extra import Tag, TagDict

from faststream_stomp.models import BrokerConfigWithStompClient, StompPublishCommand
from faststream_stomp.publisher import StompProducer, StompPublisher
from faststream_stomp.registrator import StompRegistrator
from faststream_stomp.subscriber import StompLogContext, StompSubscriber
from faststream_stomp.subscriber import StompSubscriber


class StompSecurity(BaseSecurity):
def __init__(self) -> None:
self.ssl_context = None
self.use_ssl = False

def get_requirement(self) -> list[AnyDict]: # noqa: PLR6301
def get_requirement(self) -> list[dict[str, Any]]: # noqa: PLR6301
return [{"user-password": []}]

def get_schema(self) -> dict[str, dict[str, str]]: # noqa: PLR6301
Expand All @@ -43,83 +52,111 @@ def _handle_listen_task_done(listen_task: asyncio.Task[None]) -> None:
raise SystemExit(1)


class StompBroker(StompRegistrator, BrokerUsecase[stompman.MessageFrame, stompman.Client]):
_subscribers: Mapping[int, StompSubscriber]
_publishers: Mapping[int, StompPublisher]
class StompParamsStorage(DefaultLoggerStorage):
__max_msg_id_ln = 10
_max_channel_name = 4

def get_logger(self, *, context: ContextRepo) -> LoggerProto:
if logger := self._get_logger_ref():
return logger
logger = get_broker_logger(
name="stomp",
default_context={"destination": "", "message_id": ""},
message_id_ln=self.__max_msg_id_ln,
fmt=(
"%(asctime)s %(levelname)-8s - "
f"%(destination)-{self._max_channel_name}s | "
f"%(message_id)-{self.__max_msg_id_ln}s "
"- %(message)s"
),
context=context,
log_level=self.logger_log_level,
)
self._logger_ref.add(logger)
return logger


class StompBroker(
StompRegistrator,
BrokerUsecase[
stompman.MessageFrame,
stompman.Client,
BrokerConfig, # Using BrokerConfig to avoid typing issues when passing broker to FastStream app
],
):
_subscribers: list[StompSubscriber] # type: ignore[assignment]
_publishers: list[StompPublisher] # type: ignore[assignment]

def __init__(
self,
client: stompman.Client,
*,
decoder: CustomCallable | None = None,
parser: CustomCallable | None = None,
dependencies: Iterable[Depends] = (),
middlewares: Sequence[BrokerMiddleware[stompman.MessageFrame]] = (),
dependencies: Iterable[Dependant] = (),
middlewares: Sequence[BrokerMiddleware[stompman.MessageFrame, StompPublishCommand]] = (),
graceful_timeout: float | None = 15.0,
routers: Sequence[Registrator[stompman.MessageFrame]] = (),
# Logging args
logger: LoggerProto | None = EMPTY,
log_level: int = logging.INFO,
# FastDepends args
apply_types: bool = True,
validate: bool = True,
_get_dependant: Callable[..., Any] | None = None,
_call_decorators: Iterable[Decorator] = (),
# AsyncAPI kwargs,
# AsyncAPI args
description: str | None = None,
tags: Iterable[Tag | TagDict] | None = None,
tags: Iterable[Tag | TagDict] = (),
) -> None:
super().__init__(
client=client, # **connection_kwargs
decoder=decoder,
parser=parser,
dependencies=dependencies,
middlewares=middlewares,
broker_config = BrokerConfigWithStompClient(
broker_middlewares=middlewares, # type: ignore[arg-type]
broker_parser=parser,
broker_decoder=decoder,
logger=make_logger_state(
logger=logger,
log_level=log_level,
default_storage_cls=StompParamsStorage, # type: ignore[type-abstract]
),
fd_config=FastDependsConfig(use_fastdepends=apply_types),
broker_dependencies=dependencies,
graceful_timeout=graceful_timeout,
logger=logger,
log_level=log_level,
apply_types=apply_types,
validate=validate,
_get_dependant=_get_dependant,
_call_decorators=_call_decorators,
extra_context={"broker": self},
producer=StompProducer(client),
client=client,
)
specification = BrokerSpec(
url=[f"{one_server.host}:{one_server.port}" for one_server in broker_config.client.servers],
protocol="STOMP",
protocol_version="1.2",
description=description,
tags=tags,
asyncapi_url=[f"{one_server.host}:{one_server.port}" for one_server in client.servers],
security=StompSecurity(),
default_logger=get_broker_logger(
name="stomp", default_context={"channel": ""}, message_id_ln=self.__max_msg_id_ln
),
)
self._attempted_to_connect = False

async def start(self) -> None:
await super().start()

for handler in self._subscribers.values():
self._log(f"`{handler.call_name}` waiting for messages", extra=handler.get_log_context(None))
await handler.start()
super().__init__(config=broker_config, specification=specification, routers=routers)
self._attempted_to_connect = False

async def _connect(self, client: stompman.Client) -> stompman.Client: # type: ignore[override]
async def _connect(self) -> stompman.Client:
if self._attempted_to_connect:
return client
return self.config.broker_config.client
self._attempted_to_connect = True
self._producer = StompProducer(client)
await client.__aenter__()
client._listen_task.add_done_callback(_handle_listen_task_done) # noqa: SLF001
return client
await self.config.broker_config.client.__aenter__()
self.config.broker_config.client._listen_task.add_done_callback(_handle_listen_task_done)
return self.config.broker_config.client

async def start(self) -> None:
await self.connect()
await super().start()

async def _close(
async def stop(
self,
exc_type: type[BaseException] | None = None,
exc_val: BaseException | None = None,
exc_tb: types.TracebackType | None = None,
) -> None:
for sub in self.subscribers:
await sub.stop()
if self._connection:
await self._connection.__aexit__(exc_type, exc_val, exc_tb)
return await super()._close(exc_type, exc_val, exc_tb)
self.running = False

async def ping(self, timeout: float | None = None) -> bool:
sleep_time = (timeout or 10) / 10
Expand All @@ -138,42 +175,52 @@ async def ping(self, timeout: float | None = None) -> bool:

return False # pragma: no cover

def get_fmt(self) -> str:
# `StompLogContext`
return (
"%(asctime)s %(levelname)-8s - "
f"%(destination)-{self._max_channel_name}s | "
f"%(message_id)-{self.__max_msg_id_ln}s "
"- %(message)s"
)

def _setup_log_context(self, **log_context: Unpack[StompLogContext]) -> None: ... # type: ignore[override]

@property
def _subscriber_setup_extra(self) -> "AnyDict":
return {**super()._subscriber_setup_extra, "client": self._connection}

async def publish( # type: ignore[override]
async def publish(
self,
message: SendableMessage,
destination: str,
*,
correlation_id: str | None = None,
headers: dict[str, str] | None = None,
) -> None:
await super().publish(
publish_command = StompPublishCommand(
message,
producer=self._producer,
correlation_id=correlation_id,
_publish_type=PublishType.PUBLISH,
destination=destination,
correlation_id=correlation_id,
headers=headers,
)
return typing.cast("None", await self._basic_publish(publish_command, producer=self.config.producer))

async def request( # type: ignore[override]
self,
msg: Any, # noqa: ANN401
message: SendableMessage,
destination: str,
*,
correlation_id: str | None = None,
headers: dict[str, str] | None = None,
) -> Any: # noqa: ANN401
return await super().request(msg, producer=self._producer, correlation_id=correlation_id, headers=headers)
publish_command = StompPublishCommand(
message,
_publish_type=PublishType.REQUEST,
destination=destination,
correlation_id=correlation_id,
headers=headers,
)
return await self._basic_request(publish_command, producer=self.config.producer)

async def publish_batch( # type: ignore[override]
self,
*_messages: SendableMessage,
destination: str,
correlation_id: str | None = None,
headers: dict[str, str] | None = None,
) -> None:
publish_command = StompPublishCommand(
"",
_publish_type=PublishType.PUBLISH,
destination=destination,
correlation_id=correlation_id,
headers=headers,
)
return typing.cast("None", await self._basic_publish_batch(publish_command, producer=self.config.producer))
32 changes: 0 additions & 32 deletions packages/faststream-stomp/faststream_stomp/message.py

This file was deleted.

Loading