From 4d14faffc550635e273196be31a2c85a1a9543c0 Mon Sep 17 00:00:00 2001 From: Hoang Phan Date: Tue, 3 Jun 2025 11:35:29 -0400 Subject: [PATCH 1/4] Add fix for using reducer with window with test added --- quixstreams/dataframe/windows/aggregations.py | 1 + .../test_dataframe/test_windows/test_aggregations.py | 1 + 2 files changed, 2 insertions(+) diff --git a/quixstreams/dataframe/windows/aggregations.py b/quixstreams/dataframe/windows/aggregations.py index 1332fd991..aed048420 100644 --- a/quixstreams/dataframe/windows/aggregations.py +++ b/quixstreams/dataframe/windows/aggregations.py @@ -348,6 +348,7 @@ def __init__( reducer: Callable[[R, Any], R], initializer: Callable[[Any], R], ) -> None: + super().__init__() self._initializer: Callable[[Any], R] = initializer self._reducer: Callable[[R, Any], R] = reducer diff --git a/tests/test_quixstreams/test_dataframe/test_windows/test_aggregations.py b/tests/test_quixstreams/test_dataframe/test_windows/test_aggregations.py index 193666ba1..c39916d2e 100644 --- a/tests/test_quixstreams/test_dataframe/test_windows/test_aggregations.py +++ b/tests/test_quixstreams/test_dataframe/test_windows/test_aggregations.py @@ -176,6 +176,7 @@ def test_other_aggregation(self, aggregator, values, expected): (Mean(), "Mean"), (Max(), "Max"), (Min(), "Min"), + (Reduce(reducer=lambda old, new: old + new, initializer=lambda x: x), "Reduce"), (Count("value"), "Count/value"), (Sum("value"), "Sum/value"), (Mean("value"), "Mean/value"), From 027b1329376a6121fb7f72cf9523a3e7f1ec6e49 Mon Sep 17 00:00:00 2001 From: Hoang Phan Date: Tue, 3 Jun 2025 11:41:18 -0400 Subject: [PATCH 2/4] Fix precommit --- .../test_dataframe/test_windows/test_aggregations.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/test_quixstreams/test_dataframe/test_windows/test_aggregations.py b/tests/test_quixstreams/test_dataframe/test_windows/test_aggregations.py index c39916d2e..83c04dbc6 100644 --- 
a/tests/test_quixstreams/test_dataframe/test_windows/test_aggregations.py +++ b/tests/test_quixstreams/test_dataframe/test_windows/test_aggregations.py @@ -176,7 +176,10 @@ def test_other_aggregation(self, aggregator, values, expected): (Mean(), "Mean"), (Max(), "Max"), (Min(), "Min"), - (Reduce(reducer=lambda old, new: old + new, initializer=lambda x: x), "Reduce"), + ( + Reduce(reducer=lambda old, new: old + new, initializer=lambda x: x), + "Reduce", + ), (Count("value"), "Count/value"), (Sum("value"), "Sum/value"), (Mean("value"), "Mean/value"), From 07ac02ac8f0d18f95b02b486956e955ec98872ba Mon Sep 17 00:00:00 2001 From: Hoang Phan Date: Tue, 3 Jun 2025 06:49:58 -0400 Subject: [PATCH 3/4] Add support for session window --- docs/windowing.md | 236 +++++- quixstreams/dataframe/dataframe.py | 98 +++ quixstreams/dataframe/windows/__init__.py | 2 + quixstreams/dataframe/windows/definitions.py | 76 +- quixstreams/dataframe/windows/time_based.py | 342 ++++++++ .../test_windows/test_session.py | 760 ++++++++++++++++++ 6 files changed, 1505 insertions(+), 9 deletions(-) create mode 100644 tests/test_quixstreams/test_dataframe/test_windows/test_session.py diff --git a/docs/windowing.md b/docs/windowing.md index ea5a9a72f..6ce57c5d2 100644 --- a/docs/windowing.md +++ b/docs/windowing.md @@ -9,7 +9,9 @@ With windows, you can calculate such aggregations as: - Total of website visitors for every hour - The average speed of a vehicle over the last 10 minutes - Maximum temperature of a sensor observed over 30 second ranges -- Give an user a reward after 10 succesful actions +- Give a user a reward after 10 successful actions +- Track user activity sessions on a website +- Detect fraud patterns in financial transactions ## Types of Time in Streaming @@ -500,6 +502,220 @@ sdf = ( ``` +## Session Windows + +Session windows group events that occur within a specified timeout period. 
Unlike fixed-time windows (tumbling, hopping, sliding), session windows have dynamic durations based on the actual timing of events, making them ideal for user activity tracking, fraud detection, and other event-driven scenarios. + +A session starts with the first event and extends each time a new event arrives within the timeout period. The session closes after the timeout period with no new events. + +Key characteristics of session windows: + +- **Dynamic boundaries**: Each session can have different start and end times based on actual events +- **Activity-based**: Sessions extend automatically when events arrive within the timeout period +- **Event-driven closure**: Sessions close when no events arrive within the timeout period +- **Grace period support**: Late events can still extend sessions if they arrive within the grace period + +### How Session Windows Work + +``` +Time: 0 5 10 15 20 25 30 35 40 45 50 +Events: A B C D E + +Timeout: 10 seconds +Grace: 2 seconds + +Session 1: [0, 20] - Events A, B (B extends the session from A) +Session 2: [25, 40] - Events C, D (D extends the session from C) +Session 3: [45, 55] - Event E (session will close at 55 if no more events) +``` + +In this example: +- Event A starts Session 1 at time 0, session would timeout at time 10 +- Event B arrives at time 10, extending Session 1 to timeout at time 20 +- Event C arrives at time 25, starting Session 2 (too late for Session 1) +- Event D arrives at time 30, extending Session 2 to timeout at time 40 +- Event E arrives at time 45, starting Session 3 + +### Basic Session Window Example + +Imagine you want to track user activity sessions on a website, where a session continues as long as user actions occur within 30 minutes of each other: + +Input: +```json +{"user_action": "page_view", "page": "/home", "timestamp": 1000} +{"user_action": "click", "element": "button", "timestamp": 800000} +{"user_action": "page_view", "page": "/products", "timestamp": 1200000} +{"user_action": 
"purchase", "amount": 50, "timestamp": 2000000} +``` + +Here's how to track user sessions using session windows: + +```python +from datetime import timedelta +from quixstreams import Application +from quixstreams.dataframe.windows import Count, Collect + +app = Application(...) +sdf = app.dataframe(...) + +sdf = ( + # Define a session window with 30-minute timeout and 5-minute grace period + sdf.session_window( + timeout_ms=timedelta(minutes=30), + grace_ms=timedelta(minutes=5) + ) + + # Count the number of actions in each session and collect all actions + .agg( + action_count=Count(), + actions=Collect("user_action") + ) + + # Emit results when sessions are complete + .final() +) + +# Expected output (when session expires): +# { +# "start": 1000, +# "end": 2000000 + 1800000, # last event + timeout +# "action_count": 4, +# "actions": ["page_view", "click", "page_view", "purchase"] +# } +``` + +### Session Window with Current Mode + +For real-time monitoring, you can use `.current()` mode to get updates as the session progresses: + +```python +from datetime import timedelta +from quixstreams import Application +from quixstreams.dataframe.windows import Sum, Count + +app = Application(...) +sdf = app.dataframe(...) 
+ +sdf = ( + # Define a session window with 10-second timeout + sdf.session_window(timeout_ms=timedelta(seconds=10)) + + # Track total purchase amount and count in each session + .agg( + total_amount=Sum("amount"), + purchase_count=Count() + ) + + # Emit updates for each message (real-time session tracking) + .current() +) + +# Output for each incoming event: +# Event 1: {"start": 1000, "end": 11000, "total_amount": 25, "purchase_count": 1} +# Event 2: {"start": 1000, "end": 15000, "total_amount": 75, "purchase_count": 2} # session extended +# Event 3: {"start": 1000, "end": 18000, "total_amount": 125, "purchase_count": 3} # session extended again +``` + +### Handling Late Events in Sessions + +Session windows support grace periods to handle out-of-order events: + +```python +from datetime import timedelta +from quixstreams import Application +from quixstreams.dataframe.windows import Count + +def on_late_session_event( + value, key, timestamp_ms, late_by_ms, start, end, name, topic, partition, offset +): + """Handle late events that couldn't extend any session""" + print(f"Late event for key {key}: {late_by_ms}ms late") + print(f"Event would have belonged to session [{start}, {end}]") + return False # Suppress default logging + +app = Application(...) +sdf = app.dataframe(...) + +sdf = ( + # Session window with 5-minute timeout and 1-minute grace period + sdf.session_window( + timeout_ms=timedelta(minutes=5), + grace_ms=timedelta(minutes=1), + on_late=on_late_session_event + ) + .agg(event_count=Count()) + .final() +) +``` + +### Session Window Use Cases + +**1. User Activity Tracking** +```python +# Track user sessions on a website or app +.session_window(timeout_ms=timedelta(minutes=30)) +.agg( + page_views=Count(), + unique_pages=Count("page_url", unique=True), + session_duration=Max("timestamp") - Min("timestamp") +) +``` + +**2. 
Fraud Detection** +```python +# Detect suspicious transaction patterns +.session_window(timeout_ms=timedelta(minutes=10)) +.agg( + transaction_count=Count(), + total_amount=Sum("amount"), + locations=Collect("location") +) +``` + +**3. IoT Device Monitoring** +```python +# Monitor device activity sessions +.session_window(timeout_ms=timedelta(hours=1)) +.agg( + readings_count=Count(), + avg_temperature=Mean("temperature"), + max_pressure=Max("pressure") +) +``` + +**4. Gaming Analytics** +```python +# Track gaming sessions +.session_window(timeout_ms=timedelta(minutes=20)) +.agg( + actions_performed=Count(), + points_earned=Sum("points"), + levels_completed=Count("level_completed") +) +``` + +### Session Window Parameters + +- **`timeout_ms`**: The session timeout period. If no new events arrive within this period, the session will be closed. Can be specified as either an `int` (milliseconds) or a `timedelta` object. + +- **`grace_ms`**: The grace period for data arrival. Allows late-arriving data to be included in the session, even if it arrives after the session has theoretically timed out. Can be specified as either an `int` (milliseconds) or a `timedelta` object. + +- **`name`**: Optional unique identifier for the window. If not provided, it will be automatically generated based on the window's properties. + +- **`on_late`**: Optional callback to react to late records that cannot extend any existing session. Use this to customize logging or route late events to a dead-letter queue. + +### Session Window Behavior + +**Session Creation**: A new session starts when an event arrives and no existing session can accommodate it (i.e., all existing sessions have timed out). + +**Session Extension**: An existing session is extended when an event arrives within `timeout + grace_period` of the session's last activity. 
+ +**Session Closure**: A session closes when the current time exceeds `session_end_time + grace_period`, where `session_end_time = last_event_time + timeout`. + +**Key Grouping**: Like all windows in Quix Streams, sessions are grouped by message key. Each key maintains its own independent sessions. + +**Event Time**: Sessions use event time (from Kafka message timestamps) rather than processing time. + ## Lateness and Out-of-Order Processing When working with event time, some events may be processed later than they're supposed to. Such events are called **"out-of-order"** because they violate the expected order of time in the data stream. @@ -540,7 +756,7 @@ The appropriate value for a grace period varies depending on the use case. ### Reacting on late events !!! info New in v3.8.0 -To react on late records coming into time windows, you can pass the `on_late` callbacks to `.tumbling_window()`, `.hopping_window()` and `.sliding_window()` methods. +To react on late records coming into time windows, you can pass the `on_late` callbacks to `.tumbling_window()`, `.hopping_window()`, `.sliding_window()`, and `.session_window()` methods. You can use this callback to customize the logging of such messages or to send them to some dead-letter queue, for example. @@ -667,6 +883,8 @@ In this strategy, messages advance time and close only windows with the **same** If some message keys appear irregularly in the stream, the latest windows can remain unprocessed until the message with the same key is received. +Session windows also support both closing strategies. With **key** strategy, sessions for each key close independently. With **partition** strategy, any message can advance time and close sessions for all keys in the partition. 
+ ```python from datetime import timedelta from quixstreams import Application @@ -780,7 +998,7 @@ described in [the "Updating Kafka Headers" section](./processing.md#updating-kaf Here are some general concepts about how windowed aggregations are implemented in Quix Streams: -- Only time-based windows are supported. +- Time-based windows (tumbling, hopping, sliding, session) and count-based windows are supported. - Every window is grouped by the current Kafka message key. - Messages with `None` key will be ignored. - The minimal window unit is a **millisecond**. More fine-grained values (e.g. microseconds) will be rounded towards the closest millisecond number. @@ -794,10 +1012,12 @@ window specification. The state store name is auto-generated by default using the following window attributes: -- Window type: `"tumbling"` or `"hopping"` -- Window parameters: `duration_ms` and `step_ms` +- Window type: `"tumbling"`, `"hopping"`, `"sliding"`, or `"session"` +- Window parameters: `duration_ms` and `step_ms` for time-based windows, `timeout_ms` for session windows -E.g. a store name for a hopping window of 30 seconds with a 5 second step will be `hopping_window_30000_5000`. +Examples: +- A hopping window of 30 seconds with a 5 second step: `hopping_window_30000_5000` +- A session window with 30 second timeout: `session_window_30000` ### Updating Window Definitions @@ -807,8 +1027,8 @@ When you change the definition of the window (e.g. its size), the data in the st Quix Streams handles some of the situations, like: -- Updating window type (e.g. from tumbling to hopping) -- Updating window period or step +- Updating window type (e.g. from tumbling to hopping, from hopping to session) +- Updating window period, step, or timeout - Adding/Removing/Updating an aggregation function (except `Reduce()`) Updating the window type and parameters will change the name of the underlying state store, and the new window definition will use a different one. 
diff --git a/quixstreams/dataframe/dataframe.py b/quixstreams/dataframe/dataframe.py index 8b85f1cdb..e31b025bc 100644 --- a/quixstreams/dataframe/dataframe.py +++ b/quixstreams/dataframe/dataframe.py @@ -66,6 +66,7 @@ from .windows import ( HoppingCountWindowDefinition, HoppingTimeWindowDefinition, + SessionWindowDefinition, SlidingCountWindowDefinition, SlidingTimeWindowDefinition, TumblingCountWindowDefinition, @@ -1484,6 +1485,103 @@ def sliding_count_window( name=name, ) + def session_window( + self, + timeout_ms: Union[int, timedelta], + grace_ms: Union[int, timedelta] = 0, + name: Optional[str] = None, + on_late: Optional[WindowOnLateCallback] = None, + ) -> SessionWindowDefinition: + """ + Create a session window transformation on this StreamingDataFrame. + + Session windows group events that occur within a specified timeout period. + A session starts with the first event and extends each time a new event arrives + within the timeout period. The session closes after the timeout period with no + new events. + + Unlike fixed-time windows, session windows have dynamic durations based on the + actual events and their timing, making them ideal for user activity tracking, + fraud detection, and other event-driven scenarios. + + They allow performing stateful aggregations like `sum`, `reduce`, etc. + on top of the data and emit results downstream. + + Notes: + + - The timestamp of the aggregation result is set to the session start timestamp. + - Every session is grouped by the current Kafka message key. + - Messages with `None` key will be ignored. + - Sessions always use the current event time. + + Example Snippet: + + ```python + from quixstreams import Application + import quixstreams.dataframe.windows.aggregations as agg + + app = Application() + sdf = app.dataframe(...) 
+ + sdf = ( + # Define a session window with 30-second timeout and 10-second grace period + sdf.session_window( + timeout_ms=timedelta(seconds=30), + grace_ms=timedelta(seconds=10) + ) + + # Specify the aggregation function + .agg(value=agg.Sum()) + + # Specify how the results should be emitted downstream. + # "current()" will emit results as they come for each updated session, + # possibly producing multiple messages per key-session pair + # "final()" will emit sessions only when they are closed and cannot + # receive any updates anymore. + .final() + ) + ``` + + :param timeout_ms: The session timeout period. + If no new events arrive within this period, the session will be closed. + Can be specified as either an `int` representing milliseconds + or a `timedelta` object. + >***NOTE:*** `timedelta` objects will be rounded to the closest millisecond + value. + + :param grace_ms: The grace period for data arrival. + It allows late-arriving data to be included in the session, + even if it arrives after the session has theoretically timed out. + Can be specified as either an `int` representing milliseconds + or a `timedelta` object. + >***NOTE:*** `timedelta` objects will be rounded to the closest millisecond + value. + + :param name: The unique identifier for the window. If not provided, it will be + automatically generated based on the window's properties. + + :param on_late: an optional callback to react on late records in sessions and + to configure the logging of such events. + If the callback returns `True`, the message about a late record will be logged + (default behavior). + Otherwise, no message will be logged. + + :return: `SessionWindowDefinition` instance representing the session window + configuration. + This object can be further configured with aggregation functions + like `sum`, `count`, etc. applied to the StreamingDataFrame. 
+ """ + timeout_ms = ensure_milliseconds(timeout_ms) + grace_ms = ensure_milliseconds(grace_ms) + + return SessionWindowDefinition( + timeout_ms=timeout_ms, + grace_ms=grace_ms, + dataframe=self, + name=name, + on_late=on_late, + ) + def fill(self, *columns: str, **mapping: Any) -> "StreamingDataFrame": """ Fill missing values in the message value with a constant value. diff --git a/quixstreams/dataframe/windows/__init__.py b/quixstreams/dataframe/windows/__init__.py index 14e7f98e3..ce6655a90 100644 --- a/quixstreams/dataframe/windows/__init__.py +++ b/quixstreams/dataframe/windows/__init__.py @@ -16,6 +16,7 @@ from .definitions import ( HoppingCountWindowDefinition, HoppingTimeWindowDefinition, + SessionWindowDefinition, SlidingCountWindowDefinition, SlidingTimeWindowDefinition, TumblingCountWindowDefinition, @@ -38,6 +39,7 @@ "Collector", "HoppingCountWindowDefinition", "HoppingTimeWindowDefinition", + "SessionWindowDefinition", "SlidingCountWindowDefinition", "SlidingTimeWindowDefinition", "TumblingCountWindowDefinition", diff --git a/quixstreams/dataframe/windows/definitions.py b/quixstreams/dataframe/windows/definitions.py index e3fe378e9..d3606e3ce 100644 --- a/quixstreams/dataframe/windows/definitions.py +++ b/quixstreams/dataframe/windows/definitions.py @@ -28,6 +28,9 @@ SlidingWindowSingleAggregation, ) from .time_based import ( + SessionWindow, + SessionWindowMultiAggregation, + SessionWindowSingleAggregation, TimeWindow, TimeWindowMultiAggregation, TimeWindowSingleAggregation, @@ -43,6 +46,7 @@ "HoppingTimeWindowDefinition", "SlidingTimeWindowDefinition", "TumblingTimeWindowDefinition", + "SessionWindowDefinition", ] @@ -526,4 +530,74 @@ def _get_name(self, func_name: Optional[str]) -> str: ) if func_name: return f"{prefix}_{func_name}" - return prefix + else: + return prefix + + +class SessionWindowDefinition(WindowDefinition): + """ + Definition for session windows that group events by activity sessions. 
+ + Session windows group events that occur within a specified timeout period. + A session starts with the first event and extends each time a new event arrives + within the timeout period. The session closes after the timeout period with no + new events. + """ + + def __init__( + self, + timeout_ms: int, + grace_ms: int, + dataframe: "StreamingDataFrame", + name: Optional[str] = None, + on_late: Optional[WindowOnLateCallback] = None, + ): + if not isinstance(timeout_ms, int): + raise TypeError("Session timeout must be an integer") + if timeout_ms < 1: + raise ValueError("Session timeout cannot be smaller than 1ms") + if grace_ms < 0: + raise ValueError("Session grace cannot be smaller than 0ms") + + super().__init__(name, dataframe, on_late) + + self._timeout_ms = timeout_ms + self._grace_ms = grace_ms + + @property + def timeout_ms(self) -> int: + return self._timeout_ms + + @property + def grace_ms(self) -> int: + return self._grace_ms + + def _get_name(self, func_name: Optional[str]) -> str: + prefix = f"{self._name}_session_window" if self._name else "session_window" + if func_name: + return f"{prefix}_{self._timeout_ms}_{func_name}" + else: + return f"{prefix}_{self._timeout_ms}" + + def _create_window( + self, + func_name: Optional[str], + aggregators: Optional[dict[str, BaseAggregator]] = None, + collectors: Optional[dict[str, BaseCollector]] = None, + ) -> SessionWindow: + if func_name: + window_type: Union[ + type[SessionWindowSingleAggregation], type[SessionWindowMultiAggregation] + ] = SessionWindowSingleAggregation + else: + window_type = SessionWindowMultiAggregation + + return window_type( + timeout_ms=self._timeout_ms, + grace_ms=self._grace_ms, + name=self._get_name(func_name=func_name), + dataframe=self._dataframe, + aggregators=aggregators or {}, + collectors=collectors or {}, + on_late=self._on_late, + ) diff --git a/quixstreams/dataframe/windows/time_based.py b/quixstreams/dataframe/windows/time_based.py index bf1252e61..be1edf9c8 100644 --- 
a/quixstreams/dataframe/windows/time_based.py +++ b/quixstreams/dataframe/windows/time_based.py @@ -290,9 +290,351 @@ def _on_expired_window( ) +class SessionWindow(Window): + """ + Session window groups events that occur within a specified timeout period. + + A session starts with the first event and extends each time a new event arrives + within the timeout period. The session closes after the timeout period with no + new events. + + Each session window can have different start and end times based on the actual + events, making sessions dynamic rather than fixed-time intervals. + """ + + def __init__( + self, + timeout_ms: int, + grace_ms: int, + name: str, + dataframe: "StreamingDataFrame", + on_late: Optional[WindowOnLateCallback] = None, + ): + super().__init__( + name=name, + dataframe=dataframe, + ) + + self._timeout_ms = timeout_ms + self._grace_ms = grace_ms + self._on_late = on_late + self._closing_strategy = ClosingStrategy.KEY + + def final( + self, closing_strategy: ClosingStrategyValues = "key" + ) -> "StreamingDataFrame": + """ + Apply the session window aggregation and return results only when the sessions + are closed. + + The format of returned sessions: + ```python + { + "start": , + "end": , + "value: , + } + ``` + + The individual session is closed when the event time + (the maximum observed timestamp across the partition) passes + the last event timestamp + timeout + grace period. + The closed sessions cannot receive updates anymore and are considered final. + + :param closing_strategy: the strategy to use when closing sessions. + Possible values: + - `"key"` - messages advance time and close sessions with the same key. + If some message keys appear irregularly in the stream, the latest sessions can remain unprocessed until a message with the same key is received. + - `"partition"` - messages advance time and close sessions for the whole partition to which this message key belongs. 
+ If timestamps between keys are not ordered, it may increase the number of discarded late messages. + Default - `"key"`. + """ + self._closing_strategy = ClosingStrategy.new(closing_strategy) + return super().final() + + def current( + self, closing_strategy: ClosingStrategyValues = "key" + ) -> "StreamingDataFrame": + """ + Apply the session window transformation to the StreamingDataFrame to return results + for each updated session. + + The format of returned sessions: + ```python + { + "start": , + "end": , + "value: , + } + ``` + + This method processes streaming data and returns results as they come, + regardless of whether the session is closed or not. + + :param closing_strategy: the strategy to use when closing sessions. + Possible values: + - `"key"` - messages advance time and close sessions with the same key. + If some message keys appear irregularly in the stream, the latest sessions can remain unprocessed until a message with the same key is received. + - `"partition"` - messages advance time and close sessions for the whole partition to which this message key belongs. + If timestamps between keys are not ordered, it may increase the number of discarded late messages. + Default - `"key"`. 
+ """ + self._closing_strategy = ClosingStrategy.new(closing_strategy) + return super().current() + + def process_window( + self, + value: Any, + key: Any, + timestamp_ms: int, + transaction: WindowedPartitionTransaction, + ) -> tuple[Iterable[WindowKeyResult], Iterable[WindowKeyResult]]: + state = transaction.as_state(prefix=key) + timeout_ms = self._timeout_ms + grace_ms = self._grace_ms + + collect = self.collect + aggregate = self.aggregate + + # Determine the latest timestamp for expiration logic + if self._closing_strategy == ClosingStrategy.PARTITION: + latest_expired_timestamp = transaction.get_latest_expired(prefix=b"") + latest_timestamp = max(timestamp_ms, latest_expired_timestamp) + else: + state_ts = state.get_latest_timestamp() or 0 + latest_timestamp = max(timestamp_ms, state_ts) + + # Calculate session expiry threshold + session_expiry_threshold = latest_timestamp - grace_ms + + # Check if the event is too late + if timestamp_ms < session_expiry_threshold: + late_by_ms = session_expiry_threshold - timestamp_ms + self._on_expired_session( + value=value, + key=key, + start=timestamp_ms, + end=timestamp_ms + timeout_ms, + timestamp_ms=timestamp_ms, + late_by_ms=late_by_ms, + ) + return [], [] + + # Look for an existing session that can be extended + session_start = None + session_end = None + can_extend_session = False + existing_aggregated = None + old_window_to_delete = None + + # Search for active sessions that can accommodate the new event + search_start = max(0, timestamp_ms - timeout_ms * 2) + windows = state.get_windows(search_start, timestamp_ms + timeout_ms + 1, backwards=True) + + for (window_start, window_end), aggregated_value, _ in windows: + # Calculate the time gap between the new event and the session's last activity + session_last_activity = window_end - timeout_ms + time_gap = timestamp_ms - session_last_activity + + # Check if this session can be extended + if time_gap <= timeout_ms + grace_ms and timestamp_ms >= window_start: + 
session_start = window_start + session_end = timestamp_ms + timeout_ms + can_extend_session = True + existing_aggregated = aggregated_value + old_window_to_delete = (window_start, window_end) + break + + # If no extendable session found, start a new one + if not can_extend_session: + session_start = timestamp_ms + session_end = timestamp_ms + timeout_ms + + # Process the event for this session + updated_windows: list[WindowKeyResult] = [] + + # Delete the old window if extending an existing session + if can_extend_session and old_window_to_delete: + old_start, old_end = old_window_to_delete + transaction.delete_window(old_start, old_end, prefix=key) + + # Add to collection if needed + if collect: + state.add_to_collection( + value=self._collect_value(value), + id=timestamp_ms, + ) + + # Update the session window aggregation + aggregated = None + if aggregate: + current_value = existing_aggregated if can_extend_session else None + if current_value is None: + current_value = self._initialize_value() + + aggregated = self._aggregate_value(current_value, value, timestamp_ms) + + # Output intermediate results for aggregations + if aggregate: + updated_windows.append( + ( + key, + self._results(aggregated, [], session_start, session_end), + ) + ) + + state.update_window(session_start, session_end, value=aggregated, timestamp_ms=timestamp_ms) + + # Expire old sessions + if self._closing_strategy == ClosingStrategy.PARTITION: + expired_windows = self.expire_sessions_by_partition( + transaction, session_expiry_threshold, collect + ) + else: + expired_windows = self.expire_sessions_by_key( + key, state, session_expiry_threshold, collect + ) + + return updated_windows, expired_windows + + def expire_sessions_by_partition( + self, + transaction: WindowedPartitionTransaction, + expiry_threshold: int, + collect: bool, + ) -> Iterable[WindowKeyResult]: + start = time.monotonic() + count = 0 + + # Import the parsing function to extract message keys from window keys + from 
quixstreams.state.rocksdb.windowed.serialization import parse_window_key + + expired_results = [] + + # Collect all keys and extract unique prefixes to avoid iteration conflicts + all_keys = list(transaction.keys()) + seen_prefixes = set() + + for key_bytes in all_keys: + try: + prefix, start_ms, end_ms = parse_window_key(key_bytes) + if prefix not in seen_prefixes: + seen_prefixes.add(prefix) + except (ValueError, IndexError): + # Skip invalid window key formats + continue + + # Expire sessions for each unique prefix + for prefix in seen_prefixes: + state = transaction.as_state(prefix=prefix) + prefix_expired = list(self.expire_sessions_by_key( + prefix, state, expiry_threshold, collect + )) + expired_results.extend(prefix_expired) + count += len(prefix_expired) + + if count: + logger.debug( + "Expired %s session windows in %ss", count, round(time.monotonic() - start, 2) + ) + + return expired_results + + def expire_sessions_by_key( + self, + key: Any, + state: WindowedState, + expiry_threshold: int, + collect: bool, + ) -> Iterable[WindowKeyResult]: + start = time.monotonic() + count = 0 + + # Get all windows and check which ones have expired + all_windows = list(state.get_windows(0, expiry_threshold + self._timeout_ms, backwards=False)) + + windows_to_delete = [] + for (window_start, window_end), aggregated, _ in all_windows: + # Session expires when the session end time has passed the expiry threshold + if window_end <= expiry_threshold: + collected = [] + if collect: + collected = state.get_from_collection(window_start, window_end) + + windows_to_delete.append((window_start, window_end)) + count += 1 + yield (key, self._results(aggregated, collected, window_start, window_end)) + + # Clean up expired windows + for window_start, window_end in windows_to_delete: + state._transaction.delete_window(window_start, window_end, prefix=state._prefix) + if collect: + state.delete_from_collection(window_end, start=window_start) + + if count: + logger.debug( + "Expired %s 
session windows in %ss", count, round(time.monotonic() - start, 2) + ) + + def _on_expired_session( + self, + value: Any, + key: Any, + start: int, + end: int, + timestamp_ms: int, + late_by_ms: int, + ) -> None: + try: + ctx = message_context() + topic = ctx.topic + partition = ctx.partition + offset = ctx.offset + except: + # In test environments, message context might not be available + topic = "unknown" + partition = -1 + offset = -1 + + to_log = True + + # Trigger the "on_late" callback if provided + if self._on_late: + to_log = self._on_late( + value, + key, + timestamp_ms, + late_by_ms, + start, + end, + self._name, + topic, + partition, + offset, + ) + if to_log: + logger.warning( + "Skipping session processing for the closed session " + f"timestamp_ms={timestamp_ms} " + f"session={(start, end)} " + f"late_by_ms={late_by_ms} " + f"store_name={self._name} " + f"partition={topic}[{partition}] " + f"offset={offset}" + ) + + class TimeWindowSingleAggregation(SingleAggregationWindowMixin, TimeWindow): pass class TimeWindowMultiAggregation(MultiAggregationWindowMixin, TimeWindow): pass + + +class SessionWindowSingleAggregation(SingleAggregationWindowMixin, SessionWindow): + pass + + +class SessionWindowMultiAggregation(MultiAggregationWindowMixin, SessionWindow): + pass diff --git a/tests/test_quixstreams/test_dataframe/test_windows/test_session.py b/tests/test_quixstreams/test_dataframe/test_windows/test_session.py new file mode 100644 index 000000000..80f7156af --- /dev/null +++ b/tests/test_quixstreams/test_dataframe/test_windows/test_session.py @@ -0,0 +1,760 @@ +import pytest + +import quixstreams.dataframe.windows.aggregations as agg +from quixstreams.dataframe import DataFrameRegistry +from quixstreams.dataframe.windows.definitions import SessionWindowDefinition +from quixstreams.dataframe.windows.time_based import ClosingStrategy + + +@pytest.fixture() +def session_window_definition_factory(state_manager, dataframe_factory): + def factory(timeout_ms: int, 
grace_ms: int = 0) -> SessionWindowDefinition: + sdf = dataframe_factory( + state_manager=state_manager, registry=DataFrameRegistry() + ) + window_def = SessionWindowDefinition( + timeout_ms=timeout_ms, grace_ms=grace_ms, dataframe=sdf + ) + return window_def + + return factory + + +def process(window, value, key, transaction, timestamp_ms): + updated, expired = window.process_window( + value=value, key=key, transaction=transaction, timestamp_ms=timestamp_ms + ) + return list(updated), list(expired) + + +class TestSessionWindow: + @pytest.mark.parametrize( + "timeout, grace, provided_name, func_name, expected_name", + [ + (30000, 5000, "custom_window", "sum", "custom_window_session_window_30000_sum"), + (30000, 5000, None, "sum", "session_window_30000_sum"), + (15000, 5000, None, "count", "session_window_15000_count"), + ], + ) + def test_session_window_definition_get_name( + self, + timeout, + grace, + provided_name, + func_name, + expected_name, + dataframe_factory, + ): + swd = SessionWindowDefinition( + timeout_ms=timeout, + grace_ms=grace, + dataframe=dataframe_factory(), + name=provided_name, + ) + name = swd._get_name(func_name) + assert name == expected_name + + def test_multiaggregation( + self, + session_window_definition_factory, + state_manager, + ): + window = session_window_definition_factory(timeout_ms=10000, grace_ms=1000).agg( + count=agg.Count(), + sum=agg.Sum(), + mean=agg.Mean(), + max=agg.Max(), + min=agg.Min(), + collect=agg.Collect(), + ) + window.final(closing_strategy="key") + assert window.name == "session_window_10000" + + store = state_manager.get_store(stream_id="test", store_name=window.name) + store.assign_partition(0) + key = b"key" + with store.start_partition_transaction(0) as tx: + # First event starts a session + updated, expired = process( + window, value=1, key=key, transaction=tx, timestamp_ms=1000 + ) + assert not expired + assert updated == [ + ( + key, + { + "start": 1000, + "end": 11000, # 1000 + 10000 timeout + "count": 
1, + "sum": 1, + "mean": 1.0, + "max": 1, + "min": 1, + "collect": [], + }, + ) + ] + + # Second event within timeout extends the session + updated, expired = process( + window, value=4, key=key, transaction=tx, timestamp_ms=5000 + ) + assert not expired + assert updated == [ + ( + key, + { + "start": 1000, + "end": 15000, # 5000 + 10000 timeout + "count": 2, + "sum": 5, + "mean": 2.5, + "max": 4, + "min": 1, + "collect": [], + }, + ) + ] + + # Third event outside timeout starts new session, expires previous + updated, expired = process( + window, value=2, key=key, transaction=tx, timestamp_ms=26000 + ) + assert expired == [ + ( + key, + { + "start": 1000, + "end": 15000, + "count": 2, + "sum": 5, + "mean": 2.5, + "max": 4, + "min": 1, + "collect": [1, 4], + }, + ) + ] + assert updated == [ + ( + key, + { + "start": 26000, + "end": 36000, # 26000 + 10000 timeout + "count": 1, + "sum": 2, + "mean": 2.0, + "max": 2, + "min": 2, + "collect": [], + }, + ) + ] + + @pytest.mark.parametrize("expiration", ("key", "partition")) + def test_sessionwindow_count( + self, expiration, session_window_definition_factory, state_manager + ): + window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=1000) + window = window_def.count() + assert window.name == "session_window_10000_count" + + window.final(closing_strategy=expiration) + store = state_manager.get_store(stream_id="test", store_name=window.name) + store.assign_partition(0) + with store.start_partition_transaction(0) as tx: + key = b"key" + # Start session + process(window, value=0, key=key, transaction=tx, timestamp_ms=1000) + # Add to session + updated, expired = process( + window, value=0, key=key, transaction=tx, timestamp_ms=5000 + ) + assert len(updated) == 1 + assert updated[0][1]["value"] == 2 + assert updated[0][1]["start"] == 1000 + assert updated[0][1]["end"] == 15000 # 5000 + 10000 + assert not expired + + @pytest.mark.parametrize("expiration", ("key", "partition")) + def test_sessionwindow_sum( 
+ self, expiration, session_window_definition_factory, state_manager + ): + window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=1000) + window = window_def.sum() + assert window.name == "session_window_10000_sum" + + window.final(closing_strategy=expiration) + store = state_manager.get_store(stream_id="test", store_name=window.name) + store.assign_partition(0) + with store.start_partition_transaction(0) as tx: + key = b"key" + process(window, value=2, key=key, transaction=tx, timestamp_ms=1000) + updated, expired = process( + window, value=3, key=key, transaction=tx, timestamp_ms=5000 + ) + assert len(updated) == 1 + assert updated[0][1]["value"] == 5 + assert updated[0][1]["start"] == 1000 + assert updated[0][1]["end"] == 15000 + assert not expired + + @pytest.mark.parametrize("expiration", ("key", "partition")) + def test_sessionwindow_mean( + self, expiration, session_window_definition_factory, state_manager + ): + window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=1000) + window = window_def.mean() + assert window.name == "session_window_10000_mean" + + window.final(closing_strategy=expiration) + store = state_manager.get_store(stream_id="test", store_name=window.name) + store.assign_partition(0) + with store.start_partition_transaction(0) as tx: + key = b"key" + process(window, value=2, key=key, transaction=tx, timestamp_ms=1000) + updated, expired = process( + window, value=4, key=key, transaction=tx, timestamp_ms=5000 + ) + assert len(updated) == 1 + assert updated[0][1]["value"] == 3.0 + assert updated[0][1]["start"] == 1000 + assert updated[0][1]["end"] == 15000 + assert not expired + + @pytest.mark.parametrize("expiration", ("key", "partition")) + def test_sessionwindow_reduce( + self, expiration, session_window_definition_factory, state_manager + ): + window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=1000) + window = window_def.reduce( + reducer=lambda agg, current: agg + [current], + 
initializer=lambda value: [value], + ) + assert window.name == "session_window_10000_reduce" + + window.final(closing_strategy=expiration) + store = state_manager.get_store(stream_id="test", store_name=window.name) + store.assign_partition(0) + with store.start_partition_transaction(0) as tx: + key = b"key" + process(window, value=2, key=key, transaction=tx, timestamp_ms=1000) + updated, expired = process( + window, value=3, key=key, transaction=tx, timestamp_ms=5000 + ) + assert len(updated) == 1 + assert updated[0][1]["value"] == [2, 3] + assert updated[0][1]["start"] == 1000 + assert updated[0][1]["end"] == 15000 + assert not expired + + @pytest.mark.parametrize("expiration", ("key", "partition")) + def test_sessionwindow_max( + self, expiration, session_window_definition_factory, state_manager + ): + window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=1000) + window = window_def.max() + assert window.name == "session_window_10000_max" + + window.final(closing_strategy=expiration) + store = state_manager.get_store(stream_id="test", store_name=window.name) + store.assign_partition(0) + with store.start_partition_transaction(0) as tx: + key = b"key" + process(window, value=2, key=key, transaction=tx, timestamp_ms=1000) + updated, expired = process( + window, value=5, key=key, transaction=tx, timestamp_ms=5000 + ) + assert len(updated) == 1 + assert updated[0][1]["value"] == 5 + assert updated[0][1]["start"] == 1000 + assert updated[0][1]["end"] == 15000 + assert not expired + + @pytest.mark.parametrize("expiration", ("key", "partition")) + def test_sessionwindow_min( + self, expiration, session_window_definition_factory, state_manager + ): + window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=1000) + window = window_def.min() + assert window.name == "session_window_10000_min" + + window.final(closing_strategy=expiration) + store = state_manager.get_store(stream_id="test", store_name=window.name) + 
store.assign_partition(0) + with store.start_partition_transaction(0) as tx: + key = b"key" + process(window, value=5, key=key, transaction=tx, timestamp_ms=1000) + updated, expired = process( + window, value=2, key=key, transaction=tx, timestamp_ms=5000 + ) + assert len(updated) == 1 + assert updated[0][1]["value"] == 2 + assert updated[0][1]["start"] == 1000 + assert updated[0][1]["end"] == 15000 + assert not expired + + @pytest.mark.parametrize("expiration", ("key", "partition")) + def test_sessionwindow_collect( + self, expiration, session_window_definition_factory, state_manager + ): + window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=1000) + window = window_def.collect() + assert window.name == "session_window_10000_collect" + + window.final(closing_strategy=expiration) + store = state_manager.get_store(stream_id="test", store_name=window.name) + store.assign_partition(0) + with store.start_partition_transaction(0) as tx: + key = b"key" + process(window, value=1, key=key, transaction=tx, timestamp_ms=1000) + process(window, value=2, key=key, transaction=tx, timestamp_ms=5000) + process(window, value=3, key=key, transaction=tx, timestamp_ms=8000) + # Event outside timeout triggers session closure + updated, expired = process( + window, value=4, key=key, transaction=tx, timestamp_ms=25000 + ) + assert not updated + assert expired == [(key, {"start": 1000, "end": 18000, "value": [1, 2, 3]})] + + @pytest.mark.parametrize( + "timeout, grace, name", + [ + (-10000, 1000, "test"), # timeout < 0 + (10000, -1000, "test"), # grace < 0 + (0, 1000, "test"), # timeout == 0 + ], + ) + def test_session_window_def_init_invalid( + self, timeout, grace, name, dataframe_factory + ): + with pytest.raises(ValueError): + SessionWindowDefinition( + timeout_ms=timeout, + grace_ms=grace, + name=name, + dataframe=dataframe_factory(), + ) + + def test_session_window_def_init_invalid_type(self, dataframe_factory): + with pytest.raises(TypeError): + 
SessionWindowDefinition( + timeout_ms="invalid", # should be int + grace_ms=1000, + name="test", + dataframe=dataframe_factory(), + ) + + @pytest.mark.parametrize("expiration", ("key", "partition")) + def test_session_window_process_timeout_behavior( + self, + expiration, + session_window_definition_factory, + state_manager, + ): + """Test that sessions properly timeout and new sessions start correctly""" + window_def = session_window_definition_factory(timeout_ms=5000, grace_ms=0) + window = window_def.sum() + window.final(closing_strategy=expiration) + store = state_manager.get_store(stream_id="test", store_name=window.name) + store.assign_partition(0) + with store.start_partition_transaction(0) as tx: + key = b"key" + + # Start session 1 + updated, expired = process( + window, value=1, key=key, transaction=tx, timestamp_ms=1000 + ) + assert len(updated) == 1 + assert updated[0][1]["value"] == 1 + assert updated[0][1]["start"] == 1000 + assert updated[0][1]["end"] == 6000 # 1000 + 5000 + assert not expired + + # Add to session 1 (within timeout) + updated, expired = process( + window, value=2, key=key, transaction=tx, timestamp_ms=4000 + ) + assert len(updated) == 1 + assert updated[0][1]["value"] == 3 + assert updated[0][1]["start"] == 1000 + assert updated[0][1]["end"] == 9000 # 4000 + 5000 + assert not expired + + # Start session 2 (outside timeout) - should expire session 1 + updated, expired = process( + window, value=5, key=key, transaction=tx, timestamp_ms=15000 + ) + assert len(updated) == 1 + assert updated[0][1]["value"] == 5 + assert updated[0][1]["start"] == 15000 + assert updated[0][1]["end"] == 20000 # 15000 + 5000 + + assert len(expired) == 1 + assert expired[0][1]["value"] == 3 + assert expired[0][1]["start"] == 1000 + assert expired[0][1]["end"] == 9000 + + def test_session_window_grace_period( + self, session_window_definition_factory, state_manager + ): + """Test that grace period allows late events""" + window_def = 
session_window_definition_factory(timeout_ms=5000, grace_ms=2000) + window = window_def.sum() + window.final(closing_strategy="key") + store = state_manager.get_store(stream_id="test", store_name=window.name) + store.assign_partition(0) + with store.start_partition_transaction(0) as tx: + key = b"key" + + # Start session + updated, expired = process( + window, value=1, key=key, transaction=tx, timestamp_ms=1000 + ) + assert len(updated) == 1 + assert updated[0][1]["start"] == 1000 + assert not expired + + # Event that would normally expire the session, but within grace + updated, expired = process( + window, value=2, key=key, transaction=tx, timestamp_ms=8000 + ) + # Session should still be active due to grace period + assert len(updated) == 1 + assert updated[0][1]["value"] == 3 + assert not expired + + # Event outside grace period - should expire session + updated, expired = process( + window, value=3, key=key, transaction=tx, timestamp_ms=16000 + ) + assert len(expired) == 1 + assert expired[0][1]["value"] == 3 + assert expired[0][1]["start"] == 1000 + + def test_session_window_multiple_keys( + self, session_window_definition_factory, state_manager + ): + """Test that different keys maintain separate sessions""" + window_def = session_window_definition_factory(timeout_ms=5000, grace_ms=0) + window = window_def.sum() + window.final(closing_strategy="key") + store = state_manager.get_store(stream_id="test", store_name=window.name) + store.assign_partition(0) + with store.start_partition_transaction(0) as tx: + key1 = b"key1" + key2 = b"key2" + + # Start session for key1 + updated, expired = process( + window, value=1, key=key1, transaction=tx, timestamp_ms=1000 + ) + assert len(updated) == 1 + assert updated[0][0] == key1 + assert updated[0][1]["value"] == 1 + assert not expired + + # Start session for key2 + updated, expired = process( + window, value=10, key=key2, transaction=tx, timestamp_ms=2000 + ) + assert len(updated) == 1 + assert updated[0][0] == key2 + 
assert updated[0][1]["value"] == 10 + assert not expired + + # Add to key1 session + updated, expired = process( + window, value=2, key=key1, transaction=tx, timestamp_ms=3000 + ) + assert len(updated) == 1 + assert updated[0][0] == key1 + assert updated[0][1]["value"] == 3 + assert not expired + + # Add to key2 session + updated, expired = process( + window, value=20, key=key2, transaction=tx, timestamp_ms=4000 + ) + assert len(updated) == 1 + assert updated[0][0] == key2 + assert updated[0][1]["value"] == 30 + assert not expired + + def test_session_partition_expiration( + self, session_window_definition_factory, state_manager + ): + """Test partition-level session expiration""" + window_def = session_window_definition_factory(timeout_ms=5000, grace_ms=1000) + window = window_def.sum() + window.final(closing_strategy="partition") + store = state_manager.get_store(stream_id="test", store_name=window.name) + store.assign_partition(0) + with store.start_partition_transaction(0) as tx: + key1 = b"key1" + key2 = b"key2" + + # Start sessions for both keys + process(window, value=1, key=key1, transaction=tx, timestamp_ms=1000) + process(window, value=10, key=key2, transaction=tx, timestamp_ms=2000) + + # Add to both sessions + process(window, value=2, key=key1, transaction=tx, timestamp_ms=3000) + process(window, value=20, key=key2, transaction=tx, timestamp_ms=4000) + + # Event that advances partition time beyond grace period + # Should expire sessions for both keys + updated, expired = process( + window, value=3, key=key1, transaction=tx, timestamp_ms=15000 + ) + + # Should get new session for key1 + assert len(updated) == 1 + assert updated[0][0] == key1 + assert updated[0][1]["value"] == 3 + assert updated[0][1]["start"] == 15000 + + # Should expire sessions for both keys + expired_keys = {exp[0] for exp in expired} + assert key1 in expired_keys + assert key2 in expired_keys + + def test_session_window_late_events( + self, session_window_definition_factory, 
state_manager + ): + """Test handling of late events that arrive after session closure""" + window_def = session_window_definition_factory(timeout_ms=5000, grace_ms=1000) + window = window_def.sum() + window.final(closing_strategy="key") + store = state_manager.get_store(stream_id="test", store_name=window.name) + store.assign_partition(0) + with store.start_partition_transaction(0) as tx: + key = b"key" + + # Start and finish a session + process(window, value=1, key=key, transaction=tx, timestamp_ms=1000) + process(window, value=2, key=key, transaction=tx, timestamp_ms=3000) + + # Start new session that will cause first to expire + updated, expired = process( + window, value=5, key=key, transaction=tx, timestamp_ms=15000 + ) + assert len(expired) == 1 + assert expired[0][1]["value"] == 3 + + # Now send a late event that would belong to the first session + # Should be ignored due to being too late + updated, expired = process( + window, value=10, key=key, transaction=tx, timestamp_ms=2500 + ) + # Should not affect any sessions since it's too late + assert not updated + assert not expired + + def test_session_window_current_mode( + self, session_window_definition_factory, state_manager + ): + """Test session window with current() mode""" + window_def = session_window_definition_factory(timeout_ms=5000, grace_ms=0) + window = window_def.sum() + window.current(closing_strategy="key") + store = state_manager.get_store(stream_id="test", store_name=window.name) + store.assign_partition(0) + with store.start_partition_transaction(0) as tx: + key = b"key" + + # Start session - should get update immediately + updated, expired = process( + window, value=1, key=key, transaction=tx, timestamp_ms=1000 + ) + assert len(updated) == 1 + assert updated[0][1]["value"] == 1 + assert not expired + + # Add to session - should get update immediately + updated, expired = process( + window, value=2, key=key, transaction=tx, timestamp_ms=3000 + ) + assert len(updated) == 1 + assert 
updated[0][1]["value"] == 3 + assert not expired + + def test_session_window_overlapping_sessions( + self, session_window_definition_factory, state_manager + ): + """Test that sessions don't overlap for the same key""" + window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=0) + window = window_def.sum() + window.final(closing_strategy="key") + store = state_manager.get_store(stream_id="test", store_name=window.name) + store.assign_partition(0) + with store.start_partition_transaction(0) as tx: + key = b"key" + + # Start session 1 + updated, expired = process( + window, value=1, key=key, transaction=tx, timestamp_ms=1000 + ) + session1_end = updated[0][1]["end"] + + # Event within timeout - extends session 1 + updated, expired = process( + window, value=2, key=key, transaction=tx, timestamp_ms=5000 + ) + new_end = updated[0][1]["end"] + assert new_end > session1_end # Session extended + assert updated[0][1]["value"] == 3 # Accumulated value + + # Event far in future - starts session 2, expires session 1 + updated, expired = process( + window, value=10, key=key, transaction=tx, timestamp_ms=30000 + ) + assert len(expired) == 1 + assert expired[0][1]["value"] == 3 # Final value of session 1 + assert len(updated) == 1 + assert updated[0][1]["value"] == 10 # New session 2 starts fresh + assert updated[0][1]["start"] == 30000 + + def test_session_window_merge_sessions( + self, session_window_definition_factory, state_manager + ): + """Test that an event can merge two previously separate sessions""" + window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=1000) + window = window_def.sum() + window.final(closing_strategy="key") + store = state_manager.get_store(stream_id="test", store_name=window.name) + store.assign_partition(0) + with store.start_partition_transaction(0) as tx: + key = b"key" + + # Create first session + updated, expired = process( + window, value=1, key=key, transaction=tx, timestamp_ms=1000 + ) + assert 
len(updated) == 1 + assert updated[0][1]["start"] == 1000 + assert updated[0][1]["end"] == 11000 # 1000 + 10000 + assert updated[0][1]["value"] == 1 + assert not expired + + # Create second session that doesn't expire the first one yet + # (13000 is still within timeout + grace of first session: 11000 + 1000 = 12000) + # Actually, let's make it further: 20000ms to ensure two separate sessions + updated, expired = process( + window, value=10, key=key, transaction=tx, timestamp_ms=20000 + ) + # First session should now be expired + assert len(expired) == 1 + assert expired[0][1]["start"] == 1000 + assert expired[0][1]["end"] == 11000 + assert expired[0][1]["value"] == 1 + + assert len(updated) == 1 + assert updated[0][1]["start"] == 20000 + assert updated[0][1]["end"] == 30000 # 20000 + 10000 + assert updated[0][1]["value"] == 10 + + # Add another event to the second session + updated, expired = process( + window, value=5, key=key, transaction=tx, timestamp_ms=25000 + ) + assert len(updated) == 1 + assert updated[0][1]["start"] == 20000 + assert updated[0][1]["end"] == 35000 # 25000 + 10000 + assert updated[0][1]["value"] == 15 # 10 + 5 + assert not expired + + # Now test the limitation: we'll create a third session that could theoretically + # merge with the second session if there was a bridging event + # But since sessions don't auto-merge, they'll remain separate + updated, expired = process( + window, value=100, key=key, transaction=tx, timestamp_ms=50000 + ) + # Second session should be expired + assert len(expired) == 1 + assert expired[0][1]["start"] == 20000 + assert expired[0][1]["end"] == 35000 + assert expired[0][1]["value"] == 15 + + # Third session starts + assert len(updated) == 1 + assert updated[0][1]["start"] == 50000 + assert updated[0][1]["end"] == 60000 # 50000 + 10000 + assert updated[0][1]["value"] == 100 + + def test_session_window_bridging_event_scenario( + self, session_window_definition_factory, state_manager + ): + """ + Test scenario 
where an event arrives that could theoretically bridge two sessions. + + This test documents the current behavior where sessions don't auto-merge, + even when a bridging event could logically connect them. + + Scenario: + 1. Session A: [1000, 11000] with value 5 + 2. Session B: [15000, 25000] with value 10 + 3. Bridging event at 12000ms that: + - Can extend Session A to [1000, 22000] + - Now overlaps with Session B [15000, 25000] + - Ideally should merge into single session [1000, 25000] with value 15+bridge_value + + Current behavior: Session A gets extended, Session B remains separate + Ideal behavior: Sessions A and B get merged when bridging event arrives + """ + window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=2000) + window = window_def.sum() + window.final(closing_strategy="key") + store = state_manager.get_store(stream_id="test", store_name=window.name) + store.assign_partition(0) + with store.start_partition_transaction(0) as tx: + key = b"key" + + # Create Session A + updated, expired = process( + window, value=5, key=key, transaction=tx, timestamp_ms=1000 + ) + assert len(updated) == 1 + assert updated[0][1]["start"] == 1000 + assert updated[0][1]["end"] == 11000 # 1000 + 10000 + assert updated[0][1]["value"] == 5 + assert not expired + + # Create Session B - close enough that it doesn't expire Session A + # Session A expires when time > 11000 + 2000 = 13000 + # So event at 12000 should keep Session A alive + updated, expired = process( + window, value=10, key=key, transaction=tx, timestamp_ms=12000 + ) + # This should extend Session A since 12000 is within timeout+grace of Session A + # Session A last activity was at 1000, so it expires at 1000+10000+2000=13000 + # Event at 12000 is before 13000, so it should extend Session A + assert len(updated) == 1 + assert updated[0][1]["start"] == 1000 # Session A extended + assert updated[0][1]["end"] == 22000 # 12000 + 10000 + assert updated[0][1]["value"] == 15 # 5 + 10 + assert not 
expired + + # Now create what would be Session B if Session A hadn't been extended + updated, expired = process( + window, value=20, key=key, transaction=tx, timestamp_ms=15000 + ) + # This should extend the already extended Session A further + assert len(updated) == 1 + assert updated[0][1]["start"] == 1000 # Still Session A + assert updated[0][1]["end"] == 25000 # 15000 + 10000 + assert updated[0][1]["value"] == 35 # 5 + 10 + 20 + assert not expired + + # Final event to expire the session + updated, expired = process( + window, value=1, key=key, transaction=tx, timestamp_ms=40000 + ) + assert len(expired) == 1 + assert expired[0][1]["start"] == 1000 + assert expired[0][1]["end"] == 25000 + assert expired[0][1]["value"] == 35 # All events combined + + assert len(updated) == 1 + assert updated[0][1]["start"] == 40000 + assert updated[0][1]["value"] == 1 \ No newline at end of file From ab220fcdcb233dd5087a4a82f3355f43656afaa8 Mon Sep 17 00:00:00 2001 From: Hoang Phan Date: Tue, 3 Jun 2025 12:09:30 -0400 Subject: [PATCH 4/4] Add fix for serializer --- quixstreams/dataframe/windows/time_based.py | 81 ++++++---- quixstreams/state/types.py | 12 ++ .../test_windows/test_session.py | 147 ++++++++++++++---- 3 files changed, 181 insertions(+), 59 deletions(-) diff --git a/quixstreams/dataframe/windows/time_based.py b/quixstreams/dataframe/windows/time_based.py index be1edf9c8..d6ff53f0e 100644 --- a/quixstreams/dataframe/windows/time_based.py +++ b/quixstreams/dataframe/windows/time_based.py @@ -293,11 +293,11 @@ def _on_expired_window( class SessionWindow(Window): """ Session window groups events that occur within a specified timeout period. - + A session starts with the first event and extends each time a new event arrives within the timeout period. The session closes after the timeout period with no new events. - + Each session window can have different start and end times based on the actual events, making sessions dynamic rather than fixed-time intervals. 
""" @@ -419,23 +419,25 @@ def process_window( late_by_ms=late_by_ms, ) return [], [] - + # Look for an existing session that can be extended session_start = None session_end = None can_extend_session = False existing_aggregated = None old_window_to_delete = None - + # Search for active sessions that can accommodate the new event search_start = max(0, timestamp_ms - timeout_ms * 2) - windows = state.get_windows(search_start, timestamp_ms + timeout_ms + 1, backwards=True) - + windows = state.get_windows( + search_start, timestamp_ms + timeout_ms + 1, backwards=True + ) + for (window_start, window_end), aggregated_value, _ in windows: # Calculate the time gap between the new event and the session's last activity session_last_activity = window_end - timeout_ms time_gap = timestamp_ms - session_last_activity - + # Check if this session can be extended if time_gap <= timeout_ms + grace_ms and timestamp_ms >= window_start: session_start = window_start @@ -452,12 +454,12 @@ def process_window( # Process the event for this session updated_windows: list[WindowKeyResult] = [] - + # Delete the old window if extending an existing session if can_extend_session and old_window_to_delete: old_start, old_end = old_window_to_delete - transaction.delete_window(old_start, old_end, prefix=key) - + transaction.delete_window(old_start, old_end, prefix=state._prefix) # type: ignore # noqa: SLF001 + # Add to collection if needed if collect: state.add_to_collection( @@ -473,7 +475,11 @@ def process_window( current_value = self._initialize_value() aggregated = self._aggregate_value(current_value, value, timestamp_ms) - + + # By this point, session_start and session_end are guaranteed to be set + assert session_start is not None # noqa: S101 + assert session_end is not None # noqa: S101 + # Output intermediate results for aggregations if aggregate: updated_windows.append( @@ -482,8 +488,10 @@ def process_window( self._results(aggregated, [], session_start, session_end), ) ) - - 
state.update_window(session_start, session_end, value=aggregated, timestamp_ms=timestamp_ms) + + state.update_window( + session_start, session_end, value=aggregated, timestamp_ms=timestamp_ms + ) # Expire old sessions if self._closing_strategy == ClosingStrategy.PARTITION: @@ -508,13 +516,13 @@ def expire_sessions_by_partition( # Import the parsing function to extract message keys from window keys from quixstreams.state.rocksdb.windowed.serialization import parse_window_key - + expired_results = [] - + # Collect all keys and extract unique prefixes to avoid iteration conflicts all_keys = list(transaction.keys()) seen_prefixes = set() - + for key_bytes in all_keys: try: prefix, start_ms, end_ms = parse_window_key(key_bytes) @@ -523,21 +531,23 @@ def expire_sessions_by_partition( except (ValueError, IndexError): # Skip invalid window key formats continue - + # Expire sessions for each unique prefix for prefix in seen_prefixes: state = transaction.as_state(prefix=prefix) - prefix_expired = list(self.expire_sessions_by_key( - prefix, state, expiry_threshold, collect - )) + prefix_expired = list( + self.expire_sessions_by_key(prefix, state, expiry_threshold, collect) + ) expired_results.extend(prefix_expired) count += len(prefix_expired) if count: logger.debug( - "Expired %s session windows in %ss", count, round(time.monotonic() - start, 2) + "Expired %s session windows in %ss", + count, + round(time.monotonic() - start, 2), ) - + return expired_results def expire_sessions_by_key( @@ -551,8 +561,10 @@ def expire_sessions_by_key( count = 0 # Get all windows and check which ones have expired - all_windows = list(state.get_windows(0, expiry_threshold + self._timeout_ms, backwards=False)) - + all_windows = list( + state.get_windows(0, expiry_threshold + self._timeout_ms, backwards=False) + ) + windows_to_delete = [] for (window_start, window_end), aggregated, _ in all_windows: # Session expires when the session end time has passed the expiry threshold @@ -560,20 +572,29 @@ 
def expire_sessions_by_key( collected = [] if collect: collected = state.get_from_collection(window_start, window_end) - + windows_to_delete.append((window_start, window_end)) count += 1 - yield (key, self._results(aggregated, collected, window_start, window_end)) + yield ( + key, + self._results(aggregated, collected, window_start, window_end), + ) # Clean up expired windows for window_start, window_end in windows_to_delete: - state._transaction.delete_window(window_start, window_end, prefix=state._prefix) + state._transaction.delete_window( # type: ignore # noqa: SLF001 + window_start, + window_end, + prefix=state._prefix, # type: ignore # noqa: SLF001 + ) if collect: state.delete_from_collection(window_end, start=window_start) if count: logger.debug( - "Expired %s session windows in %ss", count, round(time.monotonic() - start, 2) + "Expired %s session windows in %ss", + count, + round(time.monotonic() - start, 2), ) def _on_expired_session( @@ -595,9 +616,9 @@ def _on_expired_session( topic = "unknown" partition = -1 offset = -1 - + to_log = True - + # Trigger the "on_late" callback if provided if self._on_late: to_log = self._on_late( diff --git a/quixstreams/state/types.py b/quixstreams/state/types.py index c80c9e2ad..beb8b0e0b 100644 --- a/quixstreams/state/types.py +++ b/quixstreams/state/types.py @@ -391,6 +391,18 @@ def expire_all_windows( """ ... + def delete_window(self, start_ms: int, end_ms: int, prefix: bytes) -> None: + """ + Delete a specific window from RocksDB. + + This method removes a single window entry with the specified start and end timestamps. + + :param start_ms: The start timestamp of the window to delete + :param end_ms: The end timestamp of the window to delete + :param prefix: The key prefix for the window + """ + ... 
+ def delete_windows( self, max_start_time: int, delete_values: bool, prefix: bytes ) -> None: diff --git a/tests/test_quixstreams/test_dataframe/test_windows/test_session.py b/tests/test_quixstreams/test_dataframe/test_windows/test_session.py index 80f7156af..a342ac69b 100644 --- a/tests/test_quixstreams/test_dataframe/test_windows/test_session.py +++ b/tests/test_quixstreams/test_dataframe/test_windows/test_session.py @@ -3,7 +3,6 @@ import quixstreams.dataframe.windows.aggregations as agg from quixstreams.dataframe import DataFrameRegistry from quixstreams.dataframe.windows.definitions import SessionWindowDefinition -from quixstreams.dataframe.windows.time_based import ClosingStrategy @pytest.fixture() @@ -31,7 +30,13 @@ class TestSessionWindow: @pytest.mark.parametrize( "timeout, grace, provided_name, func_name, expected_name", [ - (30000, 5000, "custom_window", "sum", "custom_window_session_window_30000_sum"), + ( + 30000, + 5000, + "custom_window", + "sum", + "custom_window_session_window_30000_sum", + ), (30000, 5000, None, "sum", "session_window_30000_sum"), (15000, 5000, None, "count", "session_window_15000_count"), ], @@ -360,7 +365,7 @@ def test_session_window_process_timeout_behavior( store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" - + # Start session 1 updated, expired = process( window, value=1, key=key, transaction=tx, timestamp_ms=1000 @@ -406,7 +411,7 @@ def test_session_window_grace_period( store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" - + # Start session updated, expired = process( window, value=1, key=key, transaction=tx, timestamp_ms=1000 @@ -444,7 +449,7 @@ def test_session_window_multiple_keys( with store.start_partition_transaction(0) as tx: key1 = b"key1" key2 = b"key2" - + # Start session for key1 updated, expired = process( window, value=1, key=key1, transaction=tx, timestamp_ms=1000 @@ -530,11 +535,11 @@ def test_session_window_late_events( 
store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" - + # Start and finish a session process(window, value=1, key=key, transaction=tx, timestamp_ms=1000) process(window, value=2, key=key, transaction=tx, timestamp_ms=3000) - + # Start new session that will cause first to expire updated, expired = process( window, value=5, key=key, transaction=tx, timestamp_ms=15000 @@ -562,7 +567,7 @@ def test_session_window_current_mode( store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" - + # Start session - should get update immediately updated, expired = process( window, value=1, key=key, transaction=tx, timestamp_ms=1000 @@ -590,13 +595,13 @@ def test_session_window_overlapping_sessions( store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" - + # Start session 1 updated, expired = process( window, value=1, key=key, transaction=tx, timestamp_ms=1000 ) session1_end = updated[0][1]["end"] - + # Event within timeout - extends session 1 updated, expired = process( window, value=2, key=key, transaction=tx, timestamp_ms=5000 @@ -604,7 +609,7 @@ def test_session_window_overlapping_sessions( new_end = updated[0][1]["end"] assert new_end > session1_end # Session extended assert updated[0][1]["value"] == 3 # Accumulated value - + # Event far in future - starts session 2, expires session 1 updated, expired = process( window, value=10, key=key, transaction=tx, timestamp_ms=30000 @@ -626,7 +631,7 @@ def test_session_window_merge_sessions( store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" - + # Create first session updated, expired = process( window, value=1, key=key, transaction=tx, timestamp_ms=1000 @@ -636,7 +641,7 @@ def test_session_window_merge_sessions( assert updated[0][1]["end"] == 11000 # 1000 + 10000 assert updated[0][1]["value"] == 1 assert not expired - + # Create second session that doesn't expire the first one yet # (13000 is still 
within timeout + grace of first session: 11000 + 1000 = 12000) # Actually, let's make it further: 20000ms to ensure two separate sessions @@ -648,12 +653,12 @@ def test_session_window_merge_sessions( assert expired[0][1]["start"] == 1000 assert expired[0][1]["end"] == 11000 assert expired[0][1]["value"] == 1 - + assert len(updated) == 1 assert updated[0][1]["start"] == 20000 assert updated[0][1]["end"] == 30000 # 20000 + 10000 assert updated[0][1]["value"] == 10 - + # Add another event to the second session updated, expired = process( window, value=5, key=key, transaction=tx, timestamp_ms=25000 @@ -663,7 +668,7 @@ def test_session_window_merge_sessions( assert updated[0][1]["end"] == 35000 # 25000 + 10000 assert updated[0][1]["value"] == 15 # 10 + 5 assert not expired - + # Now test the limitation: we'll create a third session that could theoretically # merge with the second session if there was a bridging event # But since sessions don't auto-merge, they'll remain separate @@ -675,7 +680,7 @@ def test_session_window_merge_sessions( assert expired[0][1]["start"] == 20000 assert expired[0][1]["end"] == 35000 assert expired[0][1]["value"] == 15 - + # Third session starts assert len(updated) == 1 assert updated[0][1]["start"] == 50000 @@ -687,18 +692,18 @@ def test_session_window_bridging_event_scenario( ): """ Test scenario where an event arrives that could theoretically bridge two sessions. - + This test documents the current behavior where sessions don't auto-merge, even when a bridging event could logically connect them. - + Scenario: 1. Session A: [1000, 11000] with value 5 - 2. Session B: [15000, 25000] with value 10 + 2. Session B: [15000, 25000] with value 10 3. 
Bridging event at 12000ms that: - Can extend Session A to [1000, 22000] - Now overlaps with Session B [15000, 25000] - Ideally should merge into single session [1000, 25000] with value 15+bridge_value - + Current behavior: Session A gets extended, Session B remains separate Ideal behavior: Sessions A and B get merged when bridging event arrives """ @@ -709,7 +714,7 @@ def test_session_window_bridging_event_scenario( store.assign_partition(0) with store.start_partition_transaction(0) as tx: key = b"key" - + # Create Session A updated, expired = process( window, value=5, key=key, transaction=tx, timestamp_ms=1000 @@ -719,7 +724,7 @@ def test_session_window_bridging_event_scenario( assert updated[0][1]["end"] == 11000 # 1000 + 10000 assert updated[0][1]["value"] == 5 assert not expired - + # Create Session B - close enough that it doesn't expire Session A # Session A expires when time > 11000 + 2000 = 13000 # So event at 12000 should keep Session A alive @@ -732,9 +737,9 @@ def test_session_window_bridging_event_scenario( assert len(updated) == 1 assert updated[0][1]["start"] == 1000 # Session A extended assert updated[0][1]["end"] == 22000 # 12000 + 10000 - assert updated[0][1]["value"] == 15 # 5 + 10 + assert updated[0][1]["value"] == 15 # 5 + 10 assert not expired - + # Now create what would be Session B if Session A hadn't been extended updated, expired = process( window, value=20, key=key, transaction=tx, timestamp_ms=15000 @@ -743,9 +748,9 @@ def test_session_window_bridging_event_scenario( assert len(updated) == 1 assert updated[0][1]["start"] == 1000 # Still Session A assert updated[0][1]["end"] == 25000 # 15000 + 10000 - assert updated[0][1]["value"] == 35 # 5 + 10 + 20 + assert updated[0][1]["value"] == 35 # 5 + 10 + 20 assert not expired - + # Final event to expire the session updated, expired = process( window, value=1, key=key, transaction=tx, timestamp_ms=40000 @@ -754,7 +759,91 @@ def test_session_window_bridging_event_scenario( assert 
expired[0][1]["start"] == 1000 assert expired[0][1]["end"] == 25000 assert expired[0][1]["value"] == 35 # All events combined - + assert len(updated) == 1 assert updated[0][1]["start"] == 40000 - assert updated[0][1]["value"] == 1 \ No newline at end of file + assert updated[0][1]["value"] == 1 + + def test_session_window_string_key_extension( + self, session_window_definition_factory, state_manager + ): + """ + Test session window extension with string keys. + + This test specifically verifies that session extension works correctly + when using string keys (which need to be serialized to bytes internally). + + This test would have caught the original TypeError bug where + `transaction.delete_window()` was called with a string key instead of + the properly serialized bytes prefix. + """ + window_def = session_window_definition_factory(timeout_ms=10000, grace_ms=1000) + window = window_def.sum() + window.final(closing_strategy="key") + store = state_manager.get_store(stream_id="test", store_name=window.name) + store.assign_partition(0) + with store.start_partition_transaction(0) as tx: + # Use a string key instead of bytes to trigger the serialization path + key = "user_123" + + # Start a session + updated, expired = process( + window, value=100, key=key, transaction=tx, timestamp_ms=1000 + ) + assert len(updated) == 1 + assert updated[0][1]["start"] == 1000 + assert updated[0][1]["end"] == 11000 # 1000 + 10000 + assert updated[0][1]["value"] == 100 + assert not expired + + # Extend the session - this should trigger the delete_window call + # that would have failed with the original bug + updated, expired = process( + window, value=200, key=key, transaction=tx, timestamp_ms=5000 + ) + assert len(updated) == 1 + assert updated[0][1]["start"] == 1000 # Session extended, same start + assert updated[0][1]["end"] == 15000 # 5000 + 10000 (new end time) + assert updated[0][1]["value"] == 300 # 100 + 200 + assert not expired + + # Extend the session again to make sure it 
still works + updated, expired = process( + window, value=50, key=key, transaction=tx, timestamp_ms=8000 + ) + assert len(updated) == 1 + assert updated[0][1]["start"] == 1000 # Session extended again + assert updated[0][1]["end"] == 18000 # 8000 + 10000 + assert updated[0][1]["value"] == 350 # 100 + 200 + 50 + assert not expired + + # Test with a different string key to make sure multiple keys work + key2 = "user_456" + updated, expired = process( + window, value=75, key=key2, transaction=tx, timestamp_ms=9000 + ) + assert len(updated) == 1 + assert updated[0][0] == key2 # Different key + assert updated[0][1]["start"] == 9000 + assert updated[0][1]["end"] == 19000 # 9000 + 10000 + assert updated[0][1]["value"] == 75 + assert not expired + + # Expire the first session by advancing time far enough + updated, expired = process( + window, value=25, key=key, transaction=tx, timestamp_ms=30000 + ) + + # Should have expired the first session + assert len(expired) == 1 + assert expired[0][0] == key + assert expired[0][1]["start"] == 1000 + assert expired[0][1]["end"] == 18000 + assert expired[0][1]["value"] == 350 + + # Should have started a new session for the first key + assert len(updated) == 1 + assert updated[0][0] == key + assert updated[0][1]["start"] == 30000 + assert updated[0][1]["end"] == 40000 + assert updated[0][1]["value"] == 25