-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-53751][SDP] Explicit Versioned Checkpoint Location #52487
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-53751][SDP] Explicit Versioned Checkpoint Location #52487
Conversation
val resolvedGraph = resolveGraph() | ||
if (context.fullRefreshTables.nonEmpty) { | ||
State.reset(resolvedGraph, context) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
with explicit storage location for checkpoint, we shouldn't need to create the tables and obtain its path beforehand. resolvedGraph
should suffice.
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/DatasetManager.scala
Outdated
Show resolved
Hide resolved
f3533ff
to
60cfada
Compare
} | ||
|
||
override def afterEach(): Unit = { | ||
protected override def afterEach(): Unit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep consistency with signature of BeforeAndAfterEach
and fix an inheritance error when introducing StorageRootMixin
* The path to the temporary directory is available via the `storageRoot` variable. | ||
*/ | ||
trait StorageRootMixin extends BeforeAndAfterEach { self: Suite => | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extracting this out from PipelineTest
so spark connect pipeline test can also extend this
7ecc4dd
to
22a4c49
Compare
@sryza diff is a bit large but tried to include only the necessary changes for checkpoints |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just one small comment about a comment. Otherwise, this looks great.
|
||
/** | ||
* Resets the checkpoint for the given flow by creating the next consecutive directory. Also | ||
* clears out batch append state if it exists. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this clears out batch append state, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah stale comment, let me remove
9413fd8
to
dcad414
Compare
What changes were proposed in this pull request?
Add a
storage
field in pipeline spec to allow users specify locations of metadata such as streaming checkpoints.Below is the structure of the directory, which offer supports for multi-flow and versioned directory where version number is incremented after a full refresh.
Why are the changes needed?
Currently, SDP stores streaming flow ckpts in the table path. This does not allow support for versioned checkpoints and does not work for sinks.
Does this PR introduce any user-facing change?
How was this patch tested?
New and existing tests
Was this patch authored or co-authored using generative AI tooling?
No