Skip to content

Commit 920f3ec

Browse files
committed
Rollback changes to TransformFunction
1 parent 9be8da8 commit 920f3ec

File tree

3 files changed

+10
-121
lines changed

3 files changed

+10
-121
lines changed

quixstreams/core/stream/functions/transform.py

Lines changed: 8 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,7 @@
11
from typing import Any, Literal, Union, cast, overload
22

33
from .base import StreamFunction
4-
from .types import (
5-
TransformCallback,
6-
TransformExpandedCallback,
7-
TransformWallClockCallback,
8-
TransformWallClockExpandedCallback,
9-
VoidExecutor,
10-
)
4+
from .types import TransformCallback, TransformExpandedCallback, VoidExecutor
115

126
__all__ = ("TransformFunction",)
137

@@ -29,81 +23,28 @@ class TransformFunction(StreamFunction):
2923

3024
@overload
3125
def __init__(
32-
self,
33-
func: TransformCallback,
34-
expand: Literal[False] = False,
35-
wall_clock: Literal[False] = False,
36-
) -> None: ...
37-
38-
@overload
39-
def __init__(
40-
self,
41-
func: TransformExpandedCallback,
42-
expand: Literal[True],
43-
wall_clock: Literal[False] = False,
44-
) -> None: ...
45-
46-
@overload
47-
def __init__(
48-
self,
49-
func: TransformWallClockCallback,
50-
expand: Literal[False] = False,
51-
wall_clock: Literal[True] = True,
26+
self, func: TransformCallback, expand: Literal[False] = False
5227
) -> None: ...
5328

5429
@overload
5530
def __init__(
56-
self,
57-
func: TransformWallClockExpandedCallback,
58-
expand: Literal[True],
59-
wall_clock: Literal[True],
31+
self, func: TransformExpandedCallback, expand: Literal[True]
6032
) -> None: ...
6133

6234
def __init__(
6335
self,
64-
func: Union[
65-
TransformCallback,
66-
TransformExpandedCallback,
67-
TransformWallClockCallback,
68-
TransformWallClockExpandedCallback,
69-
],
36+
func: Union[TransformCallback, TransformExpandedCallback],
7037
expand: bool = False,
71-
wall_clock: bool = False,
7238
):
7339
super().__init__(func)
7440

75-
self.func: Union[
76-
TransformCallback,
77-
TransformExpandedCallback,
78-
TransformWallClockCallback,
79-
TransformWallClockExpandedCallback,
80-
]
41+
self.func: Union[TransformCallback, TransformExpandedCallback]
8142
self.expand = expand
82-
self.wall_clock = wall_clock
8343

8444
def get_executor(self, *child_executors: VoidExecutor) -> VoidExecutor:
8545
child_executor = self._resolve_branching(*child_executors)
8646

87-
if self.expand and self.wall_clock:
88-
wall_clock_expanded_func = cast(
89-
TransformWallClockExpandedCallback, self.func
90-
)
91-
92-
def wrapper(
93-
value: Any,
94-
key: Any,
95-
timestamp: int,
96-
headers: Any,
97-
):
98-
for (
99-
new_value,
100-
new_key,
101-
new_timestamp,
102-
new_headers,
103-
) in wall_clock_expanded_func(timestamp):
104-
child_executor(new_value, new_key, new_timestamp, new_headers)
105-
106-
elif self.expand:
47+
if self.expand:
10748
expanded_func = cast(TransformExpandedCallback, self.func)
10849

