Move PeriodicStream into periodicsequence. #35412

Merged
merged 9 commits into apache:master from periodic-stream-2
Jun 27, 2025

Conversation

shunping
Collaborator

A follow-up PR to #35300, addressing the concern raised at #35300 (comment).

@shunping shunping self-assigned this Jun 24, 2025
@shunping shunping requested a review from damccorm June 24, 2025 03:13
@shunping shunping marked this pull request as ready for review June 24, 2025 03:13
Contributor

Assigning reviewers:

R: @jrmccluskey for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

Contributor

@damccorm damccorm left a comment

I proposed some changes that I think we should make regardless, but if we do them it also makes it really clean to consolidate under PeriodicImpulse (we'd just be adding the `data` parameter).


The output mode of the DoFn is based on the input `data`:

- **None**: If `data` is None (by default), the output element will be the
Contributor

Can we still output a TimestampedValue here for consistency? Then we could remove

| 'MapToTimestamped' >> beam.Map(lambda tt: TimestampedValue(tt, tt)))

from PeriodicImpulse

Collaborator Author

@shunping shunping Jun 24, 2025

Sure. But notice that this will introduce a breaking change to the pipeline DAG.

Contributor

This is a good point. You could get around this by moving most of this logic into the map transform instead, but I don't think this is necessary if you don't think it is cleaner (I won't block on this).

Collaborator Author

@shunping shunping Jun 26, 2025

As we discussed offline, the logic of using pre-timestamped values has to be put inside ImpulseSeqGenDoFn to ensure the watermark estimate is consistent with the event times. I updated the code to run this map step when data is not specified, just to keep the DAG compatible, but I am open to other suggestions.

current_watermark = watermark_estimator.current_watermark()
if current_watermark is None or output_ts > current_watermark:
  # ensure watermark is monotonic
  watermark_estimator.set_watermark(output_ts)
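
A minimal sketch of the consistency point above; the helper name `emit_with_watermark` and its arguments are hypothetical and not the PR's actual code:

```python
from apache_beam.transforms.window import TimestampedValue

def emit_with_watermark(value, output_ts, watermark_estimator):
  # Hypothetical sketch: advance the watermark (monotonically) to the
  # element's event time, then emit the element with that same event time,
  # so the element timestamps and the watermark estimate cannot drift apart.
  current_watermark = watermark_estimator.current_watermark()
  if current_watermark is None or output_ts > current_watermark:
    watermark_estimator.set_watermark(output_ts)
  return TimestampedValue(value, output_ts)
```
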
Contributor

I think this watermark estimation won't work if the data has provided event times.

For example, imagine we have the following data:

(data=foo, event_time=10)
(data=foo, event_time=15)
(data=foo, event_time=1)

When we see the first element, we'd increment the watermark to 10. But this would immediately mean that the 3rd element is late.

The problem is even more severe when you consider repeating data. If you have repeating data, the only valid watermark you can advance to is min(all_event_times).

I'd propose the following changes:

  1. If you repeat data, that data cannot have event times associated with it (otherwise watermark estimation is basically impossible). This would be a validation step at transform construction.
  2. If you don't repeat data, but there are associated event times, when emitting data[i], set the watermark to min(data[i+1:].map(lambda d: d.event_time)) (see the sketch below). That will ensure a valid watermark.

Note that this would allow you to move the logic into the map transform if you want to, resolving the problems described in https://github.com/apache/beam/pull/35412/files#r2164781118
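
A minimal sketch of the rule in item 2, using the example event times from this comment; `remaining_watermark` is a hypothetical helper, not part of the PR:

```python
from apache_beam.utils.timestamp import Timestamp, MAX_TIMESTAMP

def remaining_watermark(event_times, i):
  # After emitting element i, the furthest the watermark can safely advance
  # to is the minimum event time among the elements not yet emitted.
  rest = event_times[i + 1:]
  return min(rest) if rest else MAX_TIMESTAMP

# With event times 10, 15, 1 (as above), emitting the first element only
# allows the watermark to reach min(15, 1) = 1, so the third element is not late.
event_times = [Timestamp.of(10), Timestamp.of(15), Timestamp.of(1)]
assert remaining_watermark(event_times, 0) == Timestamp.of(1)
```
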

Collaborator Author

I consider this a feature that lets users specify late data.

As mentioned in another thread, I think the event-time-setting logic has to be in the DoFn so that the event times in the elements are consistent with the watermark estimate.


Contributor

@damccorm damccorm left a comment

Thanks - this generally LGTM; the remaining comments are minor. Thanks for being patient/flexible here.

'''
:param start_timestamp: Timestamp for first element.
:param stop_timestamp: Timestamp after which no elements will be output.
:param fire_interval: Interval in seconds at which to output elements.
:param apply_windowing: Whether each element should be assigned to an
  individual window. If false, all elements will reside in the global window.
:param data: The sequence of elements to emit into the PCollection.
  The elements can be raw values or pre-timestamped tuples in the format
  `(apache_beam.utils.timestamp.Timestamp, value)`.
Contributor

Could you describe the watermark semantics of pre-timestamped data here please?
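
For context, a hedged usage sketch of the `data` parameter as described in the docstring above; the keyword name and accepted forms follow this PR's docstring and may differ from the final API:

```python
import apache_beam as beam
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.utils.timestamp import Timestamp

p = beam.Pipeline()

# Pre-timestamped tuples: each (Timestamp, value) pair carries its own event
# time, which (per the discussion above) also drives the watermark estimate.
stamped = (
    p
    | 'PreStamped' >> PeriodicImpulse(
        fire_interval=1,
        data=[(Timestamp.of(10), 'x'), (Timestamp.of(15), 'y')]))
```
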

Comment on lines 267 to 268
start_timestamp: Union[Timestamp, float] = Timestamp.now(),
stop_timestamp: Union[Timestamp, float] = MAX_TIMESTAMP,
Contributor

I'm fine with allowing floats here, but could you explain what float means in the pydoc? Alternatively, we can just keep Timestamp as the type.

Collaborator Author

@shunping shunping Jun 27, 2025

I see that ImpulseSeqGenDoFn accepts floats as its input elements:

start, _, interval = element
if isinstance(start, Timestamp):

I don't have a strong preference on this, so let's keep Timestamp as the type then.
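
For what it's worth, a small sketch of what a float would mean here; Beam's Timestamp values are seconds since the Unix epoch, so a float is naturally read the same way. The `to_timestamp` helper is hypothetical:

```python
from apache_beam.utils.timestamp import Timestamp

def to_timestamp(value):
  # Hypothetical normalization mirroring the isinstance check quoted above:
  # floats are treated as seconds since the Unix epoch.
  return value if isinstance(value, Timestamp) else Timestamp.of(value)

assert to_timestamp(1.5) == Timestamp.of(1.5)
assert to_timestamp(Timestamp.of(2)) == Timestamp.of(2)
```
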

@shunping shunping force-pushed the periodic-stream-2 branch from 0deba91 to fd153a5 on June 27, 2025 05:08
Contributor

@damccorm damccorm left a comment

Thanks, this LGTM

@shunping
Collaborator Author

shunping commented Jun 27, 2025

> Thanks, this LGTM

Thanks! Could you take a final look at the docstring changes I made?

Contributor

@damccorm damccorm left a comment

Thanks - this looks good to me

@shunping
Collaborator Author

The lint problem (https://github.com/apache/beam/actions/runs/15926733133/job/44925603939?pr=35412) seems to be related to the vertex ai change. @claudevdm could you take a look?

apache_beam/ml/transforms/embeddings/vertex_ai.py:129: error: Argument 1 to "get_embeddings" of "MultiModalEmbeddingModel" has incompatible type "Sequence[TextEmbeddingInput]"; expected "Optional[Image]"  [arg-type]
apache_beam/ml/transforms/embeddings/vertex_ai.py:150: error: List comprehension has incompatible type List[TextEmbeddingInput]; expected List[str]  [misc]
apache_beam/ml/rag/embeddings/vertex_ai.py:40: error: Incompatible types in assignment (expression has type "None", variable has type Module)  [assignment]

@damccorm
Contributor

> The lint problem (https://github.com/apache/beam/actions/runs/15926733133/job/44925603939?pr=35412) seems to be related to the vertex ai change. @claudevdm could you take a look?

I have #35463 - it should be safe to ignore for this PR though

@shunping
Collaborator Author

> I have #35463 - it should be safe to ignore for this PR though

Sounds good. Thanks!

@shunping shunping merged commit 9f431f7 into apache:master Jun 27, 2025
111 of 113 checks passed