Add TestClusterExecutor to simplify debugging of SlurmClusterExecutor and FluxClusterExecutor #714


Merged: 10 commits, Jul 12, 2025
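For reviewers, a minimal usage sketch assembled from the new docstring and tests in this diff (not part of the change itself); the function name and cache directory are illustrative, and the optional h5py cache dependency is assumed to be installed:

```python
from executorlib.api import TestClusterExecutor
from executorlib.standalone.serialize import cloudpickle_register


def add_function(parameter_1, parameter_2):
    return parameter_1 + parameter_2


# TestClusterExecutor exercises the same file-based communication path as the
# SlurmClusterExecutor / FluxClusterExecutor, but spawns local subprocesses,
# so the queuing-system layer can be debugged on a workstation.
with TestClusterExecutor(cache_directory="executorlib_cache") as exe:
    cloudpickle_register(ind=1)  # mirrors the new tests: pickle locally defined functions by value
    future = exe.submit(add_function, 1, parameter_2=2)
    print(future.result())  # 3
```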
2 changes: 2 additions & 0 deletions executorlib/api.py
@@ -5,6 +5,7 @@
functionality is considered internal and might change during minor releases.
"""

from executorlib.executor.single import TestClusterExecutor
from executorlib.standalone.command import get_command_path
from executorlib.standalone.interactive.communication import (
SocketInterface,
@@ -19,6 +20,7 @@
from executorlib.standalone.serialize import cloudpickle_register

__all__: list[str] = [
"TestClusterExecutor",
"cancel_items_in_queue",
"cloudpickle_register",
"get_command_path",
168 changes: 168 additions & 0 deletions executorlib/executor/single.py
@@ -184,6 +184,174 @@ def __init__(
)


class TestClusterExecutor(BaseExecutor):
"""
The executorlib.api.TestClusterExecutor is designed to test the file-based communication used in the
SlurmClusterExecutor and the FluxClusterExecutor locally. It is not recommended for production use; use the
SingleNodeExecutor instead.

Args:
max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number of
cores which can be used in parallel - just like the max_cores parameter. Using max_cores is
recommended, as computers have a limited number of compute cores.
cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
max_cores (int): defines the number of cores which can be used in parallel
resource_dict (dict): A dictionary of resources required by the task, with the following keys:
- cores (int): number of MPI cores to be used for each function call
- threads_per_core (int): number of OpenMP threads to be used for each function call
- gpus_per_core (int): number of GPUs per worker - defaults to 0
- cwd (str/None): current working directory where the parallel python task is executed
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate with an
Executor running on a different compute node within the same allocation. In
principle any computer should be able to resolve that its own hostname
points to the same address as localhost, but macOS >= 12 seems to disable
this lookup for security reasons, so on macOS this option has to be set to
True.
block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource
requirements, executorlib supports block allocation. In this case all resources have
to be defined on the executor, rather than during the submission of the individual
function.
init_function (None): optional function to preset arguments for functions which are submitted later
disable_dependencies (boolean): Disable resolving future objects during the submission.
refresh_rate (float): Set the refresh rate in seconds, i.e. how frequently the input queue is checked.
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.

Examples:
```
>>> import numpy as np
>>> from executorlib.api import TestClusterExecutor
>>>
>>> def calc(i, j, k):
>>>     from mpi4py import MPI
>>>     size = MPI.COMM_WORLD.Get_size()
>>>     rank = MPI.COMM_WORLD.Get_rank()
>>>     return np.array([i, j, k]), size, rank
>>>
>>> def init_k():
>>>     return {"k": 3}
>>>
>>> with TestClusterExecutor(max_workers=2, init_function=init_k) as p:
>>>     fs = p.submit(calc, 2, j=4)
>>>     print(fs.result())
[(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
```
"""

def __init__(
self,
max_workers: Optional[int] = None,
cache_directory: Optional[str] = None,
max_cores: Optional[int] = None,
resource_dict: Optional[dict] = None,
hostname_localhost: Optional[bool] = None,
block_allocation: bool = False,
init_function: Optional[Callable] = None,
disable_dependencies: bool = False,
refresh_rate: float = 0.01,
plot_dependency_graph: bool = False,
plot_dependency_graph_filename: Optional[str] = None,
log_obj_size: bool = False,
):
"""
The executorlib.api.TestClusterExecutor is designed to test the file-based communication used in the
SlurmClusterExecutor and the FluxClusterExecutor locally. It is not recommended for production use; use
the SingleNodeExecutor instead.

Args:
max_workers (int): for backwards compatibility with the standard library, max_workers also defines the
number of cores which can be used in parallel - just like the max_cores parameter. Using
max_cores is recommended, as computers have a limited number of compute cores.
cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
max_cores (int): defines the number of cores which can be used in parallel
resource_dict (dict): A dictionary of resources required by the task, with the following keys:
- cores (int): number of MPI cores to be used for each function call
- threads_per_core (int): number of OpenMP threads to be used for each function call
- gpus_per_core (int): number of GPUs per worker - defaults to 0
- cwd (str/None): current working directory where the parallel python task is executed
hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
context of an HPC cluster this is essential to be able to communicate with an
Executor running on a different compute node within the same allocation. In
principle any computer should be able to resolve that its own hostname
points to the same address as localhost, but macOS >= 12 seems to disable
this lookup for security reasons, so on macOS this option has to be set to
True.
block_allocation (boolean): To accelerate the submission of a series of python functions with the same
resource requirements, executorlib supports block allocation. In this case all
resources have to be defined on the executor, rather than during the submission
of the individual function.
init_function (None): optional function to preset arguments for functions which are submitted later
disable_dependencies (boolean): Disable resolving future objects during the submission.
refresh_rate (float): Set the refresh rate in seconds, i.e. how frequently the input queue is checked.
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
debugging purposes and to get an overview of the specified dependencies.
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.

"""
default_resource_dict: dict = {
"cores": 1,
"threads_per_core": 1,
"gpus_per_core": 0,
"cwd": None,
"openmpi_oversubscribe": False,
}
if resource_dict is None:
resource_dict = {}
resource_dict.update(
{k: v for k, v in default_resource_dict.items() if k not in resource_dict}
)
if not plot_dependency_graph:
from executorlib.task_scheduler.file.subprocess_spawner import (
execute_in_subprocess,
)
from executorlib.task_scheduler.file.task_scheduler import (
create_file_executor,
)

super().__init__(
executor=create_file_executor(
max_workers=max_workers,
backend=None,
max_cores=max_cores,
cache_directory=cache_directory,
resource_dict=resource_dict,
flux_executor=None,
flux_executor_pmi_mode=None,
flux_executor_nesting=False,
flux_log_files=False,
pysqa_config_directory=None,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
disable_dependencies=disable_dependencies,
execute_function=execute_in_subprocess,
)
)
else:
super().__init__(
executor=DependencyTaskScheduler(
executor=create_single_node_executor(
max_workers=max_workers,
cache_directory=cache_directory,
max_cores=max_cores,
resource_dict=resource_dict,
hostname_localhost=hostname_localhost,
block_allocation=block_allocation,
init_function=init_function,
log_obj_size=log_obj_size,
),
max_cores=max_cores,
refresh_rate=refresh_rate,
plot_dependency_graph=plot_dependency_graph,
plot_dependency_graph_filename=plot_dependency_graph_filename,
)
)


def create_single_node_executor(
max_workers: Optional[int] = None,
max_cores: Optional[int] = None,
96 changes: 96 additions & 0 deletions tests/test_testclusterexecutor.py
@@ -0,0 +1,96 @@
import os
import shutil
import unittest

from executorlib import get_cache_data
from executorlib.api import TestClusterExecutor
from executorlib.standalone.plot import generate_nodes_and_edges
from executorlib.standalone.serialize import cloudpickle_register

try:
import h5py

skip_h5py_test = False
except ImportError:
skip_h5py_test = True


def add_function(parameter_1, parameter_2):
return parameter_1 + parameter_2


def foo(x):
return x + 1


@unittest.skipIf(
skip_h5py_test, "h5py is not installed, so the h5py-based tests are skipped."
)
class TestTestClusterExecutor(unittest.TestCase):
def test_cache_dir(self):
with TestClusterExecutor(cache_directory="not_this_dir", resource_dict={}) as exe:
cloudpickle_register(ind=1)
future = exe.submit(
foo,
1,
resource_dict={
"cache_directory": "rather_this_dir",
"cache_key": "foo",
},
)
self.assertEqual(future.result(), 2)
self.assertFalse(os.path.exists("not_this_dir"))
cache_lst = get_cache_data(cache_directory="not_this_dir")
self.assertEqual(len(cache_lst), 0)
self.assertTrue(os.path.exists("rather_this_dir"))
cache_lst = get_cache_data(cache_directory="rather_this_dir")
self.assertEqual(len(cache_lst), 1)
with TestClusterExecutor(cache_directory="not_this_dir", resource_dict={}) as exe:
cloudpickle_register(ind=1)
future = exe.submit(
foo,
1,
resource_dict={
"cache_directory": "rather_this_dir",
"cache_key": "foo",
},
)
self.assertEqual(future.result(), 2)
self.assertFalse(os.path.exists("not_this_dir"))
cache_lst = get_cache_data(cache_directory="not_this_dir")
self.assertEqual(len(cache_lst), 0)
self.assertTrue(os.path.exists("rather_this_dir"))
cache_lst = get_cache_data(cache_directory="rather_this_dir")
self.assertEqual(len(cache_lst), 1)

def test_empty(self):
with TestClusterExecutor(cache_directory="rather_this_dir") as exe:
cloudpickle_register(ind=1)
future = exe.submit(foo, 1)
self.assertEqual(future.result(), 2)
self.assertTrue(os.path.exists("rather_this_dir"))
cache_lst = get_cache_data(cache_directory="rather_this_dir")
self.assertEqual(len(cache_lst), 1)

def test_executor_dependency_plot(self):
with TestClusterExecutor(
plot_dependency_graph=True,
) as exe:
cloudpickle_register(ind=1)
future_1 = exe.submit(add_function, 1, parameter_2=2)
future_2 = exe.submit(add_function, 1, parameter_2=future_1)
self.assertTrue(future_1.done())
self.assertTrue(future_2.done())
self.assertEqual(len(exe._task_scheduler._future_hash_dict), 2)
self.assertEqual(len(exe._task_scheduler._task_hash_dict), 2)
nodes, edges = generate_nodes_and_edges(
task_hash_dict=exe._task_scheduler._task_hash_dict,
future_hash_inverse_dict={
v: k for k, v in exe._task_scheduler._future_hash_dict.items()
},
)
self.assertEqual(len(nodes), 5)
self.assertEqual(len(edges), 4)

def tearDown(self):
shutil.rmtree("rather_this_dir", ignore_errors=True)
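As a closing note, the resource_dict keys documented in the new docstring (cores, threads_per_core, gpus_per_core, cwd) can also be passed at construction time. The sketch below is illustrative only: the values simply mirror default_resource_dict from the diff above, and the multiply function is not an additional test from this PR.

```python
from executorlib.api import TestClusterExecutor

# Illustrative sketch: per-executor resources mirroring the documented
# default_resource_dict keys (openmpi_oversubscribe is filled in automatically).
resource_dict = {
    "cores": 1,             # MPI cores per function call
    "threads_per_core": 1,  # OpenMP threads per function call
    "gpus_per_core": 0,     # GPUs per worker
    "cwd": None,            # working directory for the parallel Python task
}


def multiply(a, b):
    return a * b


with TestClusterExecutor(
    cache_directory="executorlib_cache", resource_dict=resource_dict
) as exe:
    print(exe.submit(multiply, 2, 3).result())  # 6
```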