10950
def wrapper(
@@ -116,22 +57,8 @@ def wrapper(
11657
for new_value, new_key, new_timestamp, new_headers in result:
11758
child_executor(new_value, new_key, new_timestamp, new_headers)
11859

119-
elif self.wall_clock:
120-
wall_clock_func = cast(TransformWallClockCallback, self.func)
121-
122-
def wrapper(
123-
value: Any,
124-
key: Any,
125-
timestamp: int,
126-
headers: Any,
127-
):
128-
new_value, new_key, new_timestamp, new_headers = wall_clock_func(
129-
timestamp
130-
)
131-
child_executor(new_value, new_key, new_timestamp, new_headers)
132-
13360
else:
134-
regular_func = cast(TransformCallback, self.func)
61+
func = cast(TransformCallback, self.func)
13562

13663
def wrapper(
13764
value: Any,
@@ -140,7 +67,7 @@ def wrapper(
14067
headers: Any,
14168
):
14269
# Execute a function on a single value and return its result
143-
new_value, new_key, new_timestamp, new_headers = regular_func(
70+
new_value, new_key, new_timestamp, new_headers = func(
14471
value, key, timestamp, headers
14572
)
14673
child_executor(new_value, new_key, new_timestamp, new_headers)

quixstreams/core/stream/functions/types.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@
1414
"FilterWithMetadataCallback",
1515
"TransformCallback",
1616
"TransformExpandedCallback",
17-
"TransformWallClockCallback",
18-
"TransformWallClockExpandedCallback",
1917
)
2018

2119

@@ -37,10 +35,6 @@ def __bool__(self) -> bool: ...
3735
TransformExpandedCallback = Callable[
3836
[Any, Any, int, Any], Iterable[Tuple[Any, Any, int, Any]]
3937
]
40-
TransformWallClockCallback = Callable[[int], Tuple[Any, Any, int, Any]]
41-
TransformWallClockExpandedCallback = Callable[
42-
[int], Iterable[Tuple[Any, Any, int, Any]]
43-
]
4438

4539
StreamCallback = Union[
4640
ApplyCallback,

quixstreams/core/stream/stream.py

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@
3030
TransformCallback,
3131
TransformExpandedCallback,
3232
TransformFunction,
33-
TransformWallClockCallback,
34-
TransformWallClockExpandedCallback,
3533
UpdateCallback,
3634
UpdateFunction,
3735
UpdateWithMetadataCallback,
@@ -256,7 +254,6 @@ def add_transform(
256254
func: TransformCallback,
257255
*,
258256
expand: Literal[False] = False,
259-
wall_clock: Literal[False] = False,
260257
):
261258
pass
262259

@@ -266,41 +263,14 @@ def add_transform(
266263
func: TransformExpandedCallback,
267264
*,
268265
expand: Literal[True],
269-
wall_clock: Literal[False] = False,
270266
):
271267
pass
272268

273-
@overload
274-
def add_transform(
275-
self,
276-
func: TransformWallClockCallback,
277-
*,
278-
expand: Literal[False] = False,
279-
wall_clock: Literal[True],
280-
):
281-
pass
282-
283-
@overload
284269
def add_transform(
285270
self,
286-
func: TransformWallClockExpandedCallback,
287-
*,
288-
expand: Literal[True],
289-
wall_clock: Literal[True],
290-
):
291-
pass
292-
293-
def add_transform(
294-
self,
295-
func: Union[
296-
TransformCallback,
297-
TransformExpandedCallback,
298-
TransformWallClockCallback,
299-
TransformWallClockExpandedCallback,
300-
],
271+
func: Union[TransformCallback, TransformExpandedCallback],
301272
*,
302273
expand: bool = False,
303-
wall_clock: bool = False,
304274
) -> "Stream":
305275
"""
306276
Add a "transform" function to the Stream, that will mutate the input value.
@@ -316,11 +286,9 @@ def add_transform(
316286
:param expand: if True, expand the returned iterable into individual items
317287
downstream. If returned value is not iterable, `TypeError` will be raised.
318288
Default - `False`.
319-
:param wall_clock: if True, the callback is expected to accept timestamp only.
320-
Default - `False`.
321289
:return: a new Stream derived from the current one
322290
"""
323-
return self._add(TransformFunction(func, expand=expand, wall_clock=wall_clock)) # type: ignore[call-overload]
291+
return self._add(TransformFunction(func, expand=expand)) # type: ignore[call-overload]
324292

325293
def merge(self, other: "Stream") -> "Stream":
326294
"""

0 commit comments

Comments
 (0)