Skip to content

Commit ba2c702

Browse files
Difference between shutdown(wait=True) and shutdown(wait=False) (#721)
* Difference between shutdown(wait=True) and shutdown(wait=False) When `shutdown(wait=True)` is called - the default - then the executor waits until all future objects completed. In contrast when `shutdown(wait=True)` is called the future objects are cancelled on the queuing system. * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 15f648f commit ba2c702

File tree

6 files changed

+13
-51
lines changed

6 files changed

+13
-51
lines changed

executorlib/executor/flux.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
check_plot_dependency_graph,
1010
check_pmi,
1111
check_refresh_rate,
12-
check_terminate_tasks_on_shutdown,
1312
validate_number_of_cores,
1413
)
1514
from executorlib.task_scheduler.interactive.blockallocation import (
@@ -64,7 +63,6 @@ class FluxJobExecutor(BaseExecutor):
6463
debugging purposes and to get an overview of the specified dependencies.
6564
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
6665
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
67-
terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default.
6866
6967
Examples:
7068
```
@@ -105,7 +103,6 @@ def __init__(
105103
plot_dependency_graph: bool = False,
106104
plot_dependency_graph_filename: Optional[str] = None,
107105
log_obj_size: bool = False,
108-
terminate_tasks_on_shutdown: bool = True,
109106
):
110107
"""
111108
The executorlib.FluxJobExecutor leverages either the message passing interface (MPI), the SLURM workload manager
@@ -151,7 +148,6 @@ def __init__(
151148
debugging purposes and to get an overview of the specified dependencies.
152149
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
153150
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
154-
terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default.
155151
156152
"""
157153
default_resource_dict: dict = {
@@ -167,9 +163,6 @@ def __init__(
167163
resource_dict.update(
168164
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
169165
)
170-
check_terminate_tasks_on_shutdown(
171-
terminate_tasks_on_shutdown=terminate_tasks_on_shutdown
172-
)
173166
if not disable_dependencies:
174167
super().__init__(
175168
executor=DependencyTaskScheduler(
@@ -255,7 +248,6 @@ class FluxClusterExecutor(BaseExecutor):
255248
debugging purposes and to get an overview of the specified dependencies.
256249
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
257250
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
258-
terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default.
259251
260252
Examples:
261253
```
@@ -336,7 +328,6 @@ def __init__(
336328
debugging purposes and to get an overview of the specified dependencies.
337329
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
338330
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
339-
terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default.
340331
341332
"""
342333
default_resource_dict: dict = {
@@ -376,7 +367,6 @@ def __init__(
376367
block_allocation=block_allocation,
377368
init_function=init_function,
378369
disable_dependencies=disable_dependencies,
379-
terminate_tasks_on_shutdown=terminate_tasks_on_shutdown,
380370
)
381371
)
382372
else:

executorlib/executor/slurm.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
check_log_obj_size,
77
check_plot_dependency_graph,
88
check_refresh_rate,
9-
check_terminate_tasks_on_shutdown,
109
validate_number_of_cores,
1110
)
1211
from executorlib.task_scheduler.interactive.blockallocation import (
@@ -61,7 +60,6 @@ class SlurmClusterExecutor(BaseExecutor):
6160
debugging purposes and to get an overview of the specified dependencies.
6261
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
6362
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
64-
terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default.
6563
6664
Examples:
6765
```
@@ -142,7 +140,6 @@ def __init__(
142140
debugging purposes and to get an overview of the specified dependencies.
143141
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
144142
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
145-
terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default.
146143
147144
"""
148145
default_resource_dict: dict = {
@@ -182,7 +179,6 @@ def __init__(
182179
block_allocation=block_allocation,
183180
init_function=init_function,
184181
disable_dependencies=disable_dependencies,
185-
terminate_tasks_on_shutdown=terminate_tasks_on_shutdown,
186182
)
187183
)
188184
else:
@@ -249,7 +245,6 @@ class SlurmJobExecutor(BaseExecutor):
249245
debugging purposes and to get an overview of the specified dependencies.
250246
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
251247
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
252-
terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default.
253248
254249
Examples:
255250
```
@@ -332,7 +327,6 @@ def __init__(
332327
debugging purposes and to get an overview of the specified dependencies.
333328
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
334329
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
335-
terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default.
336330
337331
"""
338332
default_resource_dict: dict = {
@@ -348,9 +342,6 @@ def __init__(
348342
resource_dict.update(
349343
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
350344
)
351-
check_terminate_tasks_on_shutdown(
352-
terminate_tasks_on_shutdown=terminate_tasks_on_shutdown
353-
)
354345
if not disable_dependencies:
355346
super().__init__(
356347
executor=DependencyTaskScheduler(

executorlib/standalone/inputcheck.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -212,15 +212,3 @@ def check_log_obj_size(log_obj_size: bool) -> None:
212212
"log_obj_size is not supported for the executorlib.SlurmClusterExecutor and executorlib.FluxClusterExecutor."
213213
"Please use log_obj_size=False instead of log_obj_size=True."
214214
)
215-
216-
217-
def check_terminate_tasks_on_shutdown(terminate_tasks_on_shutdown: bool) -> None:
218-
"""
219-
Check if terminate_tasks_on_shutdown is False and raise a ValueError if it is.
220-
"""
221-
if not terminate_tasks_on_shutdown:
222-
raise ValueError(
223-
"terminate_tasks_on_shutdown is not supported for the executorlib.SingleNodeExecutor, "
224-
"executorlib.SlurmJobExecutor and executorlib.FluxJobExecutor."
225-
"Please use terminate_tasks_on_shutdown=True instead of terminate_tasks_on_shutdown=False."
226-
)

executorlib/task_scheduler/file/shared.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -87,16 +87,17 @@ def execute_tasks_h5(
8787
with contextlib.suppress(queue.Empty):
8888
task_dict = future_queue.get_nowait()
8989
if task_dict is not None and "shutdown" in task_dict and task_dict["shutdown"]:
90-
while len(memory_dict) > 0:
91-
memory_dict = {
92-
key: _check_task_output(
93-
task_key=key,
94-
future_obj=value,
95-
cache_directory=cache_dir_dict[key],
96-
)
97-
for key, value in memory_dict.items()
98-
if not value.done()
99-
}
90+
if task_dict["wait"]:
91+
while len(memory_dict) > 0:
92+
memory_dict = {
93+
key: _check_task_output(
94+
task_key=key,
95+
future_obj=value,
96+
cache_directory=cache_dir_dict[key],
97+
)
98+
for key, value in memory_dict.items()
99+
if not value.done()
100+
}
100101
if (
101102
terminate_function is not None
102103
and terminate_function == terminate_subprocess

executorlib/task_scheduler/file/task_scheduler.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ def create_file_executor(
9595
init_function: Optional[Callable] = None,
9696
disable_dependencies: bool = False,
9797
execute_function: Callable = execute_with_pysqa,
98-
terminate_tasks_on_shutdown: bool = True,
9998
):
10099
if block_allocation:
101100
raise ValueError(
@@ -113,12 +112,10 @@ def create_file_executor(
113112
check_executor(executor=flux_executor)
114113
check_nested_flux_executor(nested_flux_executor=flux_executor_nesting)
115114
check_flux_log_files(flux_log_files=flux_log_files)
116-
if terminate_tasks_on_shutdown and execute_function != execute_in_subprocess:
115+
if execute_function != execute_in_subprocess:
117116
terminate_function = terminate_with_pysqa # type: ignore
118-
elif terminate_tasks_on_shutdown and execute_function == execute_in_subprocess:
119-
terminate_function = terminate_subprocess # type: ignore
120117
else:
121-
terminate_function = None # type: ignore
118+
terminate_function = terminate_subprocess # type: ignore
122119
return FileTaskScheduler(
123120
resource_dict=resource_dict,
124121
pysqa_config_directory=pysqa_config_directory,

tests/test_standalone_inputcheck.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
check_hostname_localhost,
1919
check_pysqa_config_directory,
2020
check_file_exists,
21-
check_terminate_tasks_on_shutdown,
2221
check_log_obj_size,
2322
validate_number_of_cores,
2423
)
@@ -125,7 +124,3 @@ def test_validate_number_of_cores(self):
125124
def test_check_log_obj_size(self):
126125
with self.assertRaises(ValueError):
127126
check_log_obj_size(log_obj_size=True)
128-
129-
def test_terminate_tasks_on_shutdown(self):
130-
with self.assertRaises(ValueError):
131-
check_terminate_tasks_on_shutdown(terminate_tasks_on_shutdown=False)

0 commit comments

Comments
 (0)