Commit dda7ccf

Refactor wall clock processing
1 parent 920f3ec commit dda7ccf

3 files changed, +61 -28 lines changed

quixstreams/app.py

Lines changed: 33 additions & 5 deletions
@@ -1035,16 +1035,44 @@ def _process_wall_clock(self, wall_clock_executors):
 
         value, key, timestamp, headers = None, None, int(now * 1000), {}
 
-        for tp in self._consumer.assignment():
-            if executor := wall_clock_executors.get(tp.topic):
+        # Offsets processed in the current, open checkpoint (in-flight)
+        inflight_offsets = self._processing_context.checkpoint.inflight_offsets
+        assignment = self._consumer.assignment()
+
+        for topics, executor in wall_clock_executors.items():
+            topic_names = {t.name for t in topics}
+            seen_partitions: set[int] = set()
+            tps: list[tuple[str, int, int]] = []
+
+            # Gather assigned TPs that belong to the topics
+            for tp in assignment:
+                if tp.topic in topic_names and tp.partition not in seen_partitions:
+                    offset = inflight_offsets.get((tp.topic, tp.partition))
+                    if offset is None:
+                        committed_tp = self._consumer.committed([tp], timeout=30)[0]
+                        if committed_tp.error:
+                            raise RuntimeError(
+                                "Failed to get committed offsets for "
+                                f'"{committed_tp.topic}[{committed_tp.partition}]" '
+                                f"from the broker: {committed_tp.error}"
+                            )
+                        if committed_tp.offset >= 0:
+                            offset = committed_tp.offset - 1
+
+                    if offset is not None:
+                        seen_partitions.add(tp.partition)
+                        tps.append((tp.topic, tp.partition, offset))
+
+            # Execute callback for each selected topic-partition with its offset
+            for topic, partition, offset in tps:
                 row = Row(
                     value=value,
                     key=key,
                     timestamp=timestamp,
                     context=MessageContext(
-                        topic=tp.topic,
-                        partition=tp.partition,
-                        offset=-1,  # TODO: get correct offsets
+                        topic=topic,
+                        partition=partition,
+                        offset=offset,
                         size=-1,
                     ),
                     headers=headers,
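
This hunk replaces the hardcoded offset=-1 with a real offset, resolved in two steps: prefer the in-flight offset from the open checkpoint, else fall back to the broker's committed offset minus one (Kafka commits the next offset to read, hence the "- 1"), else skip the partition. A minimal sketch of that rule, with hypothetical stub dicts standing in for the consumer and checkpoint:

from typing import Optional

def resolve_offset(
    inflight: dict[tuple[str, int], int],   # offsets in the open checkpoint
    committed: dict[tuple[str, int], int],  # broker's committed (next-to-read) offsets
    topic: str,
    partition: int,
) -> Optional[int]:
    # 1) An offset processed in the current, not-yet-committed checkpoint wins.
    offset = inflight.get((topic, partition))
    if offset is not None:
        return offset
    # 2) Otherwise derive the last processed offset from the committed one.
    committed_offset = committed.get((topic, partition), -1)
    if committed_offset >= 0:
        return committed_offset - 1
    # 3) Nothing processed or committed yet: the partition is skipped entirely.
    return None

assert resolve_offset({("t", 0): 41}, {("t", 0): 10}, "t", 0) == 41  # in-flight wins
assert resolve_offset({}, {("t", 0): 10}, "t", 0) == 9               # committed - 1
assert resolve_offset({}, {}, "t", 0) is None                        # skip partition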

quixstreams/checkpointing/checkpoint.py

Lines changed: 11 additions & 1 deletion
@@ -1,7 +1,8 @@
 import logging
 import time
 from abc import abstractmethod
-from typing import Dict, Tuple
+from types import MappingProxyType
+from typing import Dict, Mapping, Tuple
 
 from confluent_kafka import KafkaException, TopicPartition
 
@@ -55,6 +56,15 @@ def __init__(
         self._commit_every = commit_every
         self._total_offsets_processed = 0
 
+    @property
+    def inflight_offsets(self) -> Mapping[Tuple[str, int], int]:
+        """
+        Read-only view of processed (but not yet committed) offsets in the current checkpoint.
+
+        :return: a read-only mapping {(topic, partition): last_processed_offset}
+        """
+        return MappingProxyType(self._tp_offsets)
+
     def expired(self) -> bool:
         """
         Returns `True` if checkpoint deadline has expired OR
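
The new property hands callers a live but immutable view of the checkpoint's internal offset dict; likely the motivation is that a proxy is cheaper than copying the dict on every wall clock tick while still blocking accidental writes. A minimal sketch of that behavior, using stand-in names rather than the Checkpoint class itself:

from types import MappingProxyType
from typing import Dict, Tuple

_tp_offsets: Dict[Tuple[str, int], int] = {}  # stand-in for Checkpoint._tp_offsets
inflight = MappingProxyType(_tp_offsets)      # what inflight_offsets returns

_tp_offsets[("orders", 0)] = 42       # writes through the underlying dict...
assert inflight[("orders", 0)] == 42  # ...are visible through the live view

try:
    inflight[("orders", 0)] = 99      # but the view itself rejects mutation
except TypeError as exc:
    print(f"read-only: {exc}")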

quixstreams/dataframe/registry.py

Lines changed: 17 additions & 22 deletions
@@ -22,7 +22,7 @@ class DataFrameRegistry:
 
     def __init__(self) -> None:
         self._registry: dict[str, Stream] = {}
-        self._wall_clock_registry: dict[str, Stream] = {}
+        self._wall_clock_registry: dict[str, tuple[tuple[Topic, ...], Stream]] = {}
         self._topics: list[Topic] = []
         self._repartition_origins: set[str] = set()
         self._topics_to_stream_ids: dict[str, set[str]] = {}
@@ -74,15 +74,11 @@ def register_wall_clock(
         self, dataframe: "StreamingDataFrame", stream: Stream
     ) -> None:
         """
-        Register a wall clock stream for the given topic.
+        Register a wall clock stream root for the given dataframe.
+        Stores the Stream itself to be composed later with an optional sink.
         """
-        topics = dataframe.topics
-        if len(topics) > 1:
-            raise ValueError(
-                f"Expected a StreamingDataFrame with one topic, got {len(topics)}"
-            )
-        topic = topics[0]
-        self._wall_clock_registry[topic.name] = stream
+        # TODO: What if there are more wall clock streams for the same stream_id?
+        self._wall_clock_registry[dataframe.stream_id] = (dataframe.topics, stream)
 
     def register_groupby(
         self,
@@ -128,28 +124,27 @@ def compose_all(
         :param sink: callable to accumulate the results of the execution, optional.
         :return: a {topic_name: composed} dict, where composed is a callable
        """
-        return self._compose(registry=self._registry, sink=sink)
-
-    def compose_wall_clock(self) -> dict[str, VoidExecutor]:
-        """
-        Composes all the wall clock streams and returns a dict of format {<topic>: <VoidExecutor>}
-        :return: a {topic_name: composed} dict, where composed is a callable
-        """
-        return self._compose(registry=self._wall_clock_registry)
-
-    def _compose(
-        self, registry: dict[str, Stream], sink: Optional[VoidExecutor] = None
-    ) -> dict[str, VoidExecutor]:
         executors = {}
         # Go over the registered topics with root Streams and compose them
-        for topic, root_stream in registry.items():
+        for topic, root_stream in self._registry.items():
             # If a root stream is connected to other roots, ".compose()" will
             # return them all.
             # Use the registered root Stream to filter them out.
             root_executors = root_stream.compose(sink=sink)
             executors[topic] = root_executors[root_stream]
         return executors
 
+    def compose_wall_clock(self) -> dict[tuple[Topic, ...], VoidExecutor]:
+        """
+        Compose all wall clock Streams and return executors keyed by each
+        dataframe's source topics.
+        :return: a {(topic, ...): composed} dict, where composed is a callable
+        """
+        executors = {}
+        for _, (topics, root_stream) in self._wall_clock_registry.items():
+            root_executors = root_stream.compose()
+            executors[topics] = root_executors[root_stream]
+        return executors
+
     def register_stream_id(self, stream_id: str, topic_names: list[str]):
         """
         Register a mapping between the stream_id and topic names.
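
The registry now stores one (topics, stream) pair per stream_id instead of one stream per topic name, which is what lets a single wall clock executor cover a multi-topic dataframe. A toy model of that shape; the Topic stand-in and sample values are hypothetical, not the library's classes:

from dataclasses import dataclass

@dataclass(frozen=True)  # frozen => hashable, so tuples of Topics can key a dict
class Topic:
    name: str

# register_wall_clock: key by stream_id, keep every source topic
registry: dict[str, tuple[tuple[Topic, ...], str]] = {
    "sdf-1": ((Topic("orders"), Topic("payments")), "stream-root-1"),
}

# compose_wall_clock: re-key by the topics tuple so the processing loop can
# match assigned partitions against {t.name for t in topics}
executors = {topics: stream for topics, stream in registry.values()}
assert (Topic("orders"), Topic("payments")) in executors

Keying the composed executors by the topics tuple keeps _process_wall_clock free of stream_id lookups: it only needs topic names to match the consumer's assignment.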
