PoC: Wall Clock #989

Draft: wants to merge 5 commits into main

111 changes: 111 additions & 0 deletions quixstreams/app.py
@@ -7,6 +7,8 @@
import uuid
import warnings
from collections import defaultdict
from datetime import datetime
from itertools import chain
from pathlib import Path
from typing import Callable, List, Literal, Optional, Protocol, Tuple, Type, Union, cast

@@ -30,6 +32,8 @@
from .logging import LogLevel, configure_logging
from .models import (
DeserializerType,
MessageContext,
Row,
SerializerType,
TimestampExtractor,
Topic,
@@ -152,6 +156,7 @@ def __init__(
topic_create_timeout: float = 60,
processing_guarantee: ProcessingGuarantee = "at-least-once",
max_partition_buffer_size: int = 10000,
wall_clock_interval: float = 0.0,
):
"""
:param broker_address: Connection settings for Kafka.
@@ -220,6 +225,12 @@ def __init__(
It is a soft limit, and the actual number of buffered messages can be up to x2 higher.
Lower value decreases the memory use, but increases the latency.
Default - `10000`.
:param wall_clock_interval: the interval (in seconds) at which to invoke
the registered wall clock logic.
The wall clock timer starts counting from application start.
TODO: Save and respect the last wall clock timestamp.
If the value is 0, no wall clock logic is invoked.
Default - `0.0`.

<br><br>***Error Handlers***<br>
To handle errors, `Application` accepts callbacks triggered when
@@ -371,6 +382,10 @@ def __init__(
recovery_manager=recovery_manager,
)

self._wall_clock_active = wall_clock_interval > 0
self._wall_clock_interval = wall_clock_interval
self._wall_clock_last_sent = datetime.now().timestamp()

self._source_manager = SourceManager()
self._sink_manager = SinkManager()
self._dataframe_registry = DataFrameRegistry()
@@ -900,6 +915,7 @@ def _run_dataframe(self, sink: Optional[VoidExecutor] = None):
processing_context = self._processing_context
source_manager = self._source_manager
process_message = self._process_message
process_wall_clock = self._process_wall_clock
printer = self._processing_context.printer
run_tracker = self._run_tracker
consumer = self._consumer
@@ -912,6 +928,9 @@
)

dataframes_composed = self._dataframe_registry.compose_all(sink=sink)
wall_clock_executors = self._dataframe_registry.compose_wall_clock()
if not wall_clock_executors:
self._wall_clock_active = False

processing_context.init_checkpoint()
run_tracker.set_as_running()
@@ -923,6 +942,7 @@
run_tracker.timeout_refresh()
else:
process_message(dataframes_composed)
process_wall_clock(wall_clock_executors)
processing_context.commit_checkpoint()
consumer.resume_backpressured()
source_manager.raise_for_error()
@@ -1006,6 +1026,97 @@ def _process_message(self, dataframe_composed):
if self._on_message_processed is not None:
self._on_message_processed(topic_name, partition, offset)

def _process_wall_clock(self, wall_clock_executors):
# Emit time-based "ticks" when the wall-clock interval elapses.
# For each executor (grouped by topics), select one partition per partition id
# and determine an offset to include in MessageContext.
if not self._wall_clock_active:
return

# Rate-limit by interval; skip until enough time has elapsed since last send.
now = datetime.now().timestamp()
if self._wall_clock_last_sent > now - self._wall_clock_interval:
return

# Synthetic "tick" payload (no value/key, headers empty, timestamp in ms).
value, key, timestamp, headers = None, None, int(now * 1000), {}

# In-flight processed offsets within the current (open) checkpoint.
processed_offsets = self._processing_context.checkpoint.tp_offsets
# Only consider currently assigned topic-partitions.
assigned_tps = self._consumer.assignment()
# Cache known offsets to avoid resolving them multiple times for different executors.
# Keyed by (topic, partition) to avoid relying on TopicPartition instance identity.
known_offsets: dict[tuple[str, int], int] = {}

for topics, executor in wall_clock_executors:
# candidate_partitions: partitions still needing an offset resolved
candidate_partitions: dict[int, set[TopicPartition]] = defaultdict(set)
# selected_partitions: final partition_id -> (topic, offset)
selected_partitions: dict[int, tuple[str, int]] = {}

for tp in assigned_tps:
# Only consider partitions of this executor's topics that are not selected yet.
if tp.topic not in topics or tp.partition in selected_partitions:
continue

# Reuse an offset already resolved for this topic-partition.
known_offset = known_offsets.get((tp.topic, tp.partition))
if known_offset is not None:
selected_partitions[tp.partition] = (tp.topic, known_offset)
continue

# Prefer the most recent processed offset from the in-flight checkpoint
# (checked with "is not None" so a valid offset of 0 is not skipped).
processed_offset = processed_offsets.get((tp.topic, tp.partition))
if processed_offset is not None:
selected_partitions[tp.partition] = (tp.topic, processed_offset)
known_offsets[(tp.topic, tp.partition)] = processed_offset
else:
# Will resolve via committed offsets below.
candidate_partitions[tp.partition].add(tp)

if candidate_partitions:
# Best-effort: fetch committed offsets in batch for unresolved partitions.
committed_tps = self._consumer.committed(
list(chain(*candidate_partitions.values())), timeout=30
)
for tp in committed_tps:
if tp.error:
raise RuntimeError(
f"Failed to get committed offsets for "
f'"{tp.topic}[{tp.partition}]" from the broker: {tp.error}'
)
if tp.partition not in selected_partitions:
# Committed offset is "next to consume"; last processed is offset - 1.
# The "invalid/unset" broker offset is negative.
offset = tp.offset - 1 if tp.offset >= 0 else tp.offset
selected_partitions[tp.partition] = (tp.topic, offset)
known_offsets[(tp.topic, tp.partition)] = offset

# Execute callback for each selected topic-partition with its offset.
for partition, (topic, offset) in selected_partitions.items():
row = Row(
value=value,
key=key,
timestamp=timestamp,
context=MessageContext(
topic=topic,
partition=partition,
offset=offset,
size=-1,
),
headers=headers,
)
context = copy_context()
context.run(set_message_context, row.context)
try:
context.run(executor, value, key, timestamp, headers)
except Exception as exc:
to_suppress = self._on_processing_error(exc, row, logger)
if not to_suppress:
raise

# Record the emission time for rate-limiting.
self._wall_clock_last_sent = now

def _on_assign(self, _, topic_partitions: List[TopicPartition]):
"""
Assign new topic partitions to consumer and state.
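Two small rules drive the new `_process_wall_clock` method above: a rate-limit gate based on the configured `wall_clock_interval`, and a translation from the broker's committed offset to the last processed offset. The standalone sketch below restates them; the helper functions are illustrative only and not part of this PR.

from datetime import datetime


def interval_elapsed(last_sent: float, interval: float) -> bool:
    # Mirrors the guard in _process_wall_clock: emit a tick only when at least
    # `interval` seconds have passed since the previous emission.
    now = datetime.now().timestamp()
    return interval > 0 and last_sent <= now - interval


def last_processed_offset(committed_offset: int) -> int:
    # The committed offset is the "next offset to consume", so the last
    # processed offset is committed_offset - 1; negative ("unset") broker
    # offsets are passed through unchanged.
    return committed_offset - 1 if committed_offset >= 0 else committed_offset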
12 changes: 11 additions & 1 deletion quixstreams/checkpointing/checkpoint.py
@@ -1,7 +1,8 @@
import logging
import time
from abc import abstractmethod
from typing import Dict, Tuple
from types import MappingProxyType
from typing import Dict, Mapping, Tuple

from confluent_kafka import KafkaException, TopicPartition

@@ -55,6 +56,15 @@ def __init__(
self._commit_every = commit_every
self._total_offsets_processed = 0

@property
def tp_offsets(self) -> Mapping[Tuple[str, int], int]:
"""
Read-only view of processed (but not yet committed) offsets in the current checkpoint.

:return: a read-only mapping {(topic, partition): last_processed_offset}
"""
return MappingProxyType(self._tp_offsets)

def expired(self) -> bool:
"""
Returns `True` if checkpoint deadline has expired OR
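The new `tp_offsets` property above wraps the checkpoint's internal dict in `MappingProxyType`, so callers such as `_process_wall_clock` get a live, read-only view rather than a copy. A standalone illustration of that behaviour (plain Python, not quixstreams code):

from types import MappingProxyType

_tp_offsets: dict[tuple[str, int], int] = {}
view = MappingProxyType(_tp_offsets)

_tp_offsets[("input", 0)] = 42
assert view[("input", 0)] == 42  # writes to the dict are visible through the view

try:
    view[("input", 0)] = 43  # type: ignore[index]
except TypeError:
    pass  # the view itself rejects mutation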
14 changes: 12 additions & 2 deletions quixstreams/core/stream/stream.py
@@ -249,11 +249,21 @@ def add_update(
return self._add(update_func)

@overload
def add_transform(self, func: TransformCallback, *, expand: Literal[False] = False):
def add_transform(
self,
func: TransformCallback,
*,
expand: Literal[False] = False,
):
pass

@overload
def add_transform(self, func: TransformExpandedCallback, *, expand: Literal[True]):
def add_transform(
self,
func: TransformExpandedCallback,
*,
expand: Literal[True],
):
pass

def add_transform(
4 changes: 4 additions & 0 deletions quixstreams/dataframe/dataframe.py
@@ -1696,6 +1696,10 @@ def concat(self, other: "StreamingDataFrame") -> "StreamingDataFrame":
stream=merged_stream, stream_id=merged_stream_id
)

def concat_wall_clock(self, stream: Stream) -> "StreamingDataFrame":
self._registry.register_wall_clock(self, stream)
return self.__dataframe_clone__(stream=self.stream.merge(stream))

def join_asof(
self,
right: "StreamingDataFrame",
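Together with the `wall_clock_interval` option added to `Application`, `concat_wall_clock` above registers the given `Stream` as wall clock logic for this dataframe's topics and merges it into the main stream. A hypothetical end-to-end sketch follows; it assumes the wall clock `Stream` is built directly, since this PoC does not define a user-facing helper for that.

from quixstreams import Application
from quixstreams.core.stream import Stream

# Emit wall clock ticks every 5 seconds (PoC parameter from this PR).
app = Application(broker_address="localhost:9092", wall_clock_interval=5.0)
topic = app.topic("input")
sdf = app.dataframe(topic)

# Assumption: a bare Stream with an update callback acts as the wall clock logic.
# On each tick the callback receives value=None.
wall_clock = Stream().add_update(lambda value: print("wall clock tick"))
sdf = sdf.concat_wall_clock(wall_clock)

if __name__ == "__main__":
    app.run()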
18 changes: 18 additions & 0 deletions quixstreams/dataframe/registry.py
@@ -22,6 +22,7 @@ class DataFrameRegistry:

def __init__(self) -> None:
self._registry: dict[str, Stream] = {}
self._wall_clock_registry: dict[Stream, tuple[str, ...]] = {}
self._topics: list[Topic] = []
self._repartition_origins: set[str] = set()
self._topics_to_stream_ids: dict[str, set[str]] = {}
@@ -69,6 +70,12 @@ def register_root(
self._topics.append(topic)
self._registry[topic.name] = dataframe.stream

def register_wall_clock(
self, dataframe: "StreamingDataFrame", stream: Stream
) -> None:
# Store the topic names as an immutable tuple for stable typing
self._wall_clock_registry[stream] = tuple(t.name for t in dataframe.topics)

def register_groupby(
self,
source_sdf: "StreamingDataFrame",
@@ -123,6 +130,17 @@ def compose_all(
executors[topic] = root_executors[root_stream]
return executors

def compose_wall_clock(self) -> list[tuple[tuple[str, ...], VoidExecutor]]:
"""
Compose all registered wall clock Streams.

:return: a list of `(topic_names, executor)` tuples, one per wall clock Stream.
"""
executors = []
for root_stream, topics in self._wall_clock_registry.items():
root_executors = root_stream.compose()
executors.append((topics, root_executors[root_stream]))
return executors

def register_stream_id(self, stream_id: str, topic_names: list[str]):
"""
Register a mapping between the stream_id and topic names.
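For reference, `register_wall_clock` keys the registry by the wall clock `Stream` and stores the owning dataframe's topic names as a tuple, and `compose_wall_clock` flattens that into `(topic_names, executor)` pairs for the application loop. A plain-Python analogue of that bookkeeping (standalone sketch, no quixstreams imports):

from typing import Callable

Executor = Callable[[], None]

# wall clock stream -> topic names of the dataframe it was registered on
wall_clock_registry: dict[object, tuple[str, ...]] = {}
composed: dict[object, Executor] = {}


def register_wall_clock(stream: object, topics: list[str], executor: Executor) -> None:
    wall_clock_registry[stream] = tuple(topics)  # immutable snapshot of topic names
    composed[stream] = executor


def compose_wall_clock() -> list[tuple[tuple[str, ...], Executor]]:
    return [(topics, composed[stream]) for stream, topics in wall_clock_registry.items()]


stream = object()
register_wall_clock(stream, ["input-a", "input-b"], lambda: print("tick"))
for topics, executor in compose_wall_clock():
    executor()  # prints "tick" once, scoped to ("input-a", "input-b")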