diff --git a/.binder/environment.yml b/.binder/environment.yml index f94339f93..16a018613 100644 --- a/.binder/environment.yml +++ b/.binder/environment.yml @@ -4,7 +4,7 @@ dependencies: - bagofholding =0.1.2 - bidict =0.23.1 - cloudpickle =3.1.1 -- executorlib =1.5.2 +- executorlib =1.6.0 - graphviz =9.0.0 - pandas =2.3.1 - pint =0.24.4 diff --git a/.ci_support/environment-cluster.yml b/.ci_support/environment-cluster.yml new file mode 100644 index 000000000..b72979a76 --- /dev/null +++ b/.ci_support/environment-cluster.yml @@ -0,0 +1,5 @@ +channels: +- conda-forge +dependencies: +- pysqa =0.2.7 +- h5py =3.14.0 diff --git a/.ci_support/environment.yml b/.ci_support/environment.yml index f94339f93..16a018613 100644 --- a/.ci_support/environment.yml +++ b/.ci_support/environment.yml @@ -4,7 +4,7 @@ dependencies: - bagofholding =0.1.2 - bidict =0.23.1 - cloudpickle =3.1.1 -- executorlib =1.5.2 +- executorlib =1.6.0 - graphviz =9.0.0 - pandas =2.3.1 - pint =0.24.4 diff --git a/.ci_support/lower_bound.yml b/.ci_support/lower_bound.yml index e765e60a9..8807788cb 100644 --- a/.ci_support/lower_bound.yml +++ b/.ci_support/lower_bound.yml @@ -6,7 +6,7 @@ dependencies: - bagofholding =0.1.0 - bidict =0.23.1 - cloudpickle =3.0.0 -- executorlib =0.0.1 +- executorlib =1.5.3 - graphviz =9.0.0 - pandas =2.2.2 - pint =0.24 diff --git a/.github/workflows/push-pull.yml b/.github/workflows/push-pull.yml index ef7f58934..b95957f59 100644 --- a/.github/workflows/push-pull.yml +++ b/.github/workflows/push-pull.yml @@ -26,3 +26,13 @@ jobs: do-codacy: false do-coveralls: false do-mypy: true + + slurm-interruption: + uses: ./.github/workflows/slurm-test.yml + with: + mode: interrupt + + slurm-discovery: + uses: ./.github/workflows/slurm-test.yml + with: + mode: discover diff --git a/.github/workflows/slurm-test.yml b/.github/workflows/slurm-test.yml new file mode 100644 index 000000000..f43a36619 --- /dev/null +++ b/.github/workflows/slurm-test.yml @@ -0,0 +1,38 @@ +# Configure the CI 
with SLURM, and send a job to the queue via a workflow +# The submission job gets hard-killed, then we follow up by either restarting while the +# slurm job is running (mode = interrupt) or by waiting for it to finish +# (mode = discover) + +name: Slurm Test +on: + workflow_call: + inputs: + mode: + required: true + type: string + +jobs: + slurm_test: + runs-on: ubuntu-latest + services: + mysql: + image: mysql:8.0 + env: + MYSQL_ROOT_PASSWORD: root + ports: + - "8888:3306" + options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 + steps: + - uses: actions/checkout@v4 + - uses: koesterlab/setup-slurm-action@v1 + timeout-minutes: 5 + - uses: pyiron/actions/cached-miniforge@actions-4.0.8 + with: + python-version: '3.12' + env-files: .ci_support/environment.yml .ci_support/environment-cluster.yml + - name: Test (${{ inputs.mode }}) + shell: bash -l {0} + timeout-minutes: 8 + run: | + python -u tests/cluster/slurm_test.py --submit + python -u tests/cluster/slurm_test.py --${{ inputs.mode }} \ No newline at end of file diff --git a/docs/environment.yml b/docs/environment.yml index 2a6acfc50..fe8c3334e 100644 --- a/docs/environment.yml +++ b/docs/environment.yml @@ -10,7 +10,7 @@ dependencies: - bagofholding =0.1.2 - bidict =0.23.1 - cloudpickle =3.1.1 -- executorlib =1.5.2 +- executorlib =1.6.0 - graphviz =9.0.0 - pandas =2.3.1 - pint =0.24.4 diff --git a/pyiron_workflow/executors/wrapped_executorlib.py b/pyiron_workflow/executors/wrapped_executorlib.py new file mode 100644 index 000000000..2ad69ccb2 --- /dev/null +++ b/pyiron_workflow/executors/wrapped_executorlib.py @@ -0,0 +1,79 @@ +import inspect +from typing import Any, ClassVar + +from executorlib import BaseExecutor, SingleNodeExecutor, SlurmClusterExecutor +from executorlib.api import TestClusterExecutor + +from pyiron_workflow.mixin import lexical, run + + +class DedicatedExecutorError(TypeError): + """ + To raise when you try to use one of these executors outside 
the context of a node.
+    """
+
+
+class ProtectedResourceError(ValueError):
+    """
+    Raise when a user provides executorlib resources that we need to override.
+    """
+
+
+class CacheOverride(BaseExecutor):
+    override_cache_file_name: ClassVar[str] = "executorlib_cache"
+
+    def submit(self, fn, /, *args, **kwargs):
+        """
+        We demand that `fn` be the bound-method `on_run` of a `Lexical`+`Runnable`
+        class (a `Node` is, of course, the intended resolution of this demand).
+        """
+        if (
+            inspect.ismethod(fn)
+            and fn.__name__ == "on_run"
+            and isinstance(fn.__self__, lexical.Lexical)  # provides .as_path
+            and isinstance(fn.__self__, run.Runnable)  # provides .on_run
+        ):
+            cache_key_info = {
+                "cache_key": self.override_cache_file_name,
+                "cache_directory": str(fn.__self__.as_path()),
+            }
+        else:
+            raise DedicatedExecutorError(
+                f"{self.__class__.__name__} is only intended to work with the "
+                f"on_run method of pyiron_workflow.Node objects, but got {fn}"
+            )
+
+        _validate_existing_resource_dict(kwargs)
+
+        if "resource_dict" in kwargs:
+            kwargs["resource_dict"].update(cache_key_info)
+        else:
+            kwargs["resource_dict"] = cache_key_info
+
+        return super().submit(fn, *args, **kwargs)
+
+
+def _validate_existing_resource_dict(kwargs: dict[str, Any]):
+    if "resource_dict" in kwargs:
+        if "cache_key" in kwargs["resource_dict"]:
+            raise ProtectedResourceError(
+                f"pyiron_workflow needs the freedom to specify the cache, so the "
+                f'requested "cache_key" '
+                f"({kwargs['resource_dict']['cache_key']}) would get overwritten."
+            )
+        if "cache_directory" in kwargs["resource_dict"]:
+            raise ProtectedResourceError(
+                f"pyiron_workflow needs the freedom to specify the cache, so the "
+                f'requested "cache_directory" '
+                f"({kwargs['resource_dict']['cache_directory']}) would get "
+                f"overwritten."
+            )
+
+
+class CacheSingleNodeExecutor(CacheOverride, SingleNodeExecutor): ...
+
+
+class CacheSlurmClusterExecutor(CacheOverride, SlurmClusterExecutor): ...
+ + +class _CacheTestClusterExecutor(CacheOverride, TestClusterExecutor): ... diff --git a/pyiron_workflow/mixin/run.py b/pyiron_workflow/mixin/run.py index 35c16896b..069443441 100644 --- a/pyiron_workflow/mixin/run.py +++ b/pyiron_workflow/mixin/run.py @@ -30,6 +30,9 @@ class ReadinessError(ValueError): readiness_dict: dict[str, bool] # Detailed information on why it is not ready +class NotInterpretableAsExecutorError(TypeError): ... + + class Runnable(UsesState, HasLabel, HasRun, ABC): """ An abstract class for interfacing with executors, etc. @@ -60,7 +63,7 @@ def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.running: bool = False self.failed: bool = False - self.executor: InterpretableAsExecutor | None = None + self._executor: InterpretableAsExecutor | None = None # We call it an executor, but it can also be instructions on making one self.future: None | Future = None self._thread_pool_sleep_time: float = 1e-6 @@ -78,6 +81,26 @@ def run_args(self) -> tuple[tuple, dict]: Any data needed for :meth:`on_run`, will be passed as (*args, **kwargs). """ + @property + def executor(self) -> InterpretableAsExecutor | None: + return self._executor + + @executor.setter + def executor(self, executor: InterpretableAsExecutor | None): + if not ( + isinstance(executor, StdLibExecutor | type(None)) + or ( + callable(executor[0]) + and isinstance(executor[1], tuple) + and isinstance(executor[2], dict) + ) + ): + raise NotInterpretableAsExecutorError( + f"Expected an instance of {StdLibExecutor}, or a tuple of such a " + f"class, a tuple of args, and a dict of kwargs -- but got {executor}." + ) + self._executor = executor + def process_run_result(self, run_output: Any) -> Any: """ What to _do_ with the results of :meth:`on_run` once you have them. 
@@ -160,13 +183,8 @@ def _none_to_dict(inp: dict | None) -> dict: if stop_early: return result - executor = ( - None if self.executor is None else self._parse_executor(self.executor) - ) - self.running = True return self._run( - executor=executor, raise_run_exceptions=raise_run_exceptions, run_exception_kwargs=run_exception_kwargs, run_finally_kwargs=run_finally_kwargs, @@ -202,7 +220,6 @@ def _before_run( def _run( self, /, - executor: StdLibExecutor | None, raise_run_exceptions: bool, run_exception_kwargs: dict, run_finally_kwargs: dict, @@ -231,7 +248,7 @@ def _run( f"first positional argument passed to :meth:`on_run`." ) - if executor is None: + if self.executor is None: try: run_output = self.on_run(*on_run_args, **on_run_kwargs) except (Exception, KeyboardInterrupt) as e: @@ -249,14 +266,28 @@ def _run( **finish_run_kwargs, ) else: - if isinstance(executor, ThreadPoolExecutor): - self.future = executor.submit( - self._thread_pool_run, *on_run_args, **on_run_kwargs - ) + if isinstance(self.executor, StdLibExecutor): + executor = self.executor + unique_executor = False else: - self.future = executor.submit( - self.on_run, *on_run_args, **on_run_kwargs - ) + creator, args, kwargs = self.executor + executor = creator(*args, **kwargs) + if not isinstance(executor, StdLibExecutor): + raise TypeError( + f"Expected an instance of {StdLibExecutor}, but got " + f"{type(executor)} from executor creation instructions " + f"{self.executor}." 
+ ) + unique_executor = True + + submit_function = ( + self._thread_pool_run + if isinstance(executor, ThreadPoolExecutor) + else self.on_run + ) + self.future = executor.submit( + submit_function, *on_run_args, **on_run_kwargs + ) self.future.add_done_callback( partial( self._finish_run, @@ -266,8 +297,20 @@ def _run( **finish_run_kwargs, ) ) + + if unique_executor: + self.future.add_done_callback( + partial(self._shutdown_executor_callback, executor=executor) + ) + return self.future + @staticmethod + def _shutdown_executor_callback( + _future: Future, /, executor: StdLibExecutor + ) -> None: + executor.shutdown(wait=False) + def _run_exception(self, /, *args, **kwargs): """ What to do if an exception is encountered inside :meth:`_run` or @@ -322,44 +365,6 @@ def _readiness_error_message(self) -> str: f"should be neither running nor failed.\n" + self.readiness_report ) - @staticmethod - def _parse_executor( - executor: InterpretableAsExecutor, - ) -> StdLibExecutor: - """ - If you've already got an executor, you're done. But if you get callable and - some args and kwargs, turn them into an executor! - - This is because executors can't be serialized, but you might want to use an - executor on the far side of serialization. The most straightforward example is - to simply pass an executor class and its args and kwargs, but in a more - sophisticated case perhaps you want some function that accesses the _same_ - executor on multiple invocations such that multiple nodes are sharing the same - executor. The functionality here isn't intended to hold your hand for this, but - should be flexible enough that you _can_ do it if you want to. 
- """ - if isinstance(executor, StdLibExecutor): - return executor - elif ( - isinstance(executor, tuple) - and callable(executor[0]) - and isinstance(executor[1], tuple) - and isinstance(executor[2], dict) - ): - executor = executor[0](*executor[1], **executor[2]) - if not isinstance(executor, StdLibExecutor): - raise TypeError( - f"Executor parsing got a callable and expected it to return a " - f"`concurrent.futures.Executor` instance, but instead got " - f"{executor}." - ) - return executor - else: - raise NotImplementedError( - f"Expected an instance of {StdLibExecutor}, or a tuple of such a class, " - f"a tuple of args, and a dict of kwargs -- but got {executor}." - ) - def __getstate__(self): state = super().__getstate__() state["future"] = None @@ -367,8 +372,8 @@ def __getstate__(self): # the simple pyiron_workflow.executors.CloudpickleProcessPoolExecutor, but for # the more complex executorlib.Executor we're getting: # TypeError: cannot pickle '_thread.RLock' object - if isinstance(self.executor, StdLibExecutor): - state["executor"] = None + if isinstance(self._executor, StdLibExecutor): + state["_executor"] = None # Don't pass actual executors, they have an unserializable thread lock on them # _but_ if the user is just passing instructions on how to _build_ an executor, # we'll trust that those serialize OK (this way we can, hopefully, eventually diff --git a/pyiron_workflow/node.py b/pyiron_workflow/node.py index 6de743c0e..413d3eb20 100644 --- a/pyiron_workflow/node.py +++ b/pyiron_workflow/node.py @@ -8,6 +8,8 @@ from __future__ import annotations import contextlib +import pathlib +import shutil from abc import ABC, abstractmethod from concurrent.futures import Future from importlib import import_module @@ -18,6 +20,7 @@ from pyiron_snippets.dotdict import DotDict from pyiron_workflow.draw import Node as GraphvizNode +from pyiron_workflow.executors.wrapped_executorlib import CacheOverride from pyiron_workflow.logging import logger from 
pyiron_workflow.mixin.lexical import Lexical from pyiron_workflow.mixin.run import ReadinessError, Runnable @@ -33,7 +36,6 @@ ) if TYPE_CHECKING: - from concurrent.futures import Executor from pathlib import Path import graphviz @@ -306,6 +308,9 @@ def __init__( # under-development status -- API may change to be more user-friendly self._do_clean: bool = False # Power-user override for cleaning up temporary # serialized results and empty directories (or not). + self._remove_executorlib_cache: bool = True # Power-user override for cleaning + # up temporary serialized results from runs with executorlib; intended to be + # used for testing self._cached_inputs: dict[str, Any] | None = None self._user_data: dict[str, Any] = {} @@ -389,6 +394,30 @@ def _readiness_error_message(self) -> str: f" conform to type hints.\n" + self.readiness_report ) + def _is_using_wrapped_excutorlib_executor(self) -> bool: + return self.executor is not None and ( + isinstance(self.executor, CacheOverride) + or ( + isinstance(self.executor, tuple) + and isinstance(self.executor[0], type) + and issubclass(self.executor[0], CacheOverride) + ) + ) + + def _clean_wrapped_executorlib_executor_cache(self) -> None: + self._wrapped_executorlib_cache_file.unlink() + cache_subdir = self.as_path() / CacheOverride.override_cache_file_name + if pathlib.Path(cache_subdir).is_dir(): + shutil.rmtree(cache_subdir) + self.clean_path() + + @property + def _wrapped_executorlib_cache_file(self) -> Path: + """For internal use to clean up cached executorlib files""" + # Depends on executorlib implementation details not protected by semver + file_name = CacheOverride.override_cache_file_name + "_o.h5" + return self.as_path() / file_name + def on_run(self, *args, **kwargs) -> Any: save_result: bool = args[0] args = args[1:] @@ -513,6 +542,8 @@ def _before_run( emit_ran_signal: bool, ) -> tuple[bool, Any]: if self.running: + if self._is_using_wrapped_excutorlib_executor(): + return False, None # Let it cook raise 
ReadinessError(self._readiness_error_message)
 
         if run_data_tree:
@@ -554,7 +585,6 @@ def clear_cache(self):
 
     def _run(
         self,
-        executor: Executor | None,
         raise_run_exceptions: bool,
         run_exception_kwargs: dict,
         run_finally_kwargs: dict,
@@ -563,7 +593,6 @@
         if self.parent is not None and self.parent.running:
             self.parent.register_child_starting(self)
         return super()._run(
-            executor=executor,
             raise_run_exceptions=raise_run_exceptions,
             run_exception_kwargs=run_exception_kwargs,
             run_finally_kwargs=run_finally_kwargs,
@@ -596,6 +625,12 @@ def _run_finally(self, /, emit_ran_signal: bool, raise_run_exceptions: bool):
         if self._do_clean:
             self._clean_graph_directory()
 
+        if (
+            self._remove_executorlib_cache
+            and self._is_using_wrapped_excutorlib_executor()
+        ):
+            self._clean_wrapped_executorlib_executor_cache()
+
     def run_data_tree(self, run_parent_trees_too=False) -> None:
         """
         Use topological analysis to build a tree of all upstream dependencies and run
diff --git a/pyiron_workflow/nodes/composite.py b/pyiron_workflow/nodes/composite.py
index 1463c570f..0a69c2793 100644
--- a/pyiron_workflow/nodes/composite.py
+++ b/pyiron_workflow/nodes/composite.py
@@ -199,6 +199,8 @@ def _on_cache_miss(self) -> None:
     def _on_run(self):
         if len(self.running_children) > 0:  # Start from a broken process
-            for label in self.running_children:
+            for label in list(self.running_children):
+                if self.children[label]._is_using_wrapped_excutorlib_executor():
+                    self.running_children.remove(label)
                 self.children[label].run()
             # Running children will find serialized result and proceed,
             # or raise an error because they're already running
@@ -295,7 +297,7 @@ def _parse_remotely_executed_self(self, other_self):
 
     def _get_state_from_remote_other(self, other_self):
         state = other_self.__getstate__()
-        state.pop("executor")  # Got overridden to None for __getstate__, so keep local
+        state.pop("_executor")  # Got overridden to None for __getstate__, so keep local
         state.pop("_parent")  # Got overridden to None for __getstate__, so keep local
         return state
 
diff --git a/pyiron_workflow/workflow.py b/pyiron_workflow/workflow.py index 0f1c24000..0208f75a4 100644 --- a/pyiron_workflow/workflow.py +++ b/pyiron_workflow/workflow.py @@ -393,7 +393,7 @@ def run( def run_in_thread( self, *args, check_readiness: bool = True, **kwargs - ) -> futures.Future: + ) -> futures.Future | dict[str, Any]: if self.executor is not None: raise ValueError( f"Workflow {self.label} already has an executor set. Running in a " @@ -401,7 +401,11 @@ def run_in_thread( ) self.executor = (futures.ThreadPoolExecutor, (), {}) f = self.run(*args, check_readiness=check_readiness, **kwargs) - f.add_done_callback(lambda _: setattr(self, "executor", None)) + if isinstance(f, futures.Future): + f.add_done_callback(lambda _: setattr(self, "executor", None)) + else: + # We're hitting a cached result + self.executor = None return f def pull(self, run_parent_trees_too=False, **kwargs): diff --git a/pyproject.toml b/pyproject.toml index 649da18a5..02d89a165 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,7 @@ dependencies = [ "bagofholding==0.1.2", "bidict==0.23.1", "cloudpickle==3.1.1", - "executorlib==1.5.2", + "executorlib==1.6.0", "graphviz==0.21", "pandas==2.3.1", "pint==0.24.4", @@ -56,6 +56,12 @@ Homepage = "https://github.com/pyiron/pyiron_workflow" Documentation = "https://pyiron-workflow.readthedocs.io" Repository = "https://github.com/pyiron/pyiron_workflow" +[project.optional-dependencies] +cluster = [ + "pysqa==0.2.7", + "h5py==3.14.0", +] + [tool.versioneer] VCS = "git" style = "pep440-pre" diff --git a/tests/cluster/__init__.py b/tests/cluster/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/cluster/slurm_test.py b/tests/cluster/slurm_test.py new file mode 100644 index 000000000..b4cb77ad9 --- /dev/null +++ b/tests/cluster/slurm_test.py @@ -0,0 +1,123 @@ +import argparse +import os +import time + +import pyiron_workflow as pwf +from pyiron_workflow.executors.wrapped_executorlib import 
CacheSlurmClusterExecutor + +t_overhead = 2 +t_sleep = 10 + + +def state_check(wf, expected_wf, expected_n2, expected_result=pwf.api.NOT_DATA): + wf_running, n_running, outputs = ( + wf.running, + wf.n2.running, + wf.outputs.to_value_dict(), + ) + print( + "wf.running, wf.n2.running, wf.outputs.to_value_dict()", + wf_running, + n_running, + outputs, + ) + assert (wf_running, n_running, outputs["n3__user_input"]) == ( + expected_wf, + expected_n2, + expected_result, + ) + + +def submission(): + submission_template = """\ +#!/bin/bash +#SBATCH --output=time.out +#SBATCH --job-name={{job_name}} +#SBATCH --chdir={{working_directory}} +#SBATCH --get-user-env=L +#SBATCH --cpus-per-task={{cores}} + +{{command}} +""" + resource_dict = {"submission_template": submission_template} + + wf = pwf.Workflow("slurm_test") + wf.n1 = pwf.std.UserInput(t_sleep) + wf.n2 = pwf.std.Sleep(wf.n1) + wf.n3 = pwf.std.UserInput(wf.n2) + + print("submitting") + print(time.time()) + wf.n2.executor = (CacheSlurmClusterExecutor, (), {"resource_dict": resource_dict}) + wf.n2.use_cache = False + out = wf.run_in_thread() + print("run return", out) + state_check(wf, True, True) + print("sleeping", t_overhead + t_sleep / 4) + time.sleep(t_overhead + t_sleep / 4) + print("saving") + state_check(wf, True, True) + wf.save() + print("sleeping", t_sleep / 4) + time.sleep(t_sleep / 4) + print("pre-exit state") + state_check(wf, True, True) + print("hard exit at time", time.time()) + os._exit(0) # Hard exit so that we don't wait for the executor + + +def interruption(): + print("loading at time", time.time()) + wf = pwf.Workflow("slurm_test") + state_check(wf, True, True) + wf.executor = None # https://github.com/pyiron/pyiron_workflow/issues/705 + wf.running = False # https://github.com/pyiron/pyiron_workflow/issues/706 + print("re-running") + out = wf.run_in_thread() + print("run return", out) + state_check(wf, True, True) + print("sleeping", t_overhead + t_sleep) + time.sleep(t_overhead + t_sleep) + 
state_check(wf, False, False, t_sleep) + wf.delete_storage() + + +def discovery(): + print("loading at time", time.time()) + wf = pwf.Workflow("slurm_test") + state_check(wf, True, True) + wf.executor = None # https://github.com/pyiron/pyiron_workflow/issues/705 + wf.running = False # https://github.com/pyiron/pyiron_workflow/issues/706 + print("sleeping", t_overhead + t_sleep) + time.sleep(t_overhead + t_sleep) + print("re-running") + out = wf.run_in_thread() + print("run return", out) + state_check(wf, True, True) + print("sleeping", t_sleep / 10) + time.sleep(t_sleep / 10) + print("finally") + state_check(wf, False, False, t_sleep) + wf.delete_storage() + + +def main(): + parser = argparse.ArgumentParser(description="Run workflow stages.") + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument("--submit", action="store_true", help="Run submission stage.") + group.add_argument( + "--interrupt", action="store_true", help="Run interruption stage." + ) + group.add_argument("--discover", action="store_true", help="Run discovery stage.") + args = parser.parse_args() + + if args.submit: + submission() + elif args.interrupt: + interruption() + elif args.discover: + discovery() + + +if __name__ == "__main__": + main() diff --git a/tests/integration/test_parallel_speedup.py b/tests/integration/test_parallel_speedup.py index 67e229597..489dcba34 100644 --- a/tests/integration/test_parallel_speedup.py +++ b/tests/integration/test_parallel_speedup.py @@ -97,6 +97,8 @@ def test_executor_instructions(self): "constructors should survive (de)serialization", ) + reloaded.delete_storage() + if __name__ == "__main__": unittest.main() diff --git a/tests/integration/test_workflow.py b/tests/integration/test_workflow.py index 51e0a727e..6c115e32f 100644 --- a/tests/integration/test_workflow.py +++ b/tests/integration/test_workflow.py @@ -161,12 +161,8 @@ def test_executors(self): Workflow.create.ProcessPoolExecutor, Workflow.create.ThreadPoolExecutor, 
Workflow.create.CloudpickleProcessPoolExecutor, + Workflow.create.executorlib.SingleNodeExecutor, ] - try: - executors.append(Workflow.create.executorlib.SingleNodeExecutor) - except AttributeError: - # executorlib < 0.1 had an Executor with optional backend parameter (defaulting to SingleNodeExecutor) - executors.append(Workflow.create.executorlib.Executor) wf = Workflow("executed") wf.a = Workflow.create.std.UserInput(42) # Regular diff --git a/tests/integration/test_wrapped_executorlib.py b/tests/integration/test_wrapped_executorlib.py new file mode 100644 index 000000000..e93253457 --- /dev/null +++ b/tests/integration/test_wrapped_executorlib.py @@ -0,0 +1,92 @@ +import time +import unittest + +import pyiron_workflow as pwf +from pyiron_workflow.executors.wrapped_executorlib import ( + CacheSingleNodeExecutor, + _CacheTestClusterExecutor, +) + + +class TestWrappedExecutorlib(unittest.TestCase): + + def _test_cache(self, executor_class): + t_sleep = 1 + + wf = pwf.Workflow("passive_run") + wf.n1 = pwf.std.UserInput(t_sleep) + wf.n2 = pwf.std.Sleep(wf.n1) + wf.n3 = pwf.std.UserInput(wf.n2) + expected_output = {"n3__user_input": t_sleep} + + wf.use_cache = False + wf.n2.use_cache = False + wf.n2._remove_executorlib_cache = False + + wf.n2.executor = (executor_class, (), {}) + + t0 = time.perf_counter() + out1 = wf.run() + t1 = time.perf_counter() + out2 = wf.run() + t2 = time.perf_counter() + + self.assertDictEqual( + expected_output, + out1, + msg="Sanity check that the workflow ran ok", + ) + self.assertDictEqual( + expected_output, + out2, + msg="Sanity check that the workflow re-ran ok", + ) + self.assertFalse( + wf.n2.cache_hit, msg="Sanity check that we're not just using the cache" + ) + + t_first_run = t1 - t0 + t_second_run = t2 - t1 + self.assertGreater( + t_first_run, + t_sleep, + msg="The initial run should be executing the sleep node", + ) + self.assertLess( + t_second_run, + 0.5 * t_sleep, + msg="The second run should allow executorlib to find the 
cached result, " + "and be much faster than the sleep time", + ) + + self.assertTrue( + wf.n2._wrapped_executorlib_cache_file.is_file(), + msg="Since we deactivated cache removal, we expect the executorlib cache " + "file to still be there", + ) + + wf.n2.running = True # Fake that it's still running + wf.n2._remove_executorlib_cache = True # Reactivate automatic cleanup + out3 = wf.run() + + self.assertDictEqual( + expected_output, + out3, + msg="Workflow should recover from a running child state when the wrapped " + "executorlib executor can find a cached result", + ) + self.assertFalse( + wf.n2._wrapped_executorlib_cache_file.is_file(), + msg="The cached result should be cleaned up", + ) + + def test_cache(self): + for executor_class in [CacheSingleNodeExecutor, _CacheTestClusterExecutor]: + with self.subTest(executor_class.__name__): + self._test_cache(executor_class) + + def test_automatic_cleaning(self): + n = pwf.std.UserInput(1) + n.executor = (_CacheTestClusterExecutor, (), {}) + n.run() + self.assertFalse(n._wrapped_executorlib_cache_file.is_file()) diff --git a/tests/unit/mixin/test_run.py b/tests/unit/mixin/test_run.py index bfa32ece4..2e687f6f8 100644 --- a/tests/unit/mixin/test_run.py +++ b/tests/unit/mixin/test_run.py @@ -4,7 +4,11 @@ from pyiron_workflow.executors.cloudpickleprocesspool import ( CloudpickleProcessPoolExecutor, ) -from pyiron_workflow.mixin.run import ReadinessError, Runnable +from pyiron_workflow.mixin.run import ( + NotInterpretableAsExecutorError, + ReadinessError, + Runnable, +) class ConcreteRunnable(Runnable): @@ -154,9 +158,7 @@ def maybe_get_executor(get_executor): msg="Expected the result, including post-processing 'bar' value", ) - with self.assertRaises( - NotImplementedError, msg="That's not an executor at all" - ): + with self.assertRaises(TypeError, msg="That's not an executor at all"): runnable.executor = 42 runnable.run() @@ -168,6 +170,11 @@ def maybe_get_executor(get_executor): runnable.executor = 
(maybe_get_executor, (False,), {}) runnable.run() + def test_executor_interpretation(self): + runnable = ConcreteRunnable() + with self.assertRaises(NotInterpretableAsExecutorError): + runnable.executor = "This is not an executor!" + if __name__ == "__main__": unittest.main() diff --git a/tests/unit/test_workflow.py b/tests/unit/test_workflow.py index 8d899c4d6..7437aa912 100644 --- a/tests/unit/test_workflow.py +++ b/tests/unit/test_workflow.py @@ -501,6 +501,34 @@ def test_pickle(self): wf_out, reloaded.outputs.to_value_dict(), msg="Pickling should work" ) + def test_repeated_thread_runs(self): + wf = Workflow("wf") + wf.n = demo_nodes.AddThree(x=0) + f = wf.run_in_thread() + self.assertTrue(wf.running, msg="Sanity check") + + self.assertEqual(f.result(), wf, msg="Background run should complete") + self.assertEqual(wf.n.outputs.add_three.value, 3, msg="Sanity check") + max_waits = 10 + while wf.executor is not None: + sleep(0.1) + max_waits -= 1 + if max_waits == 0: + raise RuntimeError( + "Executor should be gone by now -- we're just trying to buy a " + "smidgen of time for the callback to finish." 
+ ) + self.assertIsNone( + wf.executor, msg="On-the-fly thread executors should be transient" + ) + cached = wf.run_in_thread() + self.assertDictEqual( + cached, + wf.outputs.to_value_dict(), + msg="On cache hits we don't expect a future back", + ) + self.assertIsNone(wf.executor, msg="No new executor should be set") + if __name__ == "__main__": unittest.main() diff --git a/tests/unit/test_wrapped_executorlib.py b/tests/unit/test_wrapped_executorlib.py new file mode 100644 index 000000000..2afe905b8 --- /dev/null +++ b/tests/unit/test_wrapped_executorlib.py @@ -0,0 +1,39 @@ +import unittest + +from pyiron_workflow.executors.wrapped_executorlib import ( + CacheSingleNodeExecutor, + DedicatedExecutorError, + ProtectedResourceError, +) +from pyiron_workflow.nodes import standard as std + + +def foo(x): + return x + 1 + + +class TestWrappedExecutorlib(unittest.TestCase): + def test_application_protection(self): + with ( + CacheSingleNodeExecutor() as exe, + self.assertRaises( + DedicatedExecutorError, + msg="These executors are specialized to work with node runs", + ), + ): + exe.submit(foo, 1) + + def test_resource_protection(self): + protected_resources = ( + {"cache_key": "my_key"}, + {"cache_directory": "my_directory"}, + {"cache_key": "my_key", "cache_directory": "my_directory"}, + ) + for resource_dict in protected_resources: + with ( + self.subTest(msg=f"Submit resource dict: {resource_dict}"), + CacheSingleNodeExecutor() as exe, + self.assertRaises(ProtectedResourceError), + ): + n = std.UserInput() + exe.submit(n.on_run, 42, resource_dict=resource_dict)