diff --git a/executorlib/task_scheduler/file/hdf.py b/executorlib/task_scheduler/file/hdf.py index 903d7f94..d8cde507 100644 --- a/executorlib/task_scheduler/file/hdf.py +++ b/executorlib/task_scheduler/file/hdf.py @@ -101,7 +101,7 @@ def get_queue_id(file_name: Optional[str]) -> Optional[int]: Returns: int: queuing system id from the execution of the python function """ - if file_name is not None: + if file_name is not None and os.path.exists(file_name): with h5py.File(file_name, "r") as hdf: if "queue_id" in hdf: return cloudpickle.loads(np.void(hdf["/queue_id"])) diff --git a/executorlib/task_scheduler/file/queue_spawner.py b/executorlib/task_scheduler/file/queue_spawner.py index 8d80bfc8..7060dd6a 100644 --- a/executorlib/task_scheduler/file/queue_spawner.py +++ b/executorlib/task_scheduler/file/queue_spawner.py @@ -10,9 +10,10 @@ def execute_with_pysqa( command: list, + file_name: str, + data_dict: dict, cache_directory: str, task_dependent_lst: Optional[list[int]] = None, - file_name: Optional[str] = None, resource_dict: Optional[dict] = None, config_directory: Optional[str] = None, backend: Optional[str] = None, @@ -22,9 +23,10 @@ def execute_with_pysqa( Args: command (list): The command to be executed. + file_name (str): Name of the HDF5 file which contains the Python function + data_dict (dict): dictionary containing the python function to be executed {"fn": ..., "args": (), "kwargs": {}} cache_directory (str): The directory to store the HDF5 files. task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to []. - file_name (str): Name of the HDF5 file which contains the Python function resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function. Example resource dictionary: { cwd: None, @@ -37,13 +39,20 @@ def execute_with_pysqa( """ if task_dependent_lst is None: task_dependent_lst = [] - check_file_exists(file_name=file_name) - queue_id = get_queue_id(file_name=file_name) qa = QueueAdapter( directory=config_directory, queue_type=backend, execute_command=_pysqa_execute_command, ) + queue_id = get_queue_id(file_name=file_name) + if os.path.exists(file_name) and ( + queue_id is None or qa.get_status_of_job(process_id=queue_id) is None + ): + os.remove(file_name) + dump(file_name=file_name, data_dict=data_dict) + elif not os.path.exists(file_name): + dump(file_name=file_name, data_dict=data_dict) + check_file_exists(file_name=file_name) if queue_id is None or qa.get_status_of_job(process_id=queue_id) is None: if resource_dict is None: resource_dict = {} diff --git a/executorlib/task_scheduler/file/shared.py b/executorlib/task_scheduler/file/shared.py index f74ae1b1..8aaa76bc 100644 --- a/executorlib/task_scheduler/file/shared.py +++ b/executorlib/task_scheduler/file/shared.py @@ -9,7 +9,7 @@ from executorlib.standalone.cache import get_cache_files from executorlib.standalone.command import get_command_path from executorlib.standalone.serialize import serialize_funct_h5 -from executorlib.task_scheduler.file.hdf import dump, get_output +from executorlib.task_scheduler.file.hdf import get_output from executorlib.task_scheduler.file.subprocess_spawner import terminate_subprocess @@ -138,9 +138,6 @@ def execute_tasks_h5( cache_directory, task_key + "_o.h5" ) not in get_cache_files(cache_directory=cache_directory): file_name = os.path.join(cache_directory, task_key + "_i.h5") - if os.path.exists(file_name): - os.remove(file_name) - dump(file_name=file_name, data_dict=data_dict) if not disable_dependencies: task_dependent_lst = [ process_dict[k] for k in future_wait_key_lst @@ -159,6 +156,7 @@ def execute_tasks_h5( cores=task_resource_dict["cores"], ), file_name=file_name, + data_dict=data_dict, task_dependent_lst=task_dependent_lst, resource_dict=task_resource_dict, config_directory=pysqa_config_directory, diff --git a/executorlib/task_scheduler/file/subprocess_spawner.py b/executorlib/task_scheduler/file/subprocess_spawner.py index 49c63d0c..4be0b529 100644 --- a/executorlib/task_scheduler/file/subprocess_spawner.py +++ b/executorlib/task_scheduler/file/subprocess_spawner.py @@ -1,33 +1,37 @@ +import os import subprocess import time from typing import Optional from executorlib.standalone.inputcheck import check_file_exists +from executorlib.task_scheduler.file.hdf import dump def execute_in_subprocess( command: list, + file_name: str, + data_dict: dict, + cache_directory: Optional[str] = None, task_dependent_lst: Optional[list] = None, - file_name: Optional[str] = None, resource_dict: Optional[dict] = None, config_directory: Optional[str] = None, backend: Optional[str] = None, - cache_directory: Optional[str] = None, ) -> subprocess.Popen: """ Execute a command in a subprocess. Args: command (list): The command to be executed. - task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to []. file_name (str): Name of the HDF5 file which contains the Python function + data_dict (dict): dictionary containing the python function to be executed {"fn": ..., "args": (), "kwargs": {}} + cache_directory (str): The directory to store the HDF5 files. + task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to []. resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function. Example resource dictionary: { cwd: None, } config_directory (str, optional): path to the config directory. backend (str, optional): name of the backend used to spawn tasks. - cache_directory (str): The directory to store the HDF5 files. Returns: subprocess.Popen: The subprocess object. @@ -35,6 +39,9 @@ def execute_in_subprocess( """ if task_dependent_lst is None: task_dependent_lst = [] + if os.path.exists(file_name): + os.remove(file_name) + dump(file_name=file_name, data_dict=data_dict) check_file_exists(file_name=file_name) while len(task_dependent_lst) > 0: task_dependent_lst = [ diff --git a/tests/test_cache_fileexecutor_mpi.py b/tests/test_cache_fileexecutor_mpi.py index 38f93a9c..27fe658c 100644 --- a/tests/test_cache_fileexecutor_mpi.py +++ b/tests/test_cache_fileexecutor_mpi.py @@ -1,13 +1,11 @@ import importlib.util -import os import shutil import unittest -from executorlib.task_scheduler.file.subprocess_spawner import execute_in_subprocess - try: from executorlib.task_scheduler.file.task_scheduler import FileTaskScheduler + from executorlib.task_scheduler.file.subprocess_spawner import execute_in_subprocess skip_h5py_test = False except ImportError: diff --git a/tests/test_cache_fileexecutor_serial.py b/tests/test_cache_fileexecutor_serial.py index eb62c166..a9eb97a5 100644 --- a/tests/test_cache_fileexecutor_serial.py +++ b/tests/test_cache_fileexecutor_serial.py @@ -5,12 +5,11 @@ import unittest from threading import Thread -from executorlib.task_scheduler.file.subprocess_spawner import ( - execute_in_subprocess, - terminate_subprocess, -) - try: + from executorlib.task_scheduler.file.subprocess_spawner import ( + execute_in_subprocess, + terminate_subprocess, + ) from executorlib.task_scheduler.file.task_scheduler import FileTaskScheduler, create_file_executor from executorlib.task_scheduler.file.shared import execute_tasks_h5 @@ -201,12 +200,24 @@ def test_executor_function_dependence_args(self): process.join() def test_execute_in_subprocess_errors(self): + file_name = os.path.abspath(os.path.join(__file__, "..", "executorlib_cache", "test.h5")) + os.makedirs(os.path.dirname(file_name)) + with open(file_name, "w") as f: + f.write("test") with self.assertRaises(ValueError): execute_in_subprocess( - file_name=__file__, command=[], config_directory="test" + file_name=file_name, + data_dict={}, + command=[], + config_directory="test", ) with self.assertRaises(ValueError): - execute_in_subprocess(file_name=__file__, command=[], backend="flux") + execute_in_subprocess( + file_name=file_name, + data_dict={}, + command=[], + backend="flux", + ) def tearDown(self): shutil.rmtree("executorlib_cache", ignore_errors=True)