Move PeriodicStream into periodicsequence. #35412
Conversation
Assigning reviewers: R: @jrmccluskey for label python.
Note: If you would like to opt out of this review, comment with one of the bot's available commands. The PR bot will only process comments in the main thread (not review comments).
I proposed some changes that I think we should do regardless, but if we do them it also makes it really clean to consolidate under PeriodicImpulse (we'd just be adding the `data` parameter).
The output mode of the DoFn is based on the input `data`:

- **None**: If `data` is None (by default), the output element will be the
Can we still output a TimestampedValue here for consistency? Then we could remove
`'MapToTimestamped' >> beam.Map(lambda tt: TimestampedValue(tt, tt))`
from PeriodicImpulse.
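A minimal sketch of that suggestion, assuming the process method already computes a per-element `output_ts` (the helper name is illustrative, not the actual code in this PR):

```python
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils.timestamp import Timestamp


def _emit_timestamped(output_ts):
  # Yield the element already wrapped as TimestampedValue(value, timestamp),
  # so the separate 'MapToTimestamped' step would no longer be needed.
  ts = output_ts if isinstance(output_ts, Timestamp) else Timestamp.of(output_ts)
  yield TimestampedValue(ts, ts)
```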
Sure. But notice that this will introduce a breaking change to the pipeline DAG.
This is a good point. You could get around this by moving most of this logic into the map transform instead, but I don't think this is necessary if you don't think it is cleaner (I won't block on this).
As we discussed offline, the logic of using pre-timestamped values has to be put inside `ImpulseSeqGenDoFn` to ensure the watermark estimate is consistent with the event times. I updated the code to run this map step when `data` is not specified, just to keep the DAG compatible, but I am open to any other suggestions.
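For context, a rough sketch of that compatibility approach; the wiring and constructor handling below are simplified assumptions, not the actual diff:

```python
import apache_beam as beam
from apache_beam.transforms.periodicsequence import ImpulseSeqGenDoFn
from apache_beam.transforms.window import TimestampedValue


class _PeriodicImpulseSketch(beam.PTransform):
  """Illustrative only: shows where the conditional map step would live."""
  def __init__(self, start, stop, interval, data=None):
    self.start, self.stop, self.interval, self.data = start, stop, interval, data

  def expand(self, pbegin):
    result = (
        pbegin
        | 'ImpulseElement' >> beam.Create([(self.start, self.stop, self.interval)])
        | 'GenSequence' >> beam.ParDo(ImpulseSeqGenDoFn()))
    if self.data is None:
      # Only the no-data path keeps the historical 'MapToTimestamped' node,
      # so existing pipeline DAGs stay unchanged; with `data`, the DoFn
      # already emits timestamped values.
      result = result | 'MapToTimestamped' >> beam.Map(
          lambda tt: TimestampedValue(tt, tt))
    return result
```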
current_watermark = watermark_estimator.current_watermark()
if current_watermark is None or output_ts > current_watermark:
  # ensure watermark is monotonic
  watermark_estimator.set_watermark(output_ts)
I think this watermark estimation won't work if the data has provided event times.
For example, imagine we have the following data:
(data=foo, event_time=10)
(data=foo, event_time=15)
(data=foo, event_time=1)
When we see the first element, we'd increment the watermark to 10. But this would immediately mean that the 3rd element is late.
The problem is even more severe when you consider repeating data. If you have repeating data, the only valid watermark you can advance to is `min(all_event_times)`.
I'd propose the following changes:
- If you repeat data, that data cannot have event times associated with it (otherwise watermark estimation is basically impossible). This would be a validation step at transform construction.
- If you don't repeat data but there are associated event times, then when emitting `data[i]`, set the watermark to `min(data[i+1:].map(lambda d: d.event_time))`. That will ensure a valid watermark (see the sketch after this comment).
Note that this would allow you to move the logic into the map transform if you want to, resolving the problems described in https://github.com/apache/beam/pull/35412/files#r2164781118
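A small sketch of the two proposed rules, assuming `data` arrives as `(event_time, value)` tuples (the helper names are made up for illustration):

```python
from apache_beam.utils.timestamp import MAX_TIMESTAMP


def _validate_data(data, repeat):
  # Construction-time check: repeated data must not carry its own event
  # times, otherwise no watermark could ever safely advance.
  if repeat and any(isinstance(d, tuple) for d in data):
    raise ValueError('Pre-timestamped elements cannot be combined with repeat.')


def _watermark_after(data, i):
  # After emitting data[i], the watermark may only advance to the smallest
  # event time still to be emitted; anything higher would make later
  # elements late.
  remaining = [event_time for event_time, _ in data[i + 1:]]
  return min(remaining) if remaining else MAX_TIMESTAMP
```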
I consider this a feature that lets users specify late data.
As mentioned in another thread, I think the event-time setting logic has to be in the DoFn so that the event time in the elements is consistent with the watermark estimate.
Thanks - this generally lgtm, remaining comments are minor. Thanks for being patient/flexible here
'''
:param start_timestamp: Timestamp for first element.
:param stop_timestamp: Timestamp after which no elements will be output.
:param fire_interval: Interval in seconds at which to output elements.
:param apply_windowing: Whether each element should be assigned to
  individual window. If false, all elements will reside in global window.
:param data: The sequence of elements to emit into the PCollection.
  The elements can be raw values or pre-timestamped tuples in the format
  `(apache_beam.utils.timestamp.Timestamp, value)`.
Could you describe the watermark semantics of pre-timestamped data here please?
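For illustration, a hypothetical usage of the new `data` parameter with pre-timestamped tuples; the exact watermark behaviour for out-of-order timestamps is whatever the discussion above settles on:

```python
import apache_beam as beam
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.utils.timestamp import Timestamp

# Hypothetical example: each (Timestamp, value) tuple sets the element's
# event time; see the watermark discussion above for out-of-order timestamps.
data = [
    (Timestamp(10), 'a'),
    (Timestamp(15), 'b'),
    (Timestamp(1), 'c'),
]

with beam.Pipeline() as p:
  _ = (
      p
      | PeriodicImpulse(fire_interval=1, data=data)
      | beam.Map(print))
```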
start_timestamp: Union[Timestamp, float] = Timestamp.now(),
stop_timestamp: Union[Timestamp, float] = MAX_TIMESTAMP,
I'm fine with allowing floats here, but could you explain what float means in the pydoc? Alternatively, we can just keep Timestamp as the type.
I see `ImpulseSeqGenDoFn` accepts floats as its input elements:

beam/sdks/python/apache_beam/transforms/periodicsequence.py (lines 109 to 111 in 95b28b1):

start, _, interval = element
if isinstance(start, Timestamp):
I don't have a strong preference on this, so let's keep Timestamp as the type then.
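For reference, a float here would just be seconds since the Unix epoch; a tiny sketch of the normalization the snippet above implies (the exact conversion inside the DoFn is an assumption):

```python
from apache_beam.utils.timestamp import Timestamp

start = 1700000000.5  # float: seconds since the Unix epoch
if not isinstance(start, Timestamp):
  # Timestamp.of accepts int/float seconds (or an existing Timestamp).
  start = Timestamp.of(start)
```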
Thanks, this LGTM
Thanks! Could you take a final look at the docstring changes I made?
Thanks - this looks good to me
The lint problem (https://github.com/apache/beam/actions/runs/15926733133/job/44925603939?pr=35412) seems to be related to the Vertex AI change. @claudevdm could you take a look?
I have #35463 - it should be safe to ignore for this PR though.
Sounds good. Thanks!
A follow-up PR of #35300 to address the concern at #35300 (comment)