Skip to content

Difference between shutdown(wait=True) and shutdown(wait=False) #721

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions executorlib/executor/flux.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
check_plot_dependency_graph,
check_pmi,
check_refresh_rate,
check_terminate_tasks_on_shutdown,
validate_number_of_cores,
)
from executorlib.task_scheduler.interactive.blockallocation import (
Expand Down Expand Up @@ -64,7 +63,6 @@ class FluxJobExecutor(BaseExecutor):
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default.

Examples:
```
Expand Down Expand Up @@ -105,7 +103,6 @@ def __init__(
plot_dependency_graph: bool = False,
plot_dependency_graph_filename: Optional[str] = None,
log_obj_size: bool = False,
terminate_tasks_on_shutdown: bool = True,
):
"""
The executorlib.FluxJobExecutor leverages either the message passing interface (MPI), the SLURM workload manager
Expand Down Expand Up @@ -151,7 +148,6 @@ def __init__(
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default.

"""
default_resource_dict: dict = {
Expand All @@ -167,9 +163,6 @@ def __init__(
resource_dict.update(
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
)
check_terminate_tasks_on_shutdown(
terminate_tasks_on_shutdown=terminate_tasks_on_shutdown
)
if not disable_dependencies:
super().__init__(
executor=DependencyTaskScheduler(
Expand Down Expand Up @@ -255,7 +248,6 @@ class FluxClusterExecutor(BaseExecutor):
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default.

Examples:
```
Expand Down Expand Up @@ -336,7 +328,6 @@ def __init__(
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default.

"""
default_resource_dict: dict = {
Expand Down Expand Up @@ -376,7 +367,6 @@ def __init__(
block_allocation=block_allocation,
init_function=init_function,
disable_dependencies=disable_dependencies,
terminate_tasks_on_shutdown=terminate_tasks_on_shutdown,
)
)
else:
Expand Down
9 changes: 0 additions & 9 deletions executorlib/executor/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
check_log_obj_size,
check_plot_dependency_graph,
check_refresh_rate,
check_terminate_tasks_on_shutdown,
validate_number_of_cores,
)
from executorlib.task_scheduler.interactive.blockallocation import (
Expand Down Expand Up @@ -61,7 +60,6 @@ class SlurmClusterExecutor(BaseExecutor):
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default.

Examples:
```
Expand Down Expand Up @@ -142,7 +140,6 @@ def __init__(
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default.

"""
default_resource_dict: dict = {
Expand Down Expand Up @@ -182,7 +179,6 @@ def __init__(
block_allocation=block_allocation,
init_function=init_function,
disable_dependencies=disable_dependencies,
terminate_tasks_on_shutdown=terminate_tasks_on_shutdown,
)
)
else:
Expand Down Expand Up @@ -249,7 +245,6 @@ class SlurmJobExecutor(BaseExecutor):
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default.

Examples:
```
Expand Down Expand Up @@ -332,7 +327,6 @@ def __init__(
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default.

"""
default_resource_dict: dict = {
Expand All @@ -348,9 +342,6 @@ def __init__(
resource_dict.update(
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
)
check_terminate_tasks_on_shutdown(
terminate_tasks_on_shutdown=terminate_tasks_on_shutdown
)
if not disable_dependencies:
super().__init__(
executor=DependencyTaskScheduler(
Expand Down
12 changes: 0 additions & 12 deletions executorlib/standalone/inputcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,3 @@ def check_log_obj_size(log_obj_size: bool) -> None:
"log_obj_size is not supported for the executorlib.SlurmClusterExecutor and executorlib.FluxClusterExecutor."
"Please use log_obj_size=False instead of log_obj_size=True."
)


def check_terminate_tasks_on_shutdown(terminate_tasks_on_shutdown: bool) -> None:
"""
Check if terminate_tasks_on_shutdown is False and raise a ValueError if it is.
"""
if not terminate_tasks_on_shutdown:
raise ValueError(
"terminate_tasks_on_shutdown is not supported for the executorlib.SingleNodeExecutor, "
"executorlib.SlurmJobExecutor and executorlib.FluxJobExecutor."
"Please use terminate_tasks_on_shutdown=True instead of terminate_tasks_on_shutdown=False."
)
21 changes: 11 additions & 10 deletions executorlib/task_scheduler/file/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,17 @@ def execute_tasks_h5(
with contextlib.suppress(queue.Empty):
task_dict = future_queue.get_nowait()
if task_dict is not None and "shutdown" in task_dict and task_dict["shutdown"]:
while len(memory_dict) > 0:
memory_dict = {
key: _check_task_output(
task_key=key,
future_obj=value,
cache_directory=cache_dir_dict[key],
)
for key, value in memory_dict.items()
if not value.done()
}
if task_dict["wait"]:
while len(memory_dict) > 0:
memory_dict = {
key: _check_task_output(
task_key=key,
future_obj=value,
cache_directory=cache_dir_dict[key],
)
for key, value in memory_dict.items()
if not value.done()
}
if (
terminate_function is not None
and terminate_function == terminate_subprocess
Expand Down
7 changes: 2 additions & 5 deletions executorlib/task_scheduler/file/task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ def create_file_executor(
init_function: Optional[Callable] = None,
disable_dependencies: bool = False,
execute_function: Callable = execute_with_pysqa,
terminate_tasks_on_shutdown: bool = True,
):
if block_allocation:
raise ValueError(
Expand All @@ -113,12 +112,10 @@ def create_file_executor(
check_executor(executor=flux_executor)
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
check_flux_log_files(flux_log_files=flux_log_files)
if terminate_tasks_on_shutdown and execute_function != execute_in_subprocess:
if execute_function != execute_in_subprocess:
terminate_function = terminate_with_pysqa # type: ignore
elif terminate_tasks_on_shutdown and execute_function == execute_in_subprocess:
terminate_function = terminate_subprocess # type: ignore
else:
terminate_function = None # type: ignore
terminate_function = terminate_subprocess # type: ignore
Comment on lines +115 to +118
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Good simplification! Note the breaking change.

The simplified logic for determining terminate_function is cleaner and more maintainable. The function selection now depends solely on the execute_function type rather than multiple conditional flags.

This removal of the terminate_tasks_on_shutdown parameter constitutes a breaking change for any external code that was using this parameter.


🏁 Script executed:

#!/bin/bash
# Search for any remaining references to terminate_tasks_on_shutdown parameter
rg -i "terminate_tasks_on_shutdown"

# Check for any imports or function calls that might still expect this parameter
ast-grep --pattern 'create_file_executor($$$, terminate_tasks_on_shutdown=$_, $$$)'

Length of output: 521


🏁 Script executed:

#!/bin/bash
# Locate create_file_executor definition
rg -n "def create_file_executor" -n executorlib/task_scheduler/file/task_scheduler.py

# Check if terminate_tasks_on_shutdown is still referenced in this file
rg -n "terminate_tasks_on_shutdown" -n executorlib/task_scheduler/file/task_scheduler.py

# Print lines around the function signature for context
sed -n '100,140p' executorlib/task_scheduler/file/task_scheduler.py

Length of output: 1549


Breaking Change: Remove terminate_tasks_on_shutdown References

The new create_file_executor no longer accepts or propagates a terminate_tasks_on_shutdown parameter, but several executors and tests still reference it. Please update or clean up these callers to match the simplified API:

• executorlib/executor/slurm.py

  • Remove the terminate_tasks_on_shutdown: bool = True parameter from the SlurmExecutor signature and any internal references.

• executorlib/executor/flux.py

  • Remove the terminate_tasks_on_shutdown: bool = True parameter from the FluxExecutor signature and any internal references.

• tests/test_fluxclusterexecutor.py

  • Drop the terminate_tasks_on_shutdown= arguments passed when instantiating executors.

Fix these to align with the new create_file_executor signature or reintroduce a no-op parameter if backward compatibility is required.

🤖 Prompt for AI Agents
In executorlib/task_scheduler/file/task_scheduler.py around lines 115 to 118,
the code and related executors still reference the obsolete
terminate_tasks_on_shutdown parameter which is no longer accepted by
create_file_executor. Remove the terminate_tasks_on_shutdown parameter from the
SlurmExecutor and FluxExecutor class signatures and their internal usage in
executorlib/executor/slurm.py and executorlib/executor/flux.py. Also, update
tests/test_fluxclusterexecutor.py to eliminate any terminate_tasks_on_shutdown
arguments when creating executor instances to align with the simplified API.

return FileTaskScheduler(
resource_dict=resource_dict,
pysqa_config_directory=pysqa_config_directory,
Expand Down
5 changes: 0 additions & 5 deletions tests/test_standalone_inputcheck.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
check_hostname_localhost,
check_pysqa_config_directory,
check_file_exists,
check_terminate_tasks_on_shutdown,
check_log_obj_size,
validate_number_of_cores,
)
Expand Down Expand Up @@ -125,7 +124,3 @@ def test_validate_number_of_cores(self):
def test_check_log_obj_size(self):
with self.assertRaises(ValueError):
check_log_obj_size(log_obj_size=True)

def test_terminate_tasks_on_shutdown(self):
with self.assertRaises(ValueError):
check_terminate_tasks_on_shutdown(terminate_tasks_on_shutdown=False)
Loading