@JiaqiWang18 JiaqiWang18 commented Sep 30, 2025

What changes were proposed in this pull request?

Add a storage field to the pipeline spec so that users can specify where metadata such as streaming checkpoints is stored.
The directory structure is shown below. It supports multiple flows and versioned checkpoint directories, where the version number is incremented after each full refresh.

storage-root/
└── _checkpoints/ # checkpoint root
      ├── myst/
      │    ├── flow1/
      │    │    ├── 0/ # version 0
      │    │    │    ├── commits/
      │    │    │    ├── offsets/
      │    │    │    └── sources/
      │    │    └── 1/ # version 1
      │    │
      │    └── flow2/
      │         ├── 0/
      │         │    ├── commits/
      │         │    ├── offsets/
      │         │    └── sources/
      │         └── 1/
      │
      └── mysink/
            └── flowA/
                 ├── 0/
                 │    ├── commits/
                 │    ├── offsets/
                 │    └── sources/
                 └── 1/
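
As an illustration of the layout above, the checkpoint directory for a given flow and version can be derived from the storage root. This is only a minimal sketch and not the code in this PR: `CheckpointLayout`, `checkpointRoot`, and `checkpointDir` are hypothetical names, and it uses `java.nio.file` paths for simplicity, which may differ from the filesystem API the actual implementation uses.

```scala
import java.nio.file.{Path, Paths}

// Hypothetical helper; names are illustrative and not taken from the PR.
object CheckpointLayout {
  // Root under which all flow checkpoints live: <storage-root>/_checkpoints
  def checkpointRoot(storageRoot: Path): Path =
    storageRoot.resolve("_checkpoints")

  // <storage-root>/_checkpoints/<dataset>/<flow>/<version>
  def checkpointDir(storageRoot: Path, dataset: String, flow: String, version: Int): Path = {
    require(version >= 0, "checkpoint version must be non-negative")
    checkpointRoot(storageRoot)
      .resolve(dataset)
      .resolve(flow)
      .resolve(version.toString)
  }
}

object CheckpointLayoutExample {
  // Version 0 of flow1 writing to the table "myst".
  val dir: Path =
    CheckpointLayout.checkpointDir(Paths.get("storage-root"), "myst", "flow1", 0)
}
```

Keying the path on both the destination and the flow is what lets several flows target the same table or sink without sharing checkpoint state.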

Why are the changes needed?

Currently, SDP stores streaming flow checkpoints in the table path. This layout cannot support versioned checkpoints and does not work for sinks.

Does this PR introduce any user-facing change?

How was this patch tested?

New and existing tests

Was this patch authored or co-authored using generative AI tooling?

No

val resolvedGraph = resolveGraph()
if (context.fullRefreshTables.nonEmpty) {
  State.reset(resolvedGraph, context)
}
Contributor Author

With an explicit storage location for checkpoints, we no longer need to create the tables and obtain their paths beforehand. resolvedGraph should suffice.

github-actions bot added the PYTHON label on Oct 6, 2025
JiaqiWang18 changed the title from "[Prototype][SPARK-53751][SDP] Explicit Checkpoint Location" to "[SPARK-53751][SDP] Explicit Versioned Checkpoint Location" on Oct 6, 2025
JiaqiWang18 force-pushed the SPARK-53751-explicit-versioned-multiflow-checkpoint branch from f3533ff to 60cfada on October 6, 2025 23:44
JiaqiWang18 marked this pull request as ready for review on October 6, 2025 23:56
}

- override def afterEach(): Unit = {
+ protected override def afterEach(): Unit = {
Contributor Author

This keeps the signature consistent with BeforeAndAfterEach and fixes an inheritance error that appeared when introducing StorageRootMixin.

* The path to the temporary directory is available via the `storageRoot` variable.
*/
trait StorageRootMixin extends BeforeAndAfterEach { self: Suite =>

Contributor Author

Extracted this from PipelineTest so that the Spark Connect pipeline tests can also extend it.

JiaqiWang18 force-pushed the SPARK-53751-explicit-versioned-multiflow-checkpoint branch 4 times, most recently from 7ecc4dd to 22a4c49, on October 7, 2025 03:32
@JiaqiWang18
Contributor Author

@sryza The diff is a bit large, but I tried to include only the changes necessary for checkpoints.

Contributor

@sryza sryza left a comment

Just one small comment about a comment. Otherwise, this looks great.


/**
* Resets the checkpoint for the given flow by creating the next consecutive directory. Also
* clears out batch append state if it exists.
Contributor

I don't think this clears out batch append state, right?

Contributor Author

Yeah, that's a stale comment. Let me remove it.
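
For readers following the thread, "creating the next consecutive directory" can be sketched roughly as below. This is an assumption-laden illustration rather than the PR's implementation: `CheckpointVersions`, `latestVersion`, and `createNextVersion` are hypothetical names, the code works on a local `java.nio.file` path, and it makes no attempt to be safe under concurrent resets. It requires Scala 2.13 or later.

```scala
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._
import scala.util.Try

// Hypothetical sketch; not the PR's implementation.
object CheckpointVersions {
  // Highest numeric subdirectory name under flowDir, if any exists.
  def latestVersion(flowDir: Path): Option[Int] = {
    if (!Files.isDirectory(flowDir)) {
      None
    } else {
      val stream = Files.list(flowDir)
      try {
        stream.iterator().asScala
          .filter(p => Files.isDirectory(p))
          .map(_.getFileName.toString)
          .filter(name => name.nonEmpty && name.forall(_.isDigit))
          .flatMap(name => Try(name.toInt).toOption)
          .toList // materialize before the stream is closed
          .maxOption
      } finally {
        stream.close()
      }
    }
  }

  // Creates and returns the next consecutive version directory
  // (version 0 when the flow has no checkpoint yet).
  def createNextVersion(flowDir: Path): Path = {
    val next = latestVersion(flowDir).map(_ + 1).getOrElse(0)
    Files.createDirectories(flowDir.resolve(next.toString))
  }
}
```

Under this sketch, a full refresh would call `createNextVersion` for each affected flow so the stream restarts from an empty checkpoint, while earlier versions remain on disk.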

JiaqiWang18 force-pushed the SPARK-53751-explicit-versioned-multiflow-checkpoint branch from 9413fd8 to dcad414 on October 8, 2025 17:06
sryza closed this in 9a1c742 on Oct 8, 2025