Skip to content

Commit 51a33eb

Browse files
jan-janssenpre-commit-ci[bot]coderabbitai[bot]
authored
Add TestClusterExecutor to simplify debugging of SlurmClusterExecutor and FluxClusterExecutor (#714)
* create_file_executor() - rename backend variable * change default * Add option to set execute_function * Add TestClusterExecutor to simplify debugging of SlurmClusterExecutor and FluxClusterExecutor * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Add tests * Update executorlib/executor/single.py Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com> * update docstring * more tests --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent b68acc2 commit 51a33eb

File tree

3 files changed

+266
-0
lines changed

3 files changed

+266
-0
lines changed

executorlib/api.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
functionality is considered internal and might change during minor releases.
66
"""
77

8+
from executorlib.executor.single import TestClusterExecutor
89
from executorlib.standalone.command import get_command_path
910
from executorlib.standalone.interactive.communication import (
1011
SocketInterface,
@@ -19,6 +20,7 @@
1920
from executorlib.standalone.serialize import cloudpickle_register
2021

2122
__all__: list[str] = [
23+
"TestClusterExecutor",
2224
"cancel_items_in_queue",
2325
"cloudpickle_register",
2426
"get_command_path",

executorlib/executor/single.py

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,174 @@ def __init__(
184184
)
185185

186186

187+
class TestClusterExecutor(BaseExecutor):
188+
"""
189+
The executorlib.api.TestClusterExecutor is designed to test the file based communication used in the
190+
SlurmClusterExecutor and the FluxClusterExecutor locally. It is not recommended for production use, rather use the
191+
SingleNodeExecutor.
192+
193+
Args:
194+
max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
195+
cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
196+
recommended, as computers have a limited number of compute cores.
197+
cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
198+
max_cores (int): defines the number cores which can be used in parallel
199+
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
200+
- cores (int): number of MPI cores to be used for each function call
201+
- threads_per_core (int): number of OpenMP threads to be used for each function call
202+
- gpus_per_core (int): number of GPUs per worker - defaults to 0
203+
- cwd (str/None): current working directory where the parallel python task is executed
204+
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
205+
context of an HPC cluster this essential to be able to communicate to an
206+
Executor running on a different compute node within the same allocation. And
207+
in principle any computer should be able to resolve that their own hostname
208+
points to the same address as localhost. Still MacOS >= 12 seems to disable
209+
this look up for security reasons. So on MacOS it is required to set this
210+
option to true
211+
block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource
212+
requirements, executorlib supports block allocation. In this case all resources have
213+
to be defined on the executor, rather than during the submission of the individual
214+
function.
215+
init_function (None): optional function to preset arguments for functions which are submitted later
216+
disable_dependencies (boolean): Disable resolving future objects during the submission.
217+
refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
218+
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
219+
debugging purposes and to get an overview of the specified dependencies.
220+
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
221+
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
222+
223+
Examples:
224+
```
225+
>>> import numpy as np
226+
>>> from executorlib.api import TestClusterExecutor
227+
>>>
228+
>>> def calc(i, j, k):
229+
>>> from mpi4py import MPI
230+
>>> size = MPI.COMM_WORLD.Get_size()
231+
>>> rank = MPI.COMM_WORLD.Get_rank()
232+
>>> return np.array([i, j, k]), size, rank
233+
>>>
234+
>>> def init_k():
235+
>>> return {"k": 3}
236+
>>>
237+
>>> with TestClusterExecutor(max_workers=2, init_function=init_k) as p:
238+
>>> fs = p.submit(calc, 2, j=4)
239+
>>> print(fs.result())
240+
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
241+
```
242+
"""
243+
244+
def __init__(
245+
self,
246+
max_workers: Optional[int] = None,
247+
cache_directory: Optional[str] = None,
248+
max_cores: Optional[int] = None,
249+
resource_dict: Optional[dict] = None,
250+
hostname_localhost: Optional[bool] = None,
251+
block_allocation: bool = False,
252+
init_function: Optional[Callable] = None,
253+
disable_dependencies: bool = False,
254+
refresh_rate: float = 0.01,
255+
plot_dependency_graph: bool = False,
256+
plot_dependency_graph_filename: Optional[str] = None,
257+
log_obj_size: bool = False,
258+
):
259+
"""
260+
The executorlib.api.TestClusterExecutor is designed to test the file based communication used in the
261+
SlurmClusterExecutor and the FluxClusterExecutor locally. It is not recommended for production use, rather use
262+
the SingleNodeExecutor.
263+
264+
Args:
265+
max_workers (int): for backwards compatibility with the standard library, max_workers also defines the
266+
number of cores which can be used in parallel - just like the max_cores parameter. Using
267+
max_cores is recommended, as computers have a limited number of compute cores.
268+
cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
269+
max_cores (int): defines the number cores which can be used in parallel
270+
resource_dict (dict): A dictionary of resources required by the task. With the following keys:
271+
- cores (int): number of MPI cores to be used for each function call
272+
- threads_per_core (int): number of OpenMP threads to be used for each function call
273+
- gpus_per_core (int): number of GPUs per worker - defaults to 0
274+
- cwd (str/None): current working directory where the parallel python task is executed
275+
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
276+
context of an HPC cluster this essential to be able to communicate to an
277+
Executor running on a different compute node within the same allocation. And
278+
in principle any computer should be able to resolve that their own hostname
279+
points to the same address as localhost. Still MacOS >= 12 seems to disable
280+
this look up for security reasons. So on MacOS it is required to set this
281+
option to true
282+
block_allocation (boolean): To accelerate the submission of a series of python functions with the same
283+
resource requirements, executorlib supports block allocation. In this case all
284+
resources have to be defined on the executor, rather than during the submission
285+
of the individual function.
286+
init_function (None): optional function to preset arguments for functions which are submitted later
287+
disable_dependencies (boolean): Disable resolving future objects during the submission.
288+
refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
289+
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
290+
debugging purposes and to get an overview of the specified dependencies.
291+
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
292+
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
293+
294+
"""
295+
default_resource_dict: dict = {
296+
"cores": 1,
297+
"threads_per_core": 1,
298+
"gpus_per_core": 0,
299+
"cwd": None,
300+
"openmpi_oversubscribe": False,
301+
}
302+
if resource_dict is None:
303+
resource_dict = {}
304+
resource_dict.update(
305+
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
306+
)
307+
if not plot_dependency_graph:
308+
from executorlib.task_scheduler.file.subprocess_spawner import (
309+
execute_in_subprocess,
310+
)
311+
from executorlib.task_scheduler.file.task_scheduler import (
312+
create_file_executor,
313+
)
314+
315+
super().__init__(
316+
executor=create_file_executor(
317+
max_workers=max_workers,
318+
backend=None,
319+
max_cores=max_cores,
320+
cache_directory=cache_directory,
321+
resource_dict=resource_dict,
322+
flux_executor=None,
323+
flux_executor_pmi_mode=None,
324+
flux_executor_nesting=False,
325+
flux_log_files=False,
326+
pysqa_config_directory=None,
327+
hostname_localhost=hostname_localhost,
328+
block_allocation=block_allocation,
329+
init_function=init_function,
330+
disable_dependencies=disable_dependencies,
331+
execute_function=execute_in_subprocess,
332+
)
333+
)
334+
else:
335+
super().__init__(
336+
executor=DependencyTaskScheduler(
337+
executor=create_single_node_executor(
338+
max_workers=max_workers,
339+
cache_directory=cache_directory,
340+
max_cores=max_cores,
341+
resource_dict=resource_dict,
342+
hostname_localhost=hostname_localhost,
343+
block_allocation=block_allocation,
344+
init_function=init_function,
345+
log_obj_size=log_obj_size,
346+
),
347+
max_cores=max_cores,
348+
refresh_rate=refresh_rate,
349+
plot_dependency_graph=plot_dependency_graph,
350+
plot_dependency_graph_filename=plot_dependency_graph_filename,
351+
)
352+
)
353+
354+
187355
def create_single_node_executor(
188356
max_workers: Optional[int] = None,
189357
max_cores: Optional[int] = None,

tests/test_testclusterexecutor.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import os
2+
import shutil
3+
import unittest
4+
5+
from executorlib import get_cache_data
6+
from executorlib.api import TestClusterExecutor
7+
from executorlib.standalone.plot import generate_nodes_and_edges
8+
from executorlib.standalone.serialize import cloudpickle_register
9+
10+
try:
11+
import h5py
12+
13+
skip_h5py_test = False
14+
except ImportError:
15+
skip_h5py_test = True
16+
17+
18+
def add_function(parameter_1, parameter_2):
19+
return parameter_1 + parameter_2
20+
21+
22+
def foo(x):
23+
return x + 1
24+
25+
26+
@unittest.skipIf(
27+
skip_h5py_test, "h5py is not installed, so the h5io tests are skipped."
28+
)
29+
class TestTestClusterExecutor(unittest.TestCase):
30+
def test_cache_dir(self):
31+
with TestClusterExecutor(cache_directory="not_this_dir", resource_dict={}) as exe:
32+
cloudpickle_register(ind=1)
33+
future = exe.submit(
34+
foo,
35+
1,
36+
resource_dict={
37+
"cache_directory": "rather_this_dir",
38+
"cache_key": "foo",
39+
},
40+
)
41+
self.assertEqual(future.result(), 2)
42+
self.assertFalse(os.path.exists("not_this_dir"))
43+
cache_lst = get_cache_data(cache_directory="not_this_dir")
44+
self.assertEqual(len(cache_lst), 0)
45+
self.assertTrue(os.path.exists("rather_this_dir"))
46+
cache_lst = get_cache_data(cache_directory="rather_this_dir")
47+
self.assertEqual(len(cache_lst), 1)
48+
with TestClusterExecutor(cache_directory="not_this_dir", resource_dict={}) as exe:
49+
cloudpickle_register(ind=1)
50+
future = exe.submit(
51+
foo,
52+
1,
53+
resource_dict={
54+
"cache_directory": "rather_this_dir",
55+
"cache_key": "foo",
56+
},
57+
)
58+
self.assertEqual(future.result(), 2)
59+
self.assertFalse(os.path.exists("not_this_dir"))
60+
cache_lst = get_cache_data(cache_directory="not_this_dir")
61+
self.assertEqual(len(cache_lst), 0)
62+
self.assertTrue(os.path.exists("rather_this_dir"))
63+
cache_lst = get_cache_data(cache_directory="rather_this_dir")
64+
self.assertEqual(len(cache_lst), 1)
65+
66+
def test_empty(self):
67+
with TestClusterExecutor(cache_directory="rather_this_dir") as exe:
68+
cloudpickle_register(ind=1)
69+
future = exe.submit(foo,1)
70+
self.assertEqual(future.result(), 2)
71+
self.assertTrue(os.path.exists("rather_this_dir"))
72+
cache_lst = get_cache_data(cache_directory="rather_this_dir")
73+
self.assertEqual(len(cache_lst), 1)
74+
75+
def test_executor_dependency_plot(self):
76+
with TestClusterExecutor(
77+
plot_dependency_graph=True,
78+
) as exe:
79+
cloudpickle_register(ind=1)
80+
future_1 = exe.submit(add_function, 1, parameter_2=2)
81+
future_2 = exe.submit(add_function, 1, parameter_2=future_1)
82+
self.assertTrue(future_1.done())
83+
self.assertTrue(future_2.done())
84+
self.assertEqual(len(exe._task_scheduler._future_hash_dict), 2)
85+
self.assertEqual(len(exe._task_scheduler._task_hash_dict), 2)
86+
nodes, edges = generate_nodes_and_edges(
87+
task_hash_dict=exe._task_scheduler._task_hash_dict,
88+
future_hash_inverse_dict={
89+
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
90+
},
91+
)
92+
self.assertEqual(len(nodes), 5)
93+
self.assertEqual(len(edges), 4)
94+
95+
def tearDown(self):
96+
shutil.rmtree("rather_this_dir", ignore_errors=True)

0 commit comments

Comments
 (0)