From 1686125914dad53db2a6021423db3f1b23afb5d7 Mon Sep 17 00:00:00 2001 From: jan-janssen Date: Sun, 13 Jul 2025 12:04:59 +0200 Subject: [PATCH 1/2] Difference between shutdown(wait=True) and shutdown(wait=False) When `shutdown(wait=True)` is called - the default - then the executor waits until all future objects have completed. In contrast, when `shutdown(wait=False)` is called the future objects are cancelled on the queuing system. --- executorlib/executor/flux.py | 10 --------- executorlib/executor/slurm.py | 9 -------- executorlib/standalone/inputcheck.py | 14 +------------ executorlib/task_scheduler/file/shared.py | 21 ++++++++++--------- .../task_scheduler/file/task_scheduler.py | 7 ++----- tests/test_standalone_inputcheck.py | 5 ----- 6 files changed, 14 insertions(+), 52 deletions(-) diff --git a/executorlib/executor/flux.py b/executorlib/executor/flux.py index 9291fa40..a41f0f47 100644 --- a/executorlib/executor/flux.py +++ b/executorlib/executor/flux.py @@ -9,7 +9,6 @@ check_plot_dependency_graph, check_pmi, check_refresh_rate, - check_terminate_tasks_on_shutdown, validate_number_of_cores, ) from executorlib.task_scheduler.interactive.blockallocation import ( @@ -64,7 +63,6 @@ class FluxJobExecutor(BaseExecutor): debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default. 
Examples: ``` @@ -105,7 +103,6 @@ def __init__( plot_dependency_graph: bool = False, plot_dependency_graph_filename: Optional[str] = None, log_obj_size: bool = False, - terminate_tasks_on_shutdown: bool = True, ): """ The executorlib.FluxJobExecutor leverages either the message passing interface (MPI), the SLURM workload manager @@ -151,7 +148,6 @@ def __init__( debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default. """ default_resource_dict: dict = { @@ -167,9 +163,6 @@ def __init__( resource_dict.update( {k: v for k, v in default_resource_dict.items() if k not in resource_dict} ) - check_terminate_tasks_on_shutdown( - terminate_tasks_on_shutdown=terminate_tasks_on_shutdown - ) if not disable_dependencies: super().__init__( executor=DependencyTaskScheduler( @@ -255,7 +248,6 @@ class FluxClusterExecutor(BaseExecutor): debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default. Examples: ``` @@ -336,7 +328,6 @@ def __init__( debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default. 
""" default_resource_dict: dict = { @@ -376,7 +367,6 @@ def __init__( block_allocation=block_allocation, init_function=init_function, disable_dependencies=disable_dependencies, - terminate_tasks_on_shutdown=terminate_tasks_on_shutdown, ) ) else: diff --git a/executorlib/executor/slurm.py b/executorlib/executor/slurm.py index 3bab3c19..6ceac729 100644 --- a/executorlib/executor/slurm.py +++ b/executorlib/executor/slurm.py @@ -6,7 +6,6 @@ check_log_obj_size, check_plot_dependency_graph, check_refresh_rate, - check_terminate_tasks_on_shutdown, validate_number_of_cores, ) from executorlib.task_scheduler.interactive.blockallocation import ( @@ -61,7 +60,6 @@ class SlurmClusterExecutor(BaseExecutor): debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default. Examples: ``` @@ -142,7 +140,6 @@ def __init__( debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default. """ default_resource_dict: dict = { @@ -182,7 +179,6 @@ def __init__( block_allocation=block_allocation, init_function=init_function, disable_dependencies=disable_dependencies, - terminate_tasks_on_shutdown=terminate_tasks_on_shutdown, ) ) else: @@ -249,7 +245,6 @@ class SlurmJobExecutor(BaseExecutor): debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. 
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default. Examples: ``` @@ -332,7 +327,6 @@ def __init__( debugging purposes and to get an overview of the specified dependencies. plot_dependency_graph_filename (str): Name of the file to store the plotted graph in. log_obj_size (bool): Enable debug mode which reports the size of the communicated objects. - terminate_tasks_on_shutdown (bool): Shutdown all tasks when the Executor is shutdown, this is the default. """ default_resource_dict: dict = { @@ -348,9 +342,6 @@ def __init__( resource_dict.update( {k: v for k, v in default_resource_dict.items() if k not in resource_dict} ) - check_terminate_tasks_on_shutdown( - terminate_tasks_on_shutdown=terminate_tasks_on_shutdown - ) if not disable_dependencies: super().__init__( executor=DependencyTaskScheduler( diff --git a/executorlib/standalone/inputcheck.py b/executorlib/standalone/inputcheck.py index e0d6cbe1..6e681f0e 100644 --- a/executorlib/standalone/inputcheck.py +++ b/executorlib/standalone/inputcheck.py @@ -211,16 +211,4 @@ def check_log_obj_size(log_obj_size: bool) -> None: raise ValueError( "log_obj_size is not supported for the executorlib.SlurmClusterExecutor and executorlib.FluxClusterExecutor." "Please use log_obj_size=False instead of log_obj_size=True." - ) - - -def check_terminate_tasks_on_shutdown(terminate_tasks_on_shutdown: bool) -> None: - """ - Check if terminate_tasks_on_shutdown is False and raise a ValueError if it is. - """ - if not terminate_tasks_on_shutdown: - raise ValueError( - "terminate_tasks_on_shutdown is not supported for the executorlib.SingleNodeExecutor, " - "executorlib.SlurmJobExecutor and executorlib.FluxJobExecutor." - "Please use terminate_tasks_on_shutdown=True instead of terminate_tasks_on_shutdown=False." 
- ) + ) \ No newline at end of file diff --git a/executorlib/task_scheduler/file/shared.py b/executorlib/task_scheduler/file/shared.py index 42eeee61..f74ae1b1 100644 --- a/executorlib/task_scheduler/file/shared.py +++ b/executorlib/task_scheduler/file/shared.py @@ -87,16 +87,17 @@ def execute_tasks_h5( with contextlib.suppress(queue.Empty): task_dict = future_queue.get_nowait() if task_dict is not None and "shutdown" in task_dict and task_dict["shutdown"]: - while len(memory_dict) > 0: - memory_dict = { - key: _check_task_output( - task_key=key, - future_obj=value, - cache_directory=cache_dir_dict[key], - ) - for key, value in memory_dict.items() - if not value.done() - } + if task_dict["wait"]: + while len(memory_dict) > 0: + memory_dict = { + key: _check_task_output( + task_key=key, + future_obj=value, + cache_directory=cache_dir_dict[key], + ) + for key, value in memory_dict.items() + if not value.done() + } if ( terminate_function is not None and terminate_function == terminate_subprocess diff --git a/executorlib/task_scheduler/file/task_scheduler.py b/executorlib/task_scheduler/file/task_scheduler.py index 22cb3a48..fe719d8b 100644 --- a/executorlib/task_scheduler/file/task_scheduler.py +++ b/executorlib/task_scheduler/file/task_scheduler.py @@ -95,7 +95,6 @@ def create_file_executor( init_function: Optional[Callable] = None, disable_dependencies: bool = False, execute_function: Callable = execute_with_pysqa, - terminate_tasks_on_shutdown: bool = True, ): if block_allocation: raise ValueError( @@ -113,12 +112,10 @@ def create_file_executor( check_executor(executor=flux_executor) check_nested_flux_executor(nested_flux_executor=flux_executor_nesting) check_flux_log_files(flux_log_files=flux_log_files) - if terminate_tasks_on_shutdown and execute_function != execute_in_subprocess: + if execute_function != execute_in_subprocess: terminate_function = terminate_with_pysqa # type: ignore - elif terminate_tasks_on_shutdown and execute_function == 
execute_in_subprocess: - terminate_function = terminate_subprocess # type: ignore else: - terminate_function = None # type: ignore + terminate_function = terminate_subprocess # type: ignore return FileTaskScheduler( resource_dict=resource_dict, pysqa_config_directory=pysqa_config_directory, diff --git a/tests/test_standalone_inputcheck.py b/tests/test_standalone_inputcheck.py index 47fe956c..d1d74df1 100644 --- a/tests/test_standalone_inputcheck.py +++ b/tests/test_standalone_inputcheck.py @@ -18,7 +18,6 @@ check_hostname_localhost, check_pysqa_config_directory, check_file_exists, - check_terminate_tasks_on_shutdown, check_log_obj_size, validate_number_of_cores, ) @@ -125,7 +124,3 @@ def test_validate_number_of_cores(self): def test_check_log_obj_size(self): with self.assertRaises(ValueError): check_log_obj_size(log_obj_size=True) - - def test_terminate_tasks_on_shutdown(self): - with self.assertRaises(ValueError): - check_terminate_tasks_on_shutdown(terminate_tasks_on_shutdown=False) \ No newline at end of file From ccf6064d42585eae6ed05786a8a3a7237689108e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 13 Jul 2025 10:05:30 +0000 Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- executorlib/standalone/inputcheck.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executorlib/standalone/inputcheck.py b/executorlib/standalone/inputcheck.py index 6e681f0e..2ebe6808 100644 --- a/executorlib/standalone/inputcheck.py +++ b/executorlib/standalone/inputcheck.py @@ -211,4 +211,4 @@ def check_log_obj_size(log_obj_size: bool) -> None: raise ValueError( "log_obj_size is not supported for the executorlib.SlurmClusterExecutor and executorlib.FluxClusterExecutor." "Please use log_obj_size=False instead of log_obj_size=True." - ) \ No newline at end of file + )