diff --git a/executorlib/api.py b/executorlib/api.py
index 3b236140..9e9b0941 100644
--- a/executorlib/api.py
+++ b/executorlib/api.py
@@ -5,6 +5,7 @@
 functionality is considered internal and might change during minor releases.
 """
 
+from executorlib.executor.single import TestClusterExecutor
 from executorlib.standalone.command import get_command_path
 from executorlib.standalone.interactive.communication import (
     SocketInterface,
@@ -19,6 +20,7 @@
 from executorlib.standalone.serialize import cloudpickle_register
 
 __all__: list[str] = [
+    "TestClusterExecutor",
     "cancel_items_in_queue",
     "cloudpickle_register",
     "get_command_path",
diff --git a/executorlib/executor/single.py b/executorlib/executor/single.py
index 5293cad2..4e84b52b 100644
--- a/executorlib/executor/single.py
+++ b/executorlib/executor/single.py
@@ -184,6 +184,174 @@ def __init__(
         )
 
 
+class TestClusterExecutor(BaseExecutor):
+    """
+    The executorlib.api.TestClusterExecutor is designed to test the file based communication used in the
+    SlurmClusterExecutor and the FluxClusterExecutor locally. It is not recommended for production use; use the
+    SingleNodeExecutor instead.
+
+    Args:
+        max_workers (int): for backwards compatibility with the standard library, max_workers also defines the number
+                           of cores which can be used in parallel - just like the max_cores parameter. Using max_cores
+                           is recommended, as computers have a limited number of compute cores.
+        cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
+        max_cores (int): defines the number of cores which can be used in parallel
+        resource_dict (dict): A dictionary of resources required by the task. With the following keys:
+                              - cores (int): number of MPI cores to be used for each function call
+                              - threads_per_core (int): number of OpenMP threads to be used for each function call
+                              - gpus_per_core (int): number of GPUs per worker - defaults to 0
+                              - cwd (str/None): current working directory where the parallel python task is executed
+        hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
+                                      context of an HPC cluster this is essential to be able to communicate with an
+                                      Executor running on a different compute node within the same allocation. And
+                                      in principle any computer should be able to resolve that their own hostname
+                                      points to the same address as localhost. Still, MacOS >= 12 seems to disable
+                                      this lookup for security reasons, so on MacOS this option has to be set to
+                                      true.
+        block_allocation (boolean): To accelerate the submission of a series of python functions with the same resource
+                                    requirements, executorlib supports block allocation. In this case all resources
+                                    have to be defined on the executor, rather than during the submission of the
+                                    individual function.
+        init_function (None): optional function to preset arguments for functions which are submitted later
+        disable_dependencies (boolean): Disable resolving future objects during the submission.
+        refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
+        plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
+                                      debugging purposes and to get an overview of the specified dependencies.
+        plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
+        log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
+
+    Examples:
+        ```
+        >>> import numpy as np
+        >>> from executorlib.api import TestClusterExecutor
+        >>>
+        >>> def calc(i, j, k):
+        >>>     from mpi4py import MPI
+        >>>     size = MPI.COMM_WORLD.Get_size()
+        >>>     rank = MPI.COMM_WORLD.Get_rank()
+        >>>     return np.array([i, j, k]), size, rank
+        >>>
+        >>> def init_k():
+        >>>     return {"k": 3}
+        >>>
+        >>> with TestClusterExecutor(max_workers=2, init_function=init_k) as p:
+        >>>     fs = p.submit(calc, 2, j=4)
+        >>>     print(fs.result())
+        [(array([2, 4, 3]), 2, 0), (array([2, 4, 3]), 2, 1)]
+        ```
+    """
+
+    def __init__(
+        self,
+        max_workers: Optional[int] = None,
+        cache_directory: Optional[str] = None,
+        max_cores: Optional[int] = None,
+        resource_dict: Optional[dict] = None,
+        hostname_localhost: Optional[bool] = None,
+        block_allocation: bool = False,
+        init_function: Optional[Callable] = None,
+        disable_dependencies: bool = False,
+        refresh_rate: float = 0.01,
+        plot_dependency_graph: bool = False,
+        plot_dependency_graph_filename: Optional[str] = None,
+        log_obj_size: bool = False,
+    ):
+        """
+        The executorlib.api.TestClusterExecutor is designed to test the file based communication used in the
+        SlurmClusterExecutor and the FluxClusterExecutor locally. It is not recommended for production use; use
+        the SingleNodeExecutor instead.
+
+        Args:
+            max_workers (int): for backwards compatibility with the standard library, max_workers also defines the
+                               number of cores which can be used in parallel - just like the max_cores parameter. Using
+                               max_cores is recommended, as computers have a limited number of compute cores.
+            cache_directory (str, optional): The directory to store cache files. Defaults to "executorlib_cache".
+            max_cores (int): defines the number of cores which can be used in parallel
+            resource_dict (dict): A dictionary of resources required by the task. With the following keys:
+                                  - cores (int): number of MPI cores to be used for each function call
+                                  - threads_per_core (int): number of OpenMP threads to be used for each function call
+                                  - gpus_per_core (int): number of GPUs per worker - defaults to 0
+                                  - cwd (str/None): current working directory where the parallel python task is executed
+            hostname_localhost (boolean): use localhost instead of the hostname to establish the zmq connection. In the
+                                          context of an HPC cluster this is essential to be able to communicate with an
+                                          Executor running on a different compute node within the same allocation. And
+                                          in principle any computer should be able to resolve that their own hostname
+                                          points to the same address as localhost. Still, MacOS >= 12 seems to disable
+                                          this lookup for security reasons, so on MacOS this option has to be set to
+                                          true.
+            block_allocation (boolean): To accelerate the submission of a series of python functions with the same
+                                        resource requirements, executorlib supports block allocation. In this case all
+                                        resources have to be defined on the executor, rather than during the submission
+                                        of the individual function.
+            init_function (None): optional function to preset arguments for functions which are submitted later
+            disable_dependencies (boolean): Disable resolving future objects during the submission.
+            refresh_rate (float): Set the refresh rate in seconds, how frequently the input queue is checked.
+            plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
+                                          debugging purposes and to get an overview of the specified dependencies.
+            plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
+            log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
+
+        """
+        default_resource_dict: dict = {
+            "cores": 1,
+            "threads_per_core": 1,
+            "gpus_per_core": 0,
+            "cwd": None,
+            "openmpi_oversubscribe": False,
+        }
+        if resource_dict is None:
+            resource_dict = {}
+        resource_dict.update(
+            {k: v for k, v in default_resource_dict.items() if k not in resource_dict}
+        )
+        if not plot_dependency_graph:
+            from executorlib.task_scheduler.file.subprocess_spawner import (
+                execute_in_subprocess,
+            )
+            from executorlib.task_scheduler.file.task_scheduler import (
+                create_file_executor,
+            )
+
+            super().__init__(
+                executor=create_file_executor(
+                    max_workers=max_workers,
+                    backend=None,
+                    max_cores=max_cores,
+                    cache_directory=cache_directory,
+                    resource_dict=resource_dict,
+                    flux_executor=None,
+                    flux_executor_pmi_mode=None,
+                    flux_executor_nesting=False,
+                    flux_log_files=False,
+                    pysqa_config_directory=None,
+                    hostname_localhost=hostname_localhost,
+                    block_allocation=block_allocation,
+                    init_function=init_function,
+                    disable_dependencies=disable_dependencies,
+                    execute_function=execute_in_subprocess,
+                )
+            )
+        else:
+            super().__init__(
+                executor=DependencyTaskScheduler(
+                    executor=create_single_node_executor(
+                        max_workers=max_workers,
+                        cache_directory=cache_directory,
+                        max_cores=max_cores,
+                        resource_dict=resource_dict,
+                        hostname_localhost=hostname_localhost,
+                        block_allocation=block_allocation,
+                        init_function=init_function,
+                        log_obj_size=log_obj_size,
+                    ),
+                    max_cores=max_cores,
+                    refresh_rate=refresh_rate,
+                    plot_dependency_graph=plot_dependency_graph,
+                    plot_dependency_graph_filename=plot_dependency_graph_filename,
+                )
+            )
+
+
 def create_single_node_executor(
     max_workers: Optional[int] = None,
     max_cores: Optional[int] = None,
diff --git a/tests/test_testclusterexecutor.py b/tests/test_testclusterexecutor.py
new file mode 100644
index 00000000..0a47aa5f
--- /dev/null
+++ b/tests/test_testclusterexecutor.py
@@ -0,0 +1,96 @@
+import os
+import shutil
+import unittest
+
+from executorlib import get_cache_data
+from executorlib.api import TestClusterExecutor
+from executorlib.standalone.plot import generate_nodes_and_edges
+from executorlib.standalone.serialize import cloudpickle_register
+
+try:
+    import h5py
+
+    skip_h5py_test = False
+except ImportError:
+    skip_h5py_test = True
+
+
+def add_function(parameter_1, parameter_2):
+    return parameter_1 + parameter_2
+
+
+def foo(x):
+    return x + 1
+
+
+@unittest.skipIf(
+    skip_h5py_test, "h5py is not installed, so the h5io tests are skipped."
+)
+class TestTestClusterExecutor(unittest.TestCase):
+    def test_cache_dir(self):
+        with TestClusterExecutor(cache_directory="not_this_dir", resource_dict={}) as exe:
+            cloudpickle_register(ind=1)
+            future = exe.submit(
+                foo,
+                1,
+                resource_dict={
+                    "cache_directory": "rather_this_dir",
+                    "cache_key": "foo",
+                },
+            )
+            self.assertEqual(future.result(), 2)
+            self.assertFalse(os.path.exists("not_this_dir"))
+            cache_lst = get_cache_data(cache_directory="not_this_dir")
+            self.assertEqual(len(cache_lst), 0)
+            self.assertTrue(os.path.exists("rather_this_dir"))
+            cache_lst = get_cache_data(cache_directory="rather_this_dir")
+            self.assertEqual(len(cache_lst), 1)
+        with TestClusterExecutor(cache_directory="not_this_dir", resource_dict={}) as exe:
+            cloudpickle_register(ind=1)
+            future = exe.submit(
+                foo,
+                1,
+                resource_dict={
+                    "cache_directory": "rather_this_dir",
+                    "cache_key": "foo",
+                },
+            )
+            self.assertEqual(future.result(), 2)
+            self.assertFalse(os.path.exists("not_this_dir"))
+            cache_lst = get_cache_data(cache_directory="not_this_dir")
+            self.assertEqual(len(cache_lst), 0)
+            self.assertTrue(os.path.exists("rather_this_dir"))
+            cache_lst = get_cache_data(cache_directory="rather_this_dir")
+            self.assertEqual(len(cache_lst), 1)
+
+    def test_empty(self):
+        with TestClusterExecutor(cache_directory="rather_this_dir") as exe:
+            cloudpickle_register(ind=1)
+            future = exe.submit(foo, 1)
+            self.assertEqual(future.result(), 2)
+            self.assertTrue(os.path.exists("rather_this_dir"))
+            cache_lst = get_cache_data(cache_directory="rather_this_dir")
+            self.assertEqual(len(cache_lst), 1)
+
+    def test_executor_dependency_plot(self):
+        with TestClusterExecutor(
+            plot_dependency_graph=True,
+        ) as exe:
+            cloudpickle_register(ind=1)
+            future_1 = exe.submit(add_function, 1, parameter_2=2)
+            future_2 = exe.submit(add_function, 1, parameter_2=future_1)
+            self.assertTrue(future_1.done())
+            self.assertTrue(future_2.done())
+            self.assertEqual(len(exe._task_scheduler._future_hash_dict), 2)
+            self.assertEqual(len(exe._task_scheduler._task_hash_dict), 2)
+            nodes, edges = generate_nodes_and_edges(
+                task_hash_dict=exe._task_scheduler._task_hash_dict,
+                future_hash_inverse_dict={
+                    v: k for k, v in exe._task_scheduler._future_hash_dict.items()
+                },
+            )
+            self.assertEqual(len(nodes), 5)
+            self.assertEqual(len(edges), 4)
+
+    def tearDown(self):
+        shutil.rmtree("rather_this_dir", ignore_errors=True)
\ No newline at end of file
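Reviewer note: the snippet below is not part of the patch. It is a minimal sketch of how the new TestClusterExecutor is meant to be exercised locally, pieced together from the docstring example and the tests above. It assumes this branch of executorlib is installed together with h5py (the file based cache depends on it); the script layout and the "demo_cache" directory name are illustrative choices, not part of the change.

# Hypothetical local smoke test for the new TestClusterExecutor (sketch, not part of the patch).
# Assumes executorlib from this branch plus h5py are installed in the current environment.
import shutil

from executorlib.api import TestClusterExecutor
from executorlib.standalone.serialize import cloudpickle_register


def add_function(parameter_1, parameter_2):
    return parameter_1 + parameter_2


if __name__ == "__main__":
    with TestClusterExecutor(cache_directory="demo_cache") as exe:
        # register functions defined in __main__ for cloudpickle based serialization,
        # mirroring the calls in tests/test_testclusterexecutor.py
        cloudpickle_register(ind=1)
        # the function is executed in a subprocess via the same file based communication
        # that SlurmClusterExecutor and FluxClusterExecutor use on real clusters
        future = exe.submit(add_function, 1, parameter_2=2)
        print(future.result())  # expected: 3
    # the cache directory is only a by-product of the run, so remove it afterwards
    shutil.rmtree("demo_cache", ignore_errors=True)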