diff --git a/.github/workflows/pipeline.yml b/.github/workflows/pipeline.yml
index 5b1e02a7..c51853d0 100644
--- a/.github/workflows/pipeline.yml
+++ b/.github/workflows/pipeline.yml
@@ -402,7 +402,7 @@ jobs:
         run: echo -e "channels:\n - conda-forge\n" > .condarc
       - uses: conda-incubator/setup-miniconda@v3
         with:
-          python-version: '3.9'
+          python-version: '3.10'
           miniforge-version: latest
           condarc-file: .condarc
           environment-file: .ci_support/environment-old.yml
diff --git a/executorlib/backend/cache_parallel.py b/executorlib/backend/cache_parallel.py
index 9b1d25d8..49ad6d6e 100644
--- a/executorlib/backend/cache_parallel.py
+++ b/executorlib/backend/cache_parallel.py
@@ -4,6 +4,7 @@
 
 import cloudpickle
 
+from executorlib.standalone.error import backend_write_error_file
 from executorlib.task_scheduler.file.backend import (
     backend_load_file,
     backend_write_file,
@@ -53,6 +54,10 @@ def main() -> None:
                     output={"error": error},
                     runtime=time.time() - time_start,
                 )
+                backend_write_error_file(
+                    error=error,
+                    apply_dict=apply_dict,
+                )
         else:
             if mpi_rank_zero:
                 backend_write_file(
diff --git a/executorlib/backend/interactive_parallel.py b/executorlib/backend/interactive_parallel.py
index 5ae2c320..3d5aadcc 100644
--- a/executorlib/backend/interactive_parallel.py
+++ b/executorlib/backend/interactive_parallel.py
@@ -6,6 +6,7 @@
 import cloudpickle
 import zmq
 
+from executorlib.standalone.error import backend_write_error_file
 from executorlib.standalone.interactive.backend import call_funct, parse_arguments
 from executorlib.standalone.interactive.communication import (
     interface_connect,
@@ -82,6 +83,10 @@ def main() -> None:
                     socket=socket,
                     result_dict={"error": error},
                 )
+                backend_write_error_file(
+                    error=error,
+                    apply_dict=input_dict,
+                )
         else:
             # Send output
             if mpi_rank_zero:
diff --git a/executorlib/backend/interactive_serial.py b/executorlib/backend/interactive_serial.py
index 34fb2288..8f9088cd 100644
--- a/executorlib/backend/interactive_serial.py
+++ b/executorlib/backend/interactive_serial.py
@@ -2,6 +2,7 @@
 from os.path import abspath
 from typing import Optional
 
+from executorlib.standalone.error import backend_write_error_file
 from executorlib.standalone.interactive.backend import call_funct, parse_arguments
 from executorlib.standalone.interactive.communication import (
     interface_connect,
@@ -58,6 +59,10 @@ def main(argument_lst: Optional[list[str]] = None):
                 socket=socket,
                 result_dict={"error": error},
             )
+            backend_write_error_file(
+                error=error,
+                apply_dict=input_dict,
+            )
         else:
             # Send output
             interface_send(socket=socket, result_dict={"result": output})
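All three backends now share the same error path: the exception is first reported back through the regular result channel (file dump or ZMQ message) and only then, optionally, appended to an error file. A minimal sketch of that pattern, assuming a task dictionary with the `fn`/`args`/`kwargs` keys used throughout this diff (the failing function and the dictionary values are made up):

```
# Sketch of the backend error path added above; backend_write_error_file is
# the helper introduced in executorlib/standalone/error.py further down.
from executorlib.standalone.error import backend_write_error_file

def fail(a):
    raise ValueError(a)

task_dict = {"fn": fail, "args": (), "kwargs": {"a": 1}, "write_error_file": True}
try:
    task_dict["fn"](*task_dict["args"], **task_dict["kwargs"])
except Exception as error:
    # the real backends first send/dump {"error": error}, then write error.out
    backend_write_error_file(error=error, apply_dict=task_dict)
```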
diff --git a/executorlib/executor/flux.py b/executorlib/executor/flux.py
index 145ccc73..4abfd33a 100644
--- a/executorlib/executor/flux.py
+++ b/executorlib/executor/flux.py
@@ -63,6 +63,7 @@ class FluxJobExecutor(BaseExecutor):
                                               debugging purposes and to get an overview of the specified dependencies.
         plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
         log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
+        write_error_file (bool): Enable writing error.out files when the computation of a Python function fails.
 
     Examples:
         ```
@@ -103,6 +104,7 @@ def __init__(
         plot_dependency_graph: bool = False,
         plot_dependency_graph_filename: Optional[str] = None,
         log_obj_size: bool = False,
+        write_error_file: bool = False,
     ):
         """
         The executorlib.FluxJobExecutor leverages either the message passing interface (MPI), the SLURM workload manager
@@ -148,6 +150,7 @@ def __init__(
                                                   debugging purposes and to get an overview of the specified dependencies.
             plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
             log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
+            write_error_file (bool): Enable writing error.out files when the computation of a Python function fails.
         """
 
         default_resource_dict: dict = {
@@ -157,6 +160,7 @@ def __init__(
             "cwd": None,
             "openmpi_oversubscribe": False,
             "slurm_cmd_args": [],
+            "write_error_file": write_error_file,
         }
         if resource_dict is None:
             resource_dict = {}
@@ -248,6 +252,7 @@ class FluxClusterExecutor(BaseExecutor):
                                               debugging purposes and to get an overview of the specified dependencies.
         plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
         log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
+        write_error_file (bool): Enable writing error.out files when the computation of a Python function fails.
 
     Examples:
         ```
@@ -285,6 +290,7 @@ def __init__(
         plot_dependency_graph: bool = False,
         plot_dependency_graph_filename: Optional[str] = None,
         log_obj_size: bool = False,
+        write_error_file: bool = False,
     ):
         """
         The executorlib.FluxClusterExecutor leverages either the message passing interface (MPI), the SLURM workload
@@ -327,6 +333,7 @@ def __init__(
                                                   debugging purposes and to get an overview of the specified dependencies.
             plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
             log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
+            write_error_file (bool): Enable writing error.out files when the computation of a Python function fails.
         """
 
         default_resource_dict: dict = {
@@ -336,6 +343,7 @@ def __init__(
             "cwd": None,
             "openmpi_oversubscribe": False,
             "slurm_cmd_args": [],
+            "write_error_file": write_error_file,
         }
         if resource_dict is None:
             resource_dict = {}
diff --git a/executorlib/executor/single.py b/executorlib/executor/single.py
index 30037505..018aa1fc 100644
--- a/executorlib/executor/single.py
+++ b/executorlib/executor/single.py
@@ -57,6 +57,7 @@ class SingleNodeExecutor(BaseExecutor):
                                               debugging purposes and to get an overview of the specified dependencies.
         plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
         log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
+        write_error_file (bool): Enable writing error.out files when the computation of a Python function fails.
 
     Examples:
         ```
@@ -93,6 +94,7 @@ def __init__(
         plot_dependency_graph: bool = False,
         plot_dependency_graph_filename: Optional[str] = None,
         log_obj_size: bool = False,
+        write_error_file: bool = False,
     ):
         """
         The executorlib.SingleNodeExecutor leverages either the message passing interface (MPI), the SLURM workload
@@ -134,6 +136,7 @@ def __init__(
                                                   debugging purposes and to get an overview of the specified dependencies.
             plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
             log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
+            write_error_file (bool): Enable writing error.out files when the computation of a Python function fails.
         """
 
         default_resource_dict: dict = {
@@ -143,6 +146,7 @@ def __init__(
             "cwd": None,
             "openmpi_oversubscribe": False,
             "slurm_cmd_args": [],
+            "write_error_file": write_error_file,
         }
         if resource_dict is None:
             resource_dict = {}
@@ -220,6 +224,7 @@ class TestClusterExecutor(BaseExecutor):
                                               debugging purposes and to get an overview of the specified dependencies.
         plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
         log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
+        write_error_file (bool): Enable writing error.out files when the computation of a Python function fails.
 
     Examples:
         ```
@@ -256,6 +261,7 @@ def __init__(
         plot_dependency_graph: bool = False,
         plot_dependency_graph_filename: Optional[str] = None,
         log_obj_size: bool = False,
+        write_error_file: bool = False,
     ):
         """
         The executorlib.api.TestClusterExecutor is designed to test the file based communication used in the
@@ -291,6 +297,7 @@ def __init__(
                                                   debugging purposes and to get an overview of the specified dependencies.
             plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
             log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
+            write_error_file (bool): Enable writing error.out files when the computation of a Python function fails.
         """
 
         default_resource_dict: dict = {
@@ -299,6 +306,7 @@ def __init__(
             "gpus_per_core": 0,
             "cwd": None,
             "openmpi_oversubscribe": False,
+            "write_error_file": write_error_file,
         }
         if resource_dict is None:
             resource_dict = {}
diff --git a/executorlib/executor/slurm.py b/executorlib/executor/slurm.py
index bf3e48c7..39e44ca3 100644
--- a/executorlib/executor/slurm.py
+++ b/executorlib/executor/slurm.py
@@ -60,6 +60,7 @@ class SlurmClusterExecutor(BaseExecutor):
                                               debugging purposes and to get an overview of the specified dependencies.
         plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
         log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
+        write_error_file (bool): Enable writing error.out files when the computation of a Python function fails.
 
     Examples:
         ```
@@ -97,6 +98,7 @@ def __init__(
         plot_dependency_graph: bool = False,
         plot_dependency_graph_filename: Optional[str] = None,
         log_obj_size: bool = False,
+        write_error_file: bool = False,
     ):
         """
         The executorlib.SlurmClusterExecutor leverages either the message passing interface (MPI), the SLURM workload
@@ -139,6 +141,7 @@ def __init__(
                                                   debugging purposes and to get an overview of the specified dependencies.
             plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
             log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
+            write_error_file (bool): Enable writing error.out files when the computation of a Python function fails.
         """
 
         default_resource_dict: dict = {
@@ -148,6 +151,7 @@ def __init__(
             "cwd": None,
             "openmpi_oversubscribe": False,
             "slurm_cmd_args": [],
+            "write_error_file": write_error_file,
         }
         if resource_dict is None:
             resource_dict = {}
@@ -244,6 +248,7 @@ class SlurmJobExecutor(BaseExecutor):
                                               debugging purposes and to get an overview of the specified dependencies.
         plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
         log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
+        write_error_file (bool): Enable writing error.out files when the computation of a Python function fails.
 
     Examples:
         ```
@@ -280,6 +285,7 @@ def __init__(
         plot_dependency_graph: bool = False,
         plot_dependency_graph_filename: Optional[str] = None,
         log_obj_size: bool = False,
+        write_error_file: bool = False,
     ):
         """
         The executorlib.SlurmJobExecutor leverages either the message passing interface (MPI), the SLURM workload
@@ -325,6 +331,7 @@ def __init__(
                                                   debugging purposes and to get an overview of the specified dependencies.
             plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
             log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
+            write_error_file (bool): Enable writing error.out files when the computation of a Python function fails.
         """
 
         default_resource_dict: dict = {
@@ -334,6 +341,7 @@ def __init__(
             "cwd": None,
             "openmpi_oversubscribe": False,
             "slurm_cmd_args": [],
+            "write_error_file": write_error_file,
         }
         if resource_dict is None:
             resource_dict = {}
diff --git a/executorlib/standalone/cache.py b/executorlib/standalone/cache.py
index c3d89483..ec0ddcd2 100644
--- a/executorlib/standalone/cache.py
+++ b/executorlib/standalone/cache.py
@@ -10,6 +10,7 @@
     "error": "error",
     "runtime": "runtime",
     "queue_id": "queue_id",
+    "write_error_file": "write_error_file",
 }
 
 
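The keyword is added with the same default to all executor front ends, so enabling it is uniform across FluxJobExecutor, FluxClusterExecutor, SingleNodeExecutor, SlurmClusterExecutor, SlurmJobExecutor and TestClusterExecutor. A hedged usage sketch with the single-node front end; the failing function and the file check are illustrative, and where error.out lands depends on the working directory of the task:

```
# Sketch: enabling the new flag on the single-node front end; only
# SingleNodeExecutor and write_error_file come from this diff.
import os
from executorlib import SingleNodeExecutor

def reciprocal(x):
    return 1 / x  # raises ZeroDivisionError for x == 0

with SingleNodeExecutor(write_error_file=True) as exe:
    future = exe.submit(reciprocal, 0)
    try:
        future.result()
    except ZeroDivisionError:
        pass  # the exception is still re-raised on the submitting side

# in addition to the re-raised exception the backend appended a report
print(os.path.exists("error.out"))
```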
diff --git a/executorlib/standalone/error.py b/executorlib/standalone/error.py
new file mode 100644
index 00000000..2dd31ca7
--- /dev/null
+++ b/executorlib/standalone/error.py
@@ -0,0 +1,20 @@
+import traceback
+
+
+def backend_write_error_file(error: Exception, apply_dict: dict) -> None:
+    """
+    Write an error to a file if specified in the apply_dict.
+
+    Args:
+        error (Exception): The error to be written.
+        apply_dict (dict): Dictionary containing additional parameters.
+
+    Returns:
+        None
+    """
+    if apply_dict.get("write_error_file", False):
+        with open(apply_dict.get("error_file_name", "error.out"), "a") as f:
+            f.write("function: " + str(apply_dict["fn"]) + "\n")
+            f.write("args: " + str(apply_dict["args"]) + "\n")
+            f.write("kwargs: " + str(apply_dict["kwargs"]) + "\n")
+            traceback.print_exception(error, file=f)
diff --git a/executorlib/task_scheduler/file/backend.py b/executorlib/task_scheduler/file/backend.py
index 4ea27c17..cbe869cc 100644
--- a/executorlib/task_scheduler/file/backend.py
+++ b/executorlib/task_scheduler/file/backend.py
@@ -2,6 +2,7 @@
 import time
 from typing import Any
 
+from executorlib.standalone.error import backend_write_error_file
 from executorlib.task_scheduler.file.hdf import dump, load
 from executorlib.task_scheduler.file.shared import FutureItem
 
@@ -77,6 +78,10 @@ def backend_execute_task_in_file(file_name: str) -> None:
         }
     except Exception as error:
         result = {"error": error}
+        backend_write_error_file(
+            error=error,
+            apply_dict=apply_dict,
+        )
 
     backend_write_file(
         file_name=file_name,
diff --git a/executorlib/task_scheduler/file/hdf.py b/executorlib/task_scheduler/file/hdf.py
index d8cde507..c04287ee 100644
--- a/executorlib/task_scheduler/file/hdf.py
+++ b/executorlib/task_scheduler/file/hdf.py
@@ -52,6 +52,10 @@ def load(file_name: str) -> dict:
             data_dict["kwargs"] = cloudpickle.loads(np.void(hdf["/input_kwargs"]))
         else:
             data_dict["kwargs"] = {}
+        if "write_error_file" in hdf:
+            data_dict["write_error_file"] = cloudpickle.loads(
+                np.void(hdf["/write_error_file"])
+            )
         return data_dict
 
 
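`backend_write_error_file` is a no-op unless `write_error_file` is set in the dictionary; when it is set, the report is appended to `error_file_name`, falling back to `error.out`. Note that the single-argument form `traceback.print_exception(error, file=f)` requires Python >= 3.10, which matches the 3.9 drop elsewhere in this diff. A self-contained sketch of the resulting file content (the failing function and dictionary values are made up):

```
# Direct call to the new helper; the failing function and the apply_dict
# values are illustrative only.
from executorlib.standalone.error import backend_write_error_file

def broken(a):
    raise ValueError(f"bad input: {a}")

try:
    broken(a=1)
except ValueError as error:
    backend_write_error_file(
        error=error,
        apply_dict={
            "fn": broken,
            "args": (),
            "kwargs": {"a": 1},
            "write_error_file": True,  # without this key nothing is written
        },
    )

# error.out now ends with roughly:
#   function: <function broken at 0x...>
#   args: ()
#   kwargs: {'a': 1}
#   Traceback (most recent call last):
#     ...
#   ValueError: bad input: 1
```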
diff --git a/executorlib/task_scheduler/file/shared.py b/executorlib/task_scheduler/file/shared.py
index 8aaa76bc..a8d1603f 100644
--- a/executorlib/task_scheduler/file/shared.py
+++ b/executorlib/task_scheduler/file/shared.py
@@ -126,6 +126,7 @@ def execute_tasks_h5(
             )
             cache_key = task_resource_dict.pop("cache_key", None)
             cache_directory = os.path.abspath(task_resource_dict.pop("cache_directory"))
+            write_error_file = task_resource_dict.pop("write_error_file", False)
             task_key, data_dict = serialize_funct_h5(
                 fn=task_dict["fn"],
                 fn_args=task_args,
@@ -133,6 +134,7 @@ def execute_tasks_h5(
                 resource_dict=task_resource_dict,
                 cache_key=cache_key,
             )
+            data_dict["write_error_file"] = write_error_file
             if task_key not in memory_dict:
                 if os.path.join(
                     cache_directory, task_key + "_o.h5"
diff --git a/executorlib/task_scheduler/file/task_scheduler.py b/executorlib/task_scheduler/file/task_scheduler.py
index fe719d8b..966f075d 100644
--- a/executorlib/task_scheduler/file/task_scheduler.py
+++ b/executorlib/task_scheduler/file/task_scheduler.py
@@ -36,6 +36,7 @@ def __init__(
         pysqa_config_directory: Optional[str] = None,
         backend: Optional[str] = None,
         disable_dependencies: bool = False,
+        write_error_file: bool = False,
     ):
         """
         Initialize the FileExecutor.
@@ -50,12 +51,14 @@ def __init__(
             pysqa_config_directory (str, optional): path to the pysqa config directory (only for pysqa based backend).
             backend (str, optional): name of the backend used to spawn tasks.
             disable_dependencies (boolean): Disable resolving future objects during the submission.
+            write_error_file (boolean): Enable writing error.out files when the computation of a Python function fails.
         """
         super().__init__(max_cores=None)
         default_resource_dict = {
             "cores": 1,
             "cwd": None,
             "cache_directory": "executorlib_cache",
+            "write_error_file": write_error_file,
         }
         if resource_dict is None:
             resource_dict = {}
@@ -95,6 +98,7 @@ def create_file_executor(
     init_function: Optional[Callable] = None,
     disable_dependencies: bool = False,
     execute_function: Callable = execute_with_pysqa,
+    write_error_file: bool = False,
 ):
     if block_allocation:
         raise ValueError(
@@ -123,4 +127,5 @@
         disable_dependencies=disable_dependencies,
         execute_function=execute_function,
         terminate_function=terminate_function,
+        write_error_file=write_error_file,
     )
diff --git a/executorlib/task_scheduler/interactive/shared.py b/executorlib/task_scheduler/interactive/shared.py
index 7c27e043..e613eea7 100644
--- a/executorlib/task_scheduler/interactive/shared.py
+++ b/executorlib/task_scheduler/interactive/shared.py
@@ -26,6 +26,7 @@ def execute_tasks(
     cache_key: Optional[str] = None,
     queue_join_on_shutdown: bool = True,
     log_obj_size: bool = False,
+    write_error_file: bool = False,
     **kwargs,
 ) -> None:
     """
@@ -48,6 +49,7 @@ def execute_tasks(
                                  overwritten by setting the cache_key.
         queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
         log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
+        write_error_file (bool): Enable writing error.out files when the computation of a Python function fails.
     """
     interface = interface_bootup(
         command_lst=_get_backend_path(
@@ -70,6 +72,7 @@ def execute_tasks(
             future_queue.join()
             break
         elif "fn" in task_dict and "future" in task_dict:
+            task_dict["write_error_file"] = write_error_file
             if cache_directory is None:
                 _execute_task_without_cache(
                     interface=interface, task_dict=task_dict, future_queue=future_queue
diff --git a/pyproject.toml b/pyproject.toml
index d51a8076..ee8dd253 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -23,7 +23,6 @@ classifiers = [
     "License :: OSI Approved :: BSD License",
     "Intended Audience :: Science/Research",
     "Operating System :: OS Independent",
-    "Programming Language :: Python :: 3.9",
     "Programming Language :: Python :: 3.10",
     "Programming Language :: Python :: 3.11",
     "Programming Language :: Python :: 3.12",
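On the file-based side the flag enters through `FileTaskScheduler` and `create_file_executor`, where it joins the default resource dictionary and is later popped off again in `execute_tasks_h5`. A hedged sketch mirroring the test below; the import path of `execute_in_subprocess` and the failing function are assumptions, not part of this diff:

```
# Sketch of scheduler-level use; FileTaskScheduler and write_error_file are
# from this diff, the spawner import path and the failing function are assumed.
from executorlib.task_scheduler.file.task_scheduler import FileTaskScheduler
from executorlib.task_scheduler.file.subprocess_spawner import execute_in_subprocess

def get_error(a):
    raise ValueError(a)

with FileTaskScheduler(
    execute_function=execute_in_subprocess,
    write_error_file=True,  # forwarded into the default resource dictionary
) as exe:
    fs = exe.submit(get_error, a=1)
    try:
        fs.result()  # the ValueError is still re-raised here
    except ValueError:
        pass  # ...and error.out is written in the task's working directory
```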
diff --git a/tests/test_cache_fileexecutor_serial.py b/tests/test_cache_fileexecutor_serial.py
index a9eb97a5..617956fd 100644
--- a/tests/test_cache_fileexecutor_serial.py
+++ b/tests/test_cache_fileexecutor_serial.py
@@ -83,11 +83,25 @@ def test_executor_working_directory(self):
     def test_executor_error(self):
         cwd = os.path.join(os.path.dirname(__file__), "executables")
         with FileTaskScheduler(
-            resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess
+            resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess, write_error_file=False,
+        ) as exe:
+            fs1 = exe.submit(get_error, a=1)
+            with self.assertRaises(ValueError):
+                fs1.result()
+            self.assertEqual(len(os.listdir(cwd)), 1)
+
+    def test_executor_error_file(self):
+        cwd = os.path.join(os.path.dirname(__file__), "executables")
+        with FileTaskScheduler(
+            resource_dict={"cwd": cwd}, execute_function=execute_in_subprocess, write_error_file=True,
         ) as exe:
             fs1 = exe.submit(get_error, a=1)
             with self.assertRaises(ValueError):
                 fs1.result()
+            working_directory_file_lst = os.listdir(cwd)
+            self.assertEqual(len(working_directory_file_lst), 2)
+            self.assertTrue("error.out" in working_directory_file_lst)
+            os.remove(os.path.join(cwd, "error.out"))
 
     def test_executor_function(self):
         fs1 = Future()
diff --git a/tests/test_singlenodeexecutor_cache.py b/tests/test_singlenodeexecutor_cache.py
index 531d369d..5d8a810f 100644
--- a/tests/test_singlenodeexecutor_cache.py
+++ b/tests/test_singlenodeexecutor_cache.py
@@ -51,12 +51,24 @@ def test_cache_key(self):
     def test_cache_error(self):
         cache_directory = os.path.abspath("cache_error")
-        with SingleNodeExecutor(cache_directory=cache_directory) as exe:
+        with SingleNodeExecutor(cache_directory=cache_directory, write_error_file=False) as exe:
+            self.assertTrue(exe)
+            cloudpickle_register(ind=1)
+            f = exe.submit(get_error, a=1)
+            with self.assertRaises(ValueError):
+                print(f.result())
+
+    def test_cache_error_file(self):
+        cache_directory = os.path.abspath("cache_error")
+        with SingleNodeExecutor(cache_directory=cache_directory, write_error_file=True) as exe:
             self.assertTrue(exe)
             cloudpickle_register(ind=1)
             f = exe.submit(get_error, a=1)
             with self.assertRaises(ValueError):
                 print(f.result())
+            error_out = "error.out"
+            self.assertTrue(os.path.exists(error_out))
+            os.remove(error_out)
 
     def tearDown(self):
         shutil.rmtree("executorlib_cache", ignore_errors=True)
diff --git a/tests/test_singlenodeexecutor_dependencies.py b/tests/test_singlenodeexecutor_dependencies.py
index cc8ea515..9568d09b 100644
--- a/tests/test_singlenodeexecutor_dependencies.py
+++ b/tests/test_singlenodeexecutor_dependencies.py
@@ -345,6 +345,7 @@ def setUp(self):
             'spawner': MpiExecSpawner,
             'max_cores': None,
             'max_workers': None,
+            'write_error_file': False
         }
 
     def test_info_disable_dependencies_true(self):
diff --git a/tests/test_standalone_hdf.py b/tests/test_standalone_hdf.py
index addcce55..1e46378b 100644
--- a/tests/test_standalone_hdf.py
+++ b/tests/test_standalone_hdf.py
@@ -75,10 +75,12 @@ def test_hdf_kwargs(self):
                 "args": (),
                 "kwargs": {"a": a, "b": b},
                 "queue_id": 123,
+                "write_error_file": True,
             },
         )
         data_dict = load(file_name=file_name)
         self.assertTrue("fn" in data_dict.keys())
+        self.assertTrue(data_dict.get("write_error_file", False))
         self.assertEqual(data_dict["args"], ())
         self.assertEqual(data_dict["kwargs"], {"a": a, "b": b})
         self.assertEqual(get_queue_id(file_name=file_name), 123)
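The round trip exercised by this last test can also be reproduced directly with the `dump`/`load` pair touched above; the `write_error_file` entry is serialized via the group map extended in `executorlib/standalone/cache.py`. A hedged sketch, with the file name and payload made up and the `dump` signature inferred from the test context:

```
# Round trip of the new flag through the HDF5 helpers; dump()/load() are the
# functions extended in executorlib/task_scheduler/file/hdf.py.
from executorlib.task_scheduler.file.hdf import dump, load

file_name = "write_error_file_demo_i.h5"
dump(
    file_name=file_name,
    data_dict={
        "fn": sum,
        "args": (),
        "kwargs": {},
        "write_error_file": True,
    },
)
print(load(file_name=file_name)["write_error_file"])  # True
```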