Skip to content

Commit 09ae798

Browse files
jan-janssen, pre-commit-ci[bot], and liamhuber
authored
Move cleanup of previous jobs into execute function (#732)
* Difference between shutdown(wait=True) and shutdown(wait=False) When `shutdown(wait=True)` is called - the default - then the executor waits until all future objects completed. In contrast when `shutdown(wait=False)` is called the future objects are cancelled on the queuing system. * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Move cleanup of previous jobs into execute function * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix type hints * fix test * move imports * check if the file exists * fix old tests * Update executorlib/task_scheduler/file/queue_spawner.py Co-authored-by: Liam Huber <liam.huber@gmail.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * extend test * fix tests --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Liam Huber <liam.huber@gmail.com>
1 parent ba2c702 commit 09ae798

File tree

6 files changed

+46
-23
lines changed

6 files changed

+46
-23
lines changed

executorlib/task_scheduler/file/hdf.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ def get_queue_id(file_name: Optional[str]) -> Optional[int]:
101101
Returns:
102102
int: queuing system id from the execution of the python function
103103
"""
104-
if file_name is not None:
104+
if file_name is not None and os.path.exists(file_name):
105105
with h5py.File(file_name, "r") as hdf:
106106
if "queue_id" in hdf:
107107
return cloudpickle.loads(np.void(hdf["/queue_id"]))

executorlib/task_scheduler/file/queue_spawner.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,10 @@
1010

1111
def execute_with_pysqa(
1212
command: list,
13+
file_name: str,
14+
data_dict: dict,
1315
cache_directory: str,
1416
task_dependent_lst: Optional[list[int]] = None,
15-
file_name: Optional[str] = None,
1617
resource_dict: Optional[dict] = None,
1718
config_directory: Optional[str] = None,
1819
backend: Optional[str] = None,
@@ -22,9 +23,10 @@ def execute_with_pysqa(
2223
2324
Args:
2425
command (list): The command to be executed.
26+
file_name (str): Name of the HDF5 file which contains the Python function
27+
data_dict (dict): dictionary containing the python function to be executed {"fn": ..., "args": (), "kwargs": {}}
2528
cache_directory (str): The directory to store the HDF5 files.
2629
task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to [].
27-
file_name (str): Name of the HDF5 file which contains the Python function
2830
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function.
2931
Example resource dictionary: {
3032
cwd: None,
@@ -37,13 +39,20 @@ def execute_with_pysqa(
3739
"""
3840
if task_dependent_lst is None:
3941
task_dependent_lst = []
40-
check_file_exists(file_name=file_name)
41-
queue_id = get_queue_id(file_name=file_name)
4242
qa = QueueAdapter(
4343
directory=config_directory,
4444
queue_type=backend,
4545
execute_command=_pysqa_execute_command,
4646
)
47+
queue_id = get_queue_id(file_name=file_name)
48+
if os.path.exists(file_name) and (
49+
queue_id is None or qa.get_status_of_job(process_id=queue_id) is None
50+
):
51+
os.remove(file_name)
52+
dump(file_name=file_name, data_dict=data_dict)
53+
elif not os.path.exists(file_name):
54+
dump(file_name=file_name, data_dict=data_dict)
55+
check_file_exists(file_name=file_name)
4756
if queue_id is None or qa.get_status_of_job(process_id=queue_id) is None:
4857
if resource_dict is None:
4958
resource_dict = {}

executorlib/task_scheduler/file/shared.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from executorlib.standalone.cache import get_cache_files
1010
from executorlib.standalone.command import get_command_path
1111
from executorlib.standalone.serialize import serialize_funct_h5
12-
from executorlib.task_scheduler.file.hdf import dump, get_output
12+
from executorlib.task_scheduler.file.hdf import get_output
1313
from executorlib.task_scheduler.file.subprocess_spawner import terminate_subprocess
1414

1515

@@ -138,9 +138,6 @@ def execute_tasks_h5(
138138
cache_directory, task_key + "_o.h5"
139139
) not in get_cache_files(cache_directory=cache_directory):
140140
file_name = os.path.join(cache_directory, task_key + "_i.h5")
141-
if os.path.exists(file_name):
142-
os.remove(file_name)
143-
dump(file_name=file_name, data_dict=data_dict)
144141
if not disable_dependencies:
145142
task_dependent_lst = [
146143
process_dict[k] for k in future_wait_key_lst
@@ -159,6 +156,7 @@ def execute_tasks_h5(
159156
cores=task_resource_dict["cores"],
160157
),
161158
file_name=file_name,
159+
data_dict=data_dict,
162160
task_dependent_lst=task_dependent_lst,
163161
resource_dict=task_resource_dict,
164162
config_directory=pysqa_config_directory,

executorlib/task_scheduler/file/subprocess_spawner.py

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,47 @@
1+
import os
12
import subprocess
23
import time
34
from typing import Optional
45

56
from executorlib.standalone.inputcheck import check_file_exists
7+
from executorlib.task_scheduler.file.hdf import dump
68

79

810
def execute_in_subprocess(
911
command: list,
12+
file_name: str,
13+
data_dict: dict,
14+
cache_directory: Optional[str] = None,
1015
task_dependent_lst: Optional[list] = None,
11-
file_name: Optional[str] = None,
1216
resource_dict: Optional[dict] = None,
1317
config_directory: Optional[str] = None,
1418
backend: Optional[str] = None,
15-
cache_directory: Optional[str] = None,
1619
) -> subprocess.Popen:
1720
"""
1821
Execute a command in a subprocess.
1922
2023
Args:
2124
command (list): The command to be executed.
22-
task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to [].
2325
file_name (str): Name of the HDF5 file which contains the Python function
26+
data_dict (dict): dictionary containing the python function to be executed {"fn": ..., "args": (), "kwargs": {}}
27+
cache_directory (str): The directory to store the HDF5 files.
28+
task_dependent_lst (list): A list of subprocesses that the current subprocess depends on. Defaults to [].
2429
resource_dict (dict): resource dictionary, which defines the resources used for the execution of the function.
2530
Example resource dictionary: {
2631
cwd: None,
2732
}
2833
config_directory (str, optional): path to the config directory.
2934
backend (str, optional): name of the backend used to spawn tasks.
30-
cache_directory (str): The directory to store the HDF5 files.
3135
3236
Returns:
3337
subprocess.Popen: The subprocess object.
3438
3539
"""
3640
if task_dependent_lst is None:
3741
task_dependent_lst = []
42+
if os.path.exists(file_name):
43+
os.remove(file_name)
44+
dump(file_name=file_name, data_dict=data_dict)
3845
check_file_exists(file_name=file_name)
3946
while len(task_dependent_lst) > 0:
4047
task_dependent_lst = [

tests/test_cache_fileexecutor_mpi.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
import importlib.util
2-
import os
32
import shutil
43
import unittest
54

6-
from executorlib.task_scheduler.file.subprocess_spawner import execute_in_subprocess
7-
85

96
try:
107
from executorlib.task_scheduler.file.task_scheduler import FileTaskScheduler
8+
from executorlib.task_scheduler.file.subprocess_spawner import execute_in_subprocess
119

1210
skip_h5py_test = False
1311
except ImportError:

tests/test_cache_fileexecutor_serial.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,11 @@
55
import unittest
66
from threading import Thread
77

8-
from executorlib.task_scheduler.file.subprocess_spawner import (
9-
execute_in_subprocess,
10-
terminate_subprocess,
11-
)
12-
138
try:
9+
from executorlib.task_scheduler.file.subprocess_spawner import (
10+
execute_in_subprocess,
11+
terminate_subprocess,
12+
)
1413
from executorlib.task_scheduler.file.task_scheduler import FileTaskScheduler, create_file_executor
1514
from executorlib.task_scheduler.file.shared import execute_tasks_h5
1615

@@ -201,12 +200,24 @@ def test_executor_function_dependence_args(self):
201200
process.join()
202201

203202
def test_execute_in_subprocess_errors(self):
203+
file_name = os.path.abspath(os.path.join(__file__, "..", "executorlib_cache", "test.h5"))
204+
os.makedirs(os.path.dirname(file_name))
205+
with open(file_name, "w") as f:
206+
f.write("test")
204207
with self.assertRaises(ValueError):
205208
execute_in_subprocess(
206-
file_name=__file__, command=[], config_directory="test"
209+
file_name=file_name,
210+
data_dict={},
211+
command=[],
212+
config_directory="test",
207213
)
208214
with self.assertRaises(ValueError):
209-
execute_in_subprocess(file_name=__file__, command=[], backend="flux")
215+
execute_in_subprocess(
216+
file_name=file_name,
217+
data_dict={},
218+
command=[],
219+
backend="flux",
220+
)
210221

211222
def tearDown(self):
212223
shutil.rmtree("executorlib_cache", ignore_errors=True)

0 commit comments

Comments (0)