Add support for session window #917
base: main
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -9,7 +9,9 @@ With windows, you can calculate such aggregations as: | |||||
- Total of website visitors for every hour | ||||||
- The average speed of a vehicle over the last 10 minutes | ||||||
- Maximum temperature of a sensor observed over 30 second ranges | ||||||
- Give a user a reward after 10 successful actions | ||||||
- Track user activity sessions on a website | ||||||
- Detect fraud patterns in financial transactions | ||||||
|
||||||
|
||||||
## Types of Time in Streaming | ||||||
|
@@ -500,6 +502,220 @@ sdf = ( | |||||
|
||||||
``` | ||||||
|
||||||
## Session Windows | ||||||
|
||||||
Session windows group events that occur within a specified timeout period. Unlike fixed-time windows (tumbling, hopping, sliding), session windows have dynamic durations based on the actual timing of events, making them ideal for user activity tracking, fraud detection, and other event-driven scenarios. | ||||||
|
||||||
A session starts with the first event and extends each time a new event arrives within the timeout period. The session closes after the timeout period with no new events. | ||||||
|
||||||
Key characteristics of session windows: | ||||||
|
||||||
- **Dynamic boundaries**: Each session can have different start and end times based on actual events | ||||||
- **Activity-based**: Sessions extend automatically when events arrive within the timeout period | ||||||
- **Event-driven closure**: Sessions close when no events arrive within the timeout period | ||||||
- **Grace period support**: Late events can still extend sessions if they arrive within the grace period | ||||||
|
||||||
### How Session Windows Work | ||||||
|
||||||
```
Time:    0    5    10   15   20   25   30   35   40   45   50
Events:  A         B              C    D              E

Timeout: 10 seconds
Grace:   2 seconds

Session 1: [0, 20]  - Events A, B (B extends the session from A)
Session 2: [25, 35] - Events C, D (D extends the session from C)
Session 3: [45, 55] - Event E (session will close at 55 if no more events)
```

Review comment on the `Session 2` line (with a suggested change): shouldn't that be
|
||||||
In this example: | ||||||
- Event A starts Session 1 at time 0; the session would time out at time 10 | ||||||
- Event B arrives at time 10, extending Session 1 so that it times out at time 20 | ||||||
- Event C arrives at time 25, starting Session 2 (too late for Session 1) | ||||||
- Event D arrives at time 30, extending Session 2 so that it times out at time 40 | ||||||
- Event E arrives at time 45, starting Session 3 | ||||||
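
The walkthrough above can be reproduced with a small, library-independent sketch. `assign_sessions` is a hypothetical helper, not part of Quix Streams, and it assumes the rule described in this section: an event extends the open session if it arrives no later than `last_event + timeout + grace`, and the session end is `last_event + timeout`. Under that rule Session 2 ends at 40 (event D at 30 plus the 10-second timeout), as the bullet for Event D states.

```python
from typing import List, Tuple

Session = Tuple[int, int, List[int]]  # (start, end, event timestamps)


def assign_sessions(timestamps: List[int], timeout: int, grace: int = 0) -> List[Session]:
    """Group event timestamps into sessions (illustrative only).

    Assumption: an event extends the open session when it arrives no later
    than last_event + timeout + grace; the session end is last_event + timeout.
    """
    sessions: List[Session] = []
    for ts in sorted(timestamps):
        if sessions and ts <= sessions[-1][2][-1] + timeout + grace:
            # Extend the open session and move its end forward
            start, _, events = sessions[-1]
            sessions[-1] = (start, ts + timeout, events + [ts])
        else:
            # No session can accommodate the event: start a new one
            sessions.append((ts, ts + timeout, [ts]))
    return sessions


# Events A..E from the timeline above, in seconds
result = assign_sessions([0, 10, 25, 30, 45], timeout=10, grace=2)
for start, end, events in result:
    print(f"[{start}, {end}] <- events at {events}")
```

The sketch ignores keys and out-of-order arrival; it only illustrates how the gap between consecutive events decides where one session ends and the next begins.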
|
||||||
### Basic Session Window Example | ||||||
|
||||||
Imagine you want to track user activity sessions on a website, where a session continues as long as user actions occur within 30 minutes of each other: | ||||||
|
||||||
Input: | ||||||
```json | ||||||
{"user_action": "page_view", "page": "/home", "timestamp": 1000} | ||||||
{"user_action": "click", "element": "button", "timestamp": 800000} | ||||||
{"user_action": "page_view", "page": "/products", "timestamp": 1200000} | ||||||
{"user_action": "purchase", "amount": 50, "timestamp": 2000000} | ||||||
``` | ||||||
|
||||||
Here's how to track user sessions using session windows: | ||||||
|
||||||
```python
from datetime import timedelta
from quixstreams import Application
from quixstreams.dataframe.windows import Count, Collect

app = Application(...)
sdf = app.dataframe(...)

sdf = (
    # Define a session window with 30-minute timeout and 5-minute grace period
    sdf.session_window(
        timeout_ms=timedelta(minutes=30),
        grace_ms=timedelta(minutes=5)
    )

    # Count the number of actions in each session and collect all actions
    .agg(
        action_count=Count(),
        actions=Collect("user_action")
    )

    # Emit results when sessions are complete
    .final()
)

# Expected output (when session expires):
# {
#     "start": 1000,
#     "end": 2000000 + 1800000,  # last event + timeout
#     "action_count": 4,
#     "actions": ["page_view", "click", "page_view", "purchase"]
# }
```

Review comment on the `"end"` line: The start of the window is rightfully assigned to the timestamp of the first message in a session. Analogously, the end of the window should be assigned to the timestamp of the last message in the session, rather than being extended by the full length of a timeout.
|
||||||
### Session Window with Current Mode | ||||||
|
||||||
For real-time monitoring, you can use `.current()` mode to get updates as the session progresses: | ||||||
|
||||||
```python
from datetime import timedelta
from quixstreams import Application
from quixstreams.dataframe.windows import Sum, Count

app = Application(...)
sdf = app.dataframe(...)

sdf = (
    # Define a session window with 10-second timeout
    sdf.session_window(timeout_ms=timedelta(seconds=10))

    # Track total purchase amount and count in each session
    .agg(
        total_amount=Sum("amount"),
        purchase_count=Count()
    )

    # Emit updates for each message (real-time session tracking)
    .current()
)

# Output for each incoming event:
# Event 1: {"start": 1000, "end": 11000, "total_amount": 25, "purchase_count": 1}
# Event 2: {"start": 1000, "end": 15000, "total_amount": 75, "purchase_count": 2}  # session extended
# Event 3: {"start": 1000, "end": 18000, "total_amount": 125, "purchase_count": 3}  # session extended again
```
|
||||||
### Handling Late Events in Sessions | ||||||
|
||||||
Session windows support grace periods to handle out-of-order events: | ||||||
|
||||||
```python
from datetime import timedelta
from quixstreams import Application
from quixstreams.dataframe.windows import Count


def on_late_session_event(
    value, key, timestamp_ms, late_by_ms, start, end, name, topic, partition, offset
):
    """Handle late events that couldn't extend any session"""
    print(f"Late event for key {key}: {late_by_ms}ms late")
    print(f"Event would have belonged to session [{start}, {end}]")
    return False  # Suppress default logging


app = Application(...)
sdf = app.dataframe(...)

sdf = (
    # Session window with 5-minute timeout and 1-minute grace period
    sdf.session_window(
        timeout_ms=timedelta(minutes=5),
        grace_ms=timedelta(minutes=1),
        on_late=on_late_session_event
    )
    .agg(event_count=Count())
    .final()
)
```
|
||||||
### Session Window Use Cases | ||||||
|
||||||
**1. User Activity Tracking**
```python
# Track user sessions on a website or app
.session_window(timeout_ms=timedelta(minutes=30))
.agg(
    page_views=Count(),
    # Collect visited pages; deduplicate downstream to count unique pages
    pages=Collect("page_url"),
    # Aggregators cannot be subtracted inside .agg(); derive the
    # session duration downstream as last_seen - first_seen
    first_seen=Min("timestamp"),
    last_seen=Max("timestamp")
)
```

**2. Fraud Detection**
```python
# Detect suspicious transaction patterns
.session_window(timeout_ms=timedelta(minutes=10))
.agg(
    transaction_count=Count(),
    total_amount=Sum("amount"),
    locations=Collect("location")
)
```

**3. IoT Device Monitoring**
```python
# Monitor device activity sessions
.session_window(timeout_ms=timedelta(hours=1))
.agg(
    readings_count=Count(),
    avg_temperature=Mean("temperature"),
    max_pressure=Max("pressure")
)
```

**4. Gaming Analytics**
```python
# Track gaming sessions
.session_window(timeout_ms=timedelta(minutes=20))
.agg(
    actions_performed=Count(),
    points_earned=Sum("points"),
    levels_completed=Count("level_completed")
)
```
|
||||||
### Session Window Parameters | ||||||
|
||||||
- **`timeout_ms`**: The session timeout period. If no new events arrive within this period, the session will be closed. Can be specified as either an `int` (milliseconds) or a `timedelta` object. | ||||||
|
||||||
- **`grace_ms`**: The grace period for data arrival. Allows late-arriving data to be included in the session, even if it arrives after the session has theoretically timed out. Can be specified as either an `int` (milliseconds) or a `timedelta` object. | ||||||
|
||||||
- **`name`**: Optional unique identifier for the window. If not provided, it will be automatically generated based on the window's properties. | ||||||
|
||||||
- **`on_late`**: Optional callback to react to late records that cannot extend any existing session. Use this to customize logging or route late events to a dead-letter queue. | ||||||
|
||||||
### Session Window Behavior | ||||||
|
||||||
**Session Creation**: A new session starts when an event arrives and no existing session can accommodate it (i.e., all existing sessions have timed out). | ||||||
|
||||||
**Session Extension**: An existing session is extended when an event arrives within `timeout + grace_period` of the session's last activity. | ||||||
|
||||||
**Session Closure**: A session closes when the current time exceeds `session_end_time + grace_period`, where `session_end_time = last_event_time + timeout`. | ||||||
|
||||||
**Key Grouping**: Like all windows in Quix Streams, sessions are grouped by message key. Each key maintains its own independent sessions. | ||||||
|
||||||
**Event Time**: Sessions use event time (from Kafka message timestamps) rather than processing time. | ||||||
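
The extension and closure rules above reduce to two comparisons. The following is a minimal sketch with hypothetical helper names (`can_extend`, `is_closed`); whether the boundaries are inclusive or exclusive in the actual implementation is an assumption here.

```python
def session_end(last_event_time: int, timeout: int) -> int:
    """session_end_time = last_event_time + timeout, as defined above."""
    return last_event_time + timeout


def can_extend(event_time: int, last_event_time: int, timeout: int, grace: int) -> bool:
    """An event extends the session if it falls within timeout + grace
    of the session's last activity (inclusive bound is an assumption)."""
    return event_time <= last_event_time + timeout + grace


def is_closed(current_time: int, last_event_time: int, timeout: int, grace: int) -> bool:
    """A session closes once current time exceeds session_end_time + grace."""
    return current_time > session_end(last_event_time, timeout) + grace


# Last activity at t=30s, 10s timeout, 2s grace (all values in milliseconds)
last, timeout, grace = 30_000, 10_000, 2_000
print(can_extend(41_000, last, timeout, grace))  # inside timeout + grace
print(is_closed(42_001, last, timeout, grace))   # past session end + grace
```

With these definitions a session is either still extendable or closed at any given event time, so no event can fall between the two states.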
|
||||||
## Lateness and Out-of-Order Processing | ||||||
When working with event time, some events may be processed later than they're supposed to. | ||||||
Such events are called **"out-of-order"** because they violate the expected order of time in the data stream. | ||||||
|
@@ -540,7 +756,7 @@ The appropriate value for a grace period varies depending on the use case. | |||||
### Reacting on late events | ||||||
!!! info New in v3.8.0 | ||||||
|
||||||
To react on late records coming into time windows, you can pass the `on_late` callbacks to `.tumbling_window()`, `.hopping_window()` and `.sliding_window()` methods. | ||||||
To react to late records coming into time windows, you can pass the `on_late` callback to the `.tumbling_window()`, `.hopping_window()`, `.sliding_window()`, and `.session_window()` methods. | ||||||
|
||||||
You can use this callback to customize the logging of such messages or to send them to some dead-letter queue, for example. | ||||||
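
As a rough illustration of the dead-letter idea, the sketch below uses the callback argument list from the `on_late_session_event` example in this PR. The in-memory `dead_letters` list is a hypothetical stand-in for a real dead-letter topic, and the sample values in the call are made up.

```python
from typing import Any, Dict, List

# Stand-in for a dead-letter topic (assumption: a real app would produce to Kafka)
dead_letters: List[Dict[str, Any]] = []


def route_late_event(
    value, key, timestamp_ms, late_by_ms, start, end, name, topic, partition, offset
) -> bool:
    """Store the late record with enough context to replay it later."""
    dead_letters.append(
        {
            "key": key,
            "value": value,
            "timestamp_ms": timestamp_ms,
            "late_by_ms": late_by_ms,
            "window": (start, end),
            "source": (topic, partition, offset),
        }
    )
    # Returning False suppresses the default "late record" log message
    return False


# Simulate the callback being invoked for one late record
should_log = route_late_event(
    {"amount": 5}, "user-1", 9_000, 3_000, 0, 10_000, "session_window_10000", "events", 0, 42
)
print(should_log, dead_letters[0]["late_by_ms"])
```

Such a function would be passed as `on_late=route_late_event` when defining the window.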
|
||||||
|
@@ -667,6 +883,8 @@ In this strategy, messages advance time and close only windows with the **same** | |||||
|
||||||
If some message keys appear irregularly in the stream, the latest windows can remain unprocessed until the message with the same key is received. | ||||||
|
||||||
Session windows also support both closing strategies. With **key** strategy, sessions for each key close independently. With **partition** strategy, any message can advance time and close sessions for all keys in the partition. | ||||||
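
The difference between the two strategies for sessions can be sketched without the library. `closed_keys` is a hypothetical helper, and the closure condition (`message_time > last_event + timeout + grace`) follows the behavior described in the session window section; it is a simplification, not the actual implementation.

```python
from typing import Dict, List


def closed_keys(
    last_event: Dict[str, int],
    message_key: str,
    message_time: int,
    timeout: int,
    grace: int,
    strategy: str = "key",
) -> List[str]:
    """Return the keys whose open session is closed by an incoming message.

    `last_event` maps each key to the last event time of its open session,
    as recorded before this message arrived.
    """
    # "key": only the message's own key is checked.
    # "partition": every key in the partition is checked.
    candidates = [message_key] if strategy == "key" else list(last_event)
    return sorted(
        key
        for key in candidates
        if key in last_event and message_time > last_event[key] + timeout + grace
    )


open_sessions = {"user-a": 1_000, "user-b": 2_000}

# A message for a third key arrives at t=60s (10s timeout, 2s grace)
by_key = closed_keys(open_sessions, "user-c", 60_000, 10_000, 2_000, strategy="key")
by_partition = closed_keys(open_sessions, "user-c", 60_000, 10_000, 2_000, strategy="partition")
print(by_key)        # nothing closes: user-c has no open session
print(by_partition)  # both idle sessions close
```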
|
||||||
```python | ||||||
from datetime import timedelta | ||||||
from quixstreams import Application | ||||||
|
@@ -780,7 +998,7 @@ described in [the "Updating Kafka Headers" section](./processing.md#updating-kaf | |||||
|
||||||
Here are some general concepts about how windowed aggregations are implemented in Quix Streams: | ||||||
|
||||||
- Only time-based windows are supported. | ||||||
- Time-based windows (tumbling, hopping, sliding, session) and count-based windows are supported. | ||||||
- Every window is grouped by the current Kafka message key. | ||||||
- Messages with `None` key will be ignored. | ||||||
- The minimal window unit is a **millisecond**. More fine-grained values (e.g. microseconds) will be rounded towards the closest millisecond number. | ||||||
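
The millisecond rounding can be illustrated as follows. `to_milliseconds` is a hypothetical helper written for this example; it is not the library's `ensure_milliseconds`, whose exact rounding of half-way values may differ.

```python
from datetime import timedelta
from typing import Union


def to_milliseconds(delta: Union[int, timedelta]) -> int:
    """Convert a timedelta to whole milliseconds; ints pass through unchanged."""
    if isinstance(delta, timedelta):
        # Dividing two timedeltas yields a float number of milliseconds.
        # Note: Python's round() resolves exact .5 ties to the even integer.
        return round(delta / timedelta(milliseconds=1))
    return delta


print(to_milliseconds(timedelta(seconds=30)))
print(to_milliseconds(timedelta(microseconds=1_400)))
print(to_milliseconds(timedelta(microseconds=1_600)))
```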
|
@@ -794,10 +1012,12 @@ window specification. | |||||
|
||||||
The state store name is auto-generated by default using the following window attributes: | ||||||
|
||||||
- Window type: `"tumbling"` or `"hopping"` | ||||||
- Window parameters: `duration_ms` and `step_ms` | ||||||
- Window type: `"tumbling"`, `"hopping"`, `"sliding"`, or `"session"` | ||||||
- Window parameters: `duration_ms` and `step_ms` for time-based windows, `timeout_ms` for session windows | ||||||
|
||||||
E.g. a store name for a hopping window of 30 seconds with a 5 second step will be `hopping_window_30000_5000`. | ||||||
Examples: | ||||||
- A hopping window of 30 seconds with a 5 second step: `hopping_window_30000_5000` | ||||||
- A session window with 30 second timeout: `session_window_30000` | ||||||
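
The naming pattern in these two examples can be expressed as a short sketch. `store_name` is a hypothetical function that only mirrors the examples above; the library's own name generation may include details not shown here.

```python
def store_name(window_type: str, *params_ms: int) -> str:
    """Join the window type and its millisecond parameters with underscores."""
    return "_".join([f"{window_type}_window", *map(str, params_ms)])


hopping = store_name("hopping", 30_000, 5_000)
session = store_name("session", 30_000)
print(hopping)
print(session)
```

Because the parameters are part of the name, changing the timeout of a session window yields a different store name, which is why the new definition starts from an empty state.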
|
||||||
### Updating Window Definitions | ||||||
|
||||||
|
@@ -807,8 +1027,8 @@ When you change the definition of the window (e.g. its size), the data in the st | |||||
|
||||||
Quix Streams handles some of the situations, like: | ||||||
|
||||||
- Updating window type (e.g. from tumbling to hopping) | ||||||
- Updating window period or step | ||||||
- Updating window type (e.g. from tumbling to hopping, from hopping to session) | ||||||
- Updating window period, step, or timeout | ||||||
- Adding/Removing/Updating an aggregation function (except `Reduce()`) | ||||||
|
||||||
Updating the window type and parameters will change the name of the underlying state store, and the new window definition will use a different one. | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -66,6 +66,7 @@ | |
from .windows import ( | ||
HoppingCountWindowDefinition, | ||
HoppingTimeWindowDefinition, | ||
SessionWindowDefinition, | ||
SlidingCountWindowDefinition, | ||
SlidingTimeWindowDefinition, | ||
TumblingCountWindowDefinition, | ||
|
@@ -1484,6 +1485,103 @@ def sliding_count_window( | |
name=name, | ||
) | ||
|
||
def session_window( | ||
self, | ||
timeout_ms: Union[int, timedelta], | ||
Review comment on `timeout_ms`: A more widely adopted nomenclature for session windows is "gap". Let's rename it to
||
grace_ms: Union[int, timedelta] = 0, | ||
name: Optional[str] = None, | ||
on_late: Optional[WindowOnLateCallback] = None, | ||
) -> SessionWindowDefinition: | ||
""" | ||
Create a session window transformation on this StreamingDataFrame. | ||
|
||
Session windows group events that occur within a specified timeout period. | ||
A session starts with the first event and extends each time a new event arrives | ||
within the timeout period. The session closes after the timeout period with no | ||
new events. | ||
|
||
Unlike fixed-time windows, session windows have dynamic durations based on the | ||
actual events and their timing, making them ideal for user activity tracking, | ||
fraud detection, and other event-driven scenarios. | ||
|
||
They allow performing stateful aggregations like `sum`, `reduce`, etc. | ||
on top of the data and emit results downstream. | ||
|
||
Notes: | ||
|
||
- The timestamp of the aggregation result is set to the session start timestamp. | ||
- Every session is grouped by the current Kafka message key. | ||
- Messages with `None` key will be ignored. | ||
- Sessions always use the current event time. | ||
|
||
Example Snippet: | ||
|
||
```python | ||
from datetime import timedelta | ||
from quixstreams import Application | ||
import quixstreams.dataframe.windows.aggregations as agg | ||
|
||
app = Application() | ||
sdf = app.dataframe(...) | ||
|
||
sdf = ( | ||
# Define a session window with 30-second timeout and 10-second grace period | ||
sdf.session_window( | ||
timeout_ms=timedelta(seconds=30), | ||
grace_ms=timedelta(seconds=10) | ||
) | ||
|
||
# Specify the aggregation function | ||
.agg(value=agg.Sum()) | ||
|
||
# Specify how the results should be emitted downstream. | ||
# "current()" will emit results as they come for each updated session, | ||
# possibly producing multiple messages per key-session pair | ||
# "final()" will emit sessions only when they are closed and cannot | ||
# receive any updates anymore. | ||
.final() | ||
) | ||
``` | ||
|
||
:param timeout_ms: The session timeout period. | ||
If no new events arrive within this period, the session will be closed. | ||
Can be specified as either an `int` representing milliseconds | ||
or a `timedelta` object. | ||
>***NOTE:*** `timedelta` objects will be rounded to the closest millisecond | ||
value. | ||
|
||
:param grace_ms: The grace period for data arrival. | ||
It allows late-arriving data to be included in the session, | ||
even if it arrives after the session has theoretically timed out. | ||
Can be specified as either an `int` representing milliseconds | ||
or a `timedelta` object. | ||
>***NOTE:*** `timedelta` objects will be rounded to the closest millisecond | ||
value. | ||
|
||
:param name: The unique identifier for the window. If not provided, it will be | ||
automatically generated based on the window's properties. | ||
|
||
:param on_late: an optional callback to react to late records in sessions and | ||
to configure the logging of such events. | ||
If the callback returns `True`, the message about a late record will be logged | ||
(default behavior). | ||
Otherwise, no message will be logged. | ||
|
||
:return: `SessionWindowDefinition` instance representing the session window | ||
configuration. | ||
This object can be further configured with aggregation functions | ||
like `sum`, `count`, etc. applied to the StreamingDataFrame. | ||
""" | ||
timeout_ms = ensure_milliseconds(timeout_ms) | ||
grace_ms = ensure_milliseconds(grace_ms) | ||
|
||
return SessionWindowDefinition( | ||
timeout_ms=timeout_ms, | ||
grace_ms=grace_ms, | ||
dataframe=self, | ||
name=name, | ||
on_late=on_late, | ||
) | ||
|
||
def fill(self, *columns: str, **mapping: Any) -> "StreamingDataFrame": | ||
""" | ||
Fill missing values in the message value with a constant value. | ||
|