From 246626d2f892dbfdc7581b0d74b4e1e999379e8f Mon Sep 17 00:00:00 2001
From: liamhuber
Date: Thu, 19 Jun 2025 13:38:55 -0700
Subject: [PATCH 01/32] Wrap executorlib executors

To exploit the new caching interface so that nodes can rely on their running
state and lexical path to access previously executed results.

Locally with the SingleNodeExecutor everything is looking good, but that
doesn't natively support terminating the process that submits the job. I'd
like to play around with this using the SlurmClusterExecutor on the cluster
before making further changes.

Signed-off-by: liamhuber
---
 .../executors/wrapped_executorlib.py | 51 +++++++++++++++++++
 pyiron_workflow/node.py              | 37 ++++++++++++++
 pyiron_workflow/nodes/composite.py   |  2 +
 3 files changed, 90 insertions(+)
 create mode 100644 pyiron_workflow/executors/wrapped_executorlib.py

diff --git a/pyiron_workflow/executors/wrapped_executorlib.py b/pyiron_workflow/executors/wrapped_executorlib.py
new file mode 100644
index 000000000..061f44fe9
--- /dev/null
+++ b/pyiron_workflow/executors/wrapped_executorlib.py
@@ -0,0 +1,51 @@
+import inspect
+
+from executorlib import BaseExecutor, SingleNodeExecutor
+
+from pyiron_workflow.mixin import lexical, run
+
+
+class CacheOverride(BaseExecutor):
+    def submit(self, fn, /, *args, **kwargs):
+        """
+        We demand that `fn` be the bound-method `on_run` of a `Lexical`+`Runnable`
+        class (a `Node` is, of course, the intended resolution of this demand).
+        """
+        if (
+            inspect.ismethod(fn)
+            and fn.__name__ == "on_run"
+            and isinstance(fn.__self__, lexical.Lexical)  # provides .as_path
+            and isinstance(fn.__self__, run.Runnable)  # provides .on_run
+        ):
+            cache_key_info = {
+                "cache_key": str(fn.__self__.as_path()),
+                "cache_directory": ".",  # Doesn't matter, the path in the key overrides
+            }
+        else:
+            raise TypeError(
+                f"{self.__name__} is only intended to work with the "
+                f"on_run method of pyiron_workflow.Node objects, but got {fn}"
+            )
+
+        if "resource_dict" in kwargs:
+            if "cache_key" in kwargs["resource_dict"]:
+                raise ValueError(
+                    f"pyiron_workflow needs the freedom to specify the cache, so the "
+                    f'requested "cache_directory" '
+                    f"({kwargs["resource_dict"]["cache_key"]}) would get overwritten."
+                )
+            if "cache_directory" in kwargs["resource_dict"]:
+                raise ValueError(
+                    f"pyiron_workflow needs the freedom to specify the cache, so the "
+                    f'requested "cache_directory" '
+                    f"({kwargs["resource_dict"]["cache_directory"]})would get "
+                    f"overwritten."
+                )
+            kwargs["resource_dict"].update(cache_key_info)
+        else:
+            kwargs["resource_dict"] = cache_key_info
+
+        return super().submit(fn, *args, **kwargs)
+
+
+class CacheSingleNodeExecutor(SingleNodeExecutor, CacheOverride): ...
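(A minimal usage sketch, not part of the patch: it mirrors the tuple-of-instructions
assignment used by the integration tests added later in this series; the workflow and
values are illustrative only.)

    import pyiron_workflow as pwf
    from pyiron_workflow.executors.wrapped_executorlib import CacheSingleNodeExecutor

    wf = pwf.Workflow("cached_run")
    wf.n1 = pwf.std.UserInput(42)
    # Instructions (class, args, kwargs) rather than a live instance, so the
    # choice of executor can survive (de)serialization of the node
    wf.n1.executor = (CacheSingleNodeExecutor, (), {})
    out = wf.run()  # submit() injects the cache location from the node's lexical path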
diff --git a/pyiron_workflow/node.py b/pyiron_workflow/node.py index a1d5d60d9..0c788592f 100644 --- a/pyiron_workflow/node.py +++ b/pyiron_workflow/node.py @@ -18,6 +18,7 @@ from pyiron_snippets.dotdict import DotDict from pyiron_workflow.draw import Node as GraphvizNode +from pyiron_workflow.executors.wrapped_executorlib import CacheOverride from pyiron_workflow.logging import logger from pyiron_workflow.mixin.lexical import Lexical from pyiron_workflow.mixin.run import ReadinessError, Runnable @@ -391,6 +392,18 @@ def _readiness_error_message(self) -> str: f" conform to type hints.\n" + self.readiness_report ) + def _is_using_wrapped_excutorlib_executor(self) -> bool: + return self.executor is not None and isinstance( + self._parse_executor(self.executor), CacheOverride + ) + + def _clean_wrapped_executorlib_executor_cache(self) -> None: + location = self.as_path().parent + file_name = self.label + "_o.h5" # Dependent on executorlib implementation + location.joinpath(file_name).unlink() + with contextlib.suppress(OSError): # If it's not empty just move on + location.rmdir() + def on_run(self, *args, **kwargs) -> Any: save_result: bool = args[0] args = args[1:] @@ -515,6 +528,8 @@ def _before_run( emit_ran_signal: bool, ) -> tuple[bool, Any]: if self.running: + if self._is_using_wrapped_excutorlib_executor(): + return False, None # Let it cook raise ReadinessError(self._readiness_error_message) if run_data_tree: @@ -562,7 +577,9 @@ def _run( run_finally_kwargs: dict, finish_run_kwargs: dict, ) -> Any | tuple | Future: + print("Running", self.full_label) if self.parent is not None and self.parent.running: + print("Registering start with parent", self.full_label) self.parent.register_child_starting(self) return super()._run( executor=executor, @@ -572,6 +589,26 @@ def _run( finish_run_kwargs=finish_run_kwargs, ) + def _finish_run( + self, + run_output: tuple | Future, + /, + raise_run_exceptions: bool, + run_exception_kwargs: dict, + run_finally_kwargs: dict, + **kwargs, + ) -> Any | tuple | None: + if self._is_using_wrapped_excutorlib_executor(): + self._clean_wrapped_executorlib_executor_cache() + + return super()._finish_run( + run_output, + raise_run_exceptions=raise_run_exceptions, + run_exception_kwargs=run_exception_kwargs, + run_finally_kwargs=run_finally_kwargs, + **kwargs, + ) + def _run_finally(self, /, emit_ran_signal: bool, raise_run_exceptions: bool): super()._run_finally() if self.parent is not None and self.parent.running: diff --git a/pyiron_workflow/nodes/composite.py b/pyiron_workflow/nodes/composite.py index 9a5d24ac4..86081443e 100644 --- a/pyiron_workflow/nodes/composite.py +++ b/pyiron_workflow/nodes/composite.py @@ -178,6 +178,8 @@ def _on_cache_miss(self) -> None: def _on_run(self): if len(self.running_children) > 0: # Start from a broken process for label in self.running_children: + if self.children[label]._is_using_wrapped_excutorlib_executor(): + self.running_children.remove(label) self.children[label].run() # Running children will find serialized result and proceed, # or raise an error because they're already running From 1acffcf68d989849eabf6475dcb7fd19ac7a4693 Mon Sep 17 00:00:00 2001 From: liamhuber Date: Thu, 19 Jun 2025 13:43:26 -0700 Subject: [PATCH 02/32] Be kinder to fstrings Signed-off-by: liamhuber --- pyiron_workflow/executors/wrapped_executorlib.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pyiron_workflow/executors/wrapped_executorlib.py b/pyiron_workflow/executors/wrapped_executorlib.py index 061f44fe9..af211b604 
100644 --- a/pyiron_workflow/executors/wrapped_executorlib.py +++ b/pyiron_workflow/executors/wrapped_executorlib.py @@ -31,14 +31,14 @@ class (a `Node` is, of course, the intended resolution of this demand). if "cache_key" in kwargs["resource_dict"]: raise ValueError( f"pyiron_workflow needs the freedom to specify the cache, so the " - f'requested "cache_directory" ' - f"({kwargs["resource_dict"]["cache_key"]}) would get overwritten." + f"requested \"cache_directory\" " + f"({kwargs['resource_dict']['cache_key']}) would get overwritten." ) if "cache_directory" in kwargs["resource_dict"]: raise ValueError( f"pyiron_workflow needs the freedom to specify the cache, so the " - f'requested "cache_directory" ' - f"({kwargs["resource_dict"]["cache_directory"]})would get " + f"requested \"cache_directory\" " + f"({kwargs['resource_dict']['cache_directory']})would get " f"overwritten." ) kwargs["resource_dict"].update(cache_key_info) From afa60b208418a1dc5a030b4a8364ee13223830e2 Mon Sep 17 00:00:00 2001 From: liamhuber Date: Thu, 19 Jun 2025 13:49:56 -0700 Subject: [PATCH 03/32] Remove prints Signed-off-by: liamhuber --- pyiron_workflow/node.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/pyiron_workflow/node.py b/pyiron_workflow/node.py index 0c788592f..75eb7a7f1 100644 --- a/pyiron_workflow/node.py +++ b/pyiron_workflow/node.py @@ -577,9 +577,7 @@ def _run( run_finally_kwargs: dict, finish_run_kwargs: dict, ) -> Any | tuple | Future: - print("Running", self.full_label) if self.parent is not None and self.parent.running: - print("Registering start with parent", self.full_label) self.parent.register_child_starting(self) return super()._run( executor=executor, From b5191adea90754efc1ee5827ff000090f0ef5fd8 Mon Sep 17 00:00:00 2001 From: liamhuber Date: Thu, 19 Jun 2025 13:52:09 -0700 Subject: [PATCH 04/32] Black Signed-off-by: liamhuber --- pyiron_workflow/executors/wrapped_executorlib.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyiron_workflow/executors/wrapped_executorlib.py b/pyiron_workflow/executors/wrapped_executorlib.py index af211b604..012e945f6 100644 --- a/pyiron_workflow/executors/wrapped_executorlib.py +++ b/pyiron_workflow/executors/wrapped_executorlib.py @@ -31,13 +31,13 @@ class (a `Node` is, of course, the intended resolution of this demand). if "cache_key" in kwargs["resource_dict"]: raise ValueError( f"pyiron_workflow needs the freedom to specify the cache, so the " - f"requested \"cache_directory\" " + f'requested "cache_directory" ' f"({kwargs['resource_dict']['cache_key']}) would get overwritten." ) if "cache_directory" in kwargs["resource_dict"]: raise ValueError( f"pyiron_workflow needs the freedom to specify the cache, so the " - f"requested \"cache_directory\" " + f'requested "cache_directory" ' f"({kwargs['resource_dict']['cache_directory']})would get " f"overwritten." 
) From 4f0843426c28a6182689eaff80d59216e051c846 Mon Sep 17 00:00:00 2001 From: liamhuber Date: Thu, 19 Jun 2025 13:52:50 -0700 Subject: [PATCH 05/32] Bump lower bound of executorlib Since we now depend explicitly on a new feature Signed-off-by: liamhuber --- .ci_support/lower_bound.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.ci_support/lower_bound.yml b/.ci_support/lower_bound.yml index 1a362590b..b80805139 100644 --- a/.ci_support/lower_bound.yml +++ b/.ci_support/lower_bound.yml @@ -5,7 +5,7 @@ dependencies: - coverage - bidict =0.23.0 - cloudpickle =3.0.0 -- executorlib =0.0.1 +- executorlib =1.5.0 - graphviz =9.0.0 - pandas =2.2.0 - pint =0.23.0 From e90d55f8d724c6ebb0da0f36aa938a1aa4519488 Mon Sep 17 00:00:00 2001 From: liamhuber Date: Thu, 19 Jun 2025 14:13:25 -0700 Subject: [PATCH 06/32] Wrap SlurmClusterExecutor Signed-off-by: liamhuber --- pyiron_workflow/executors/wrapped_executorlib.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pyiron_workflow/executors/wrapped_executorlib.py b/pyiron_workflow/executors/wrapped_executorlib.py index 012e945f6..116d17714 100644 --- a/pyiron_workflow/executors/wrapped_executorlib.py +++ b/pyiron_workflow/executors/wrapped_executorlib.py @@ -1,6 +1,6 @@ import inspect -from executorlib import BaseExecutor, SingleNodeExecutor +from executorlib import BaseExecutor, SingleNodeExecutor, SlurmClusterExecutor from pyiron_workflow.mixin import lexical, run @@ -49,3 +49,6 @@ class (a `Node` is, of course, the intended resolution of this demand). class CacheSingleNodeExecutor(SingleNodeExecutor, CacheOverride): ... + + +class CacheSlurmClusterExecutor(SlurmClusterExecutor, CacheOverride): ... From 15cf97ac734f7189c8ed55409a0855f10ebbd055 Mon Sep 17 00:00:00 2001 From: liamhuber Date: Thu, 10 Jul 2025 11:13:14 -0700 Subject: [PATCH 07/32] Don't re-parse executor tuples It causes a weird hang that blocks observability. 
Signed-off-by: liamhuber --- pyiron_workflow/node.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/pyiron_workflow/node.py b/pyiron_workflow/node.py index 5f064fc3e..30f61dac0 100644 --- a/pyiron_workflow/node.py +++ b/pyiron_workflow/node.py @@ -391,8 +391,13 @@ def _readiness_error_message(self) -> str: ) def _is_using_wrapped_excutorlib_executor(self) -> bool: - return self.executor is not None and isinstance( - self._parse_executor(self.executor), CacheOverride + return self.executor is not None and ( + isinstance(self.executor, CacheOverride) + or ( + isinstance(self.executor, tuple) + and isinstance(self.executor[0], type) + and issubclass(self.executor[0], CacheOverride) + ) ) def _clean_wrapped_executorlib_executor_cache(self) -> None: From 4ec33bf3ff9cda2020440842119888213bd48b14 Mon Sep 17 00:00:00 2001 From: liamhuber Date: Thu, 10 Jul 2025 11:13:56 -0700 Subject: [PATCH 08/32] Exploit lexical path cleaning And make the expected file independently accessible Signed-off-by: liamhuber --- pyiron_workflow/node.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pyiron_workflow/node.py b/pyiron_workflow/node.py index 30f61dac0..b11c9e569 100644 --- a/pyiron_workflow/node.py +++ b/pyiron_workflow/node.py @@ -401,11 +401,15 @@ def _is_using_wrapped_excutorlib_executor(self) -> bool: ) def _clean_wrapped_executorlib_executor_cache(self) -> None: - location = self.as_path().parent - file_name = self.label + "_o.h5" # Dependent on executorlib implementation - location.joinpath(file_name).unlink() - with contextlib.suppress(OSError): # If it's not empty just move on - location.rmdir() + self._wrapped_executorlib_cache_file.unlink() + self.clean_path() + + @property + def _wrapped_executorlib_cache_file(self) -> Path: + """For internal use to clean up cached executorlib files""" + # Depends on executorlib implementation details not protected by semver + file_name = CacheOverride.override_cache_file_name + "_o.h5" + return self.as_path() / file_name def on_run(self, *args, **kwargs) -> Any: save_result: bool = args[0] From 9d053ffd527e4bbf824cbc8fcb0090db11e58a55 Mon Sep 17 00:00:00 2001 From: liamhuber Date: Thu, 10 Jul 2025 11:14:42 -0700 Subject: [PATCH 09/32] Move cache cleaning into the finally method And hide it behind its own boolean flag for testing Signed-off-by: liamhuber --- pyiron_workflow/node.py | 29 +++++++++-------------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/pyiron_workflow/node.py b/pyiron_workflow/node.py index b11c9e569..082c06d66 100644 --- a/pyiron_workflow/node.py +++ b/pyiron_workflow/node.py @@ -307,6 +307,9 @@ def __init__( # under-development status -- API may change to be more user-friendly self._do_clean: bool = False # Power-user override for cleaning up temporary # serialized results and empty directories (or not). 
+ self._remove_executorlib_cache: bool = True # Power-user override for cleaning + # up temporary serialized results from runs with executorlib; intended to be + # used for testing self._cached_inputs: dict[str, Any] | None = None self._user_data: dict[str, Any] = {} @@ -594,26 +597,6 @@ def _run( finish_run_kwargs=finish_run_kwargs, ) - def _finish_run( - self, - run_output: tuple | Future, - /, - raise_run_exceptions: bool, - run_exception_kwargs: dict, - run_finally_kwargs: dict, - **kwargs, - ) -> Any | tuple | None: - if self._is_using_wrapped_excutorlib_executor(): - self._clean_wrapped_executorlib_executor_cache() - - return super()._finish_run( - run_output, - raise_run_exceptions=raise_run_exceptions, - run_exception_kwargs=run_exception_kwargs, - run_finally_kwargs=run_finally_kwargs, - **kwargs, - ) - def _run_finally(self, /, emit_ran_signal: bool, raise_run_exceptions: bool): super()._run_finally() if self.parent is not None and self.parent.running: @@ -640,6 +623,12 @@ def _run_finally(self, /, emit_ran_signal: bool, raise_run_exceptions: bool): if self._do_clean: self._clean_graph_directory() + if ( + self._remove_executorlib_cache + and self._is_using_wrapped_excutorlib_executor() + ): + self._clean_wrapped_executorlib_executor_cache() + def run_data_tree(self, run_parent_trees_too=False) -> None: """ Use topological analysis to build a tree of all upstream dependencies and run From f7dfc3d372721b5d575ba736446a125ad32be668 Mon Sep 17 00:00:00 2001 From: liamhuber Date: Thu, 10 Jul 2025 11:15:20 -0700 Subject: [PATCH 10/32] Update executorlib syntax And make the file name fixed and accessible at the class level Signed-off-by: liamhuber --- pyiron_workflow/executors/wrapped_executorlib.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pyiron_workflow/executors/wrapped_executorlib.py b/pyiron_workflow/executors/wrapped_executorlib.py index 116d17714..08660c635 100644 --- a/pyiron_workflow/executors/wrapped_executorlib.py +++ b/pyiron_workflow/executors/wrapped_executorlib.py @@ -1,4 +1,5 @@ import inspect +from typing import ClassVar from executorlib import BaseExecutor, SingleNodeExecutor, SlurmClusterExecutor @@ -6,6 +7,8 @@ class CacheOverride(BaseExecutor): + override_cache_file_name: ClassVar[str] = "executorlib_cache" + def submit(self, fn, /, *args, **kwargs): """ We demand that `fn` be the bound-method `on_run` of a `Lexical`+`Runnable` @@ -18,8 +21,8 @@ class (a `Node` is, of course, the intended resolution of this demand). 
and isinstance(fn.__self__, run.Runnable) # provides .on_run ): cache_key_info = { - "cache_key": str(fn.__self__.as_path()), - "cache_directory": ".", # Doesn't matter, the path in the key overrides + "cache_key": self.override_cache_file_name, + "cache_directory": str(fn.__self__.as_path()), } else: raise TypeError( From ad3356c7f859de5aaa998286cfa5ee432de34980 Mon Sep 17 00:00:00 2001 From: liamhuber Date: Thu, 10 Jul 2025 11:15:31 -0700 Subject: [PATCH 11/32] Test the single node executor Signed-off-by: liamhuber --- tests/integration/test_wrapped_executorlib.py | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 tests/integration/test_wrapped_executorlib.py diff --git a/tests/integration/test_wrapped_executorlib.py b/tests/integration/test_wrapped_executorlib.py new file mode 100644 index 000000000..75817c92b --- /dev/null +++ b/tests/integration/test_wrapped_executorlib.py @@ -0,0 +1,78 @@ +import time +import unittest + +import pyiron_workflow as pwf +from pyiron_workflow.executors.wrapped_executorlib import CacheSingleNodeExecutor + + +class TestWrappedExecutorlib(unittest.TestCase): + + def test_cache(self): + t_sleep = 1 + + wf = pwf.Workflow("passive_run") + wf.n1 = pwf.std.UserInput(t_sleep) + wf.n2 = pwf.std.Sleep(wf.n1) + wf.n3 = pwf.std.UserInput(wf.n2) + expected_output = {"n3__user_input": t_sleep} + + wf.use_cache = False + wf.n2.use_cache = False + wf.n2._remove_executorlib_cache = False + + wf.n2.executor = (CacheSingleNodeExecutor, (), {}) + + t0 = time.perf_counter() + out1 = wf.run() + t1 = time.perf_counter() + out2 = wf.run() + t2 = time.perf_counter() + + self.assertDictEqual( + expected_output, + out1, + msg="Sanity check that the workflow ran ok", + ) + self.assertDictEqual( + expected_output, + out2, + msg="Sanity check that the workflow re-ran ok", + ) + self.assertFalse( + wf.n2.cache_hit, msg="Sanity check that we're not just using the cache" + ) + + t_first_run = t1 - t0 + t_second_run = t2 - t1 + self.assertGreater( + t_first_run, + t_sleep, + msg="The initial run should be executing the sleep node", + ) + self.assertLess( + t_second_run, + (t_first_run) / 10.0, + msg="The second run should allow executorlib to find the cached result, " + "and be much faster than the sleep time", + ) + + self.assertTrue( + wf.n2._wrapped_executorlib_cache_file.is_file(), + msg="Since we deactivated cache removal, we expect the executorlib cache " + "file to still be there", + ) + + wf.n2.running = True # Fake that it's still running + wf.n2._remove_executorlib_cache = True # Reactivate automatic cleanup + out3 = wf.run() + + self.assertDictEqual( + expected_output, + out3, + msg="Workflow should recover from a running child state when the wrapped " + "executorlib executor can find a cached result", + ) + self.assertFalse( + wf.n2._wrapped_executorlib_cache_file.is_file(), + msg="The cached result should be cleaned up", + ) From 154b323b2d979edc81cd3a99aef38aff910c2d4f Mon Sep 17 00:00:00 2001 From: liamhuber Date: Fri, 11 Jul 2025 08:40:19 -0700 Subject: [PATCH 12/32] Clean the associated cache subdirectory The slurm executor populates this with a submission script, etc. 
Signed-off-by: liamhuber --- pyiron_workflow/node.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyiron_workflow/node.py b/pyiron_workflow/node.py index 082c06d66..131803acd 100644 --- a/pyiron_workflow/node.py +++ b/pyiron_workflow/node.py @@ -8,6 +8,7 @@ from __future__ import annotations import contextlib +import shutil from abc import ABC, abstractmethod from concurrent.futures import Future from importlib import import_module @@ -405,6 +406,8 @@ def _is_using_wrapped_excutorlib_executor(self) -> bool: def _clean_wrapped_executorlib_executor_cache(self) -> None: self._wrapped_executorlib_cache_file.unlink() + if Path(CacheOverride.override_cache_file_name).is_dir(): + shutil.rmtree(CacheOverride.override_cache_file_name) self.clean_path() @property From 1c471f76c9b684b73d1af9576a90eafbade29cfb Mon Sep 17 00:00:00 2001 From: liamhuber Date: Fri, 11 Jul 2025 14:05:52 -0700 Subject: [PATCH 13/32] Clean up the slurm stuff too Signed-off-by: liamhuber --- pyiron_workflow/node.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyiron_workflow/node.py b/pyiron_workflow/node.py index 131803acd..8736205cb 100644 --- a/pyiron_workflow/node.py +++ b/pyiron_workflow/node.py @@ -8,6 +8,7 @@ from __future__ import annotations import contextlib +import pathlib import shutil from abc import ABC, abstractmethod from concurrent.futures import Future @@ -406,7 +407,7 @@ def _is_using_wrapped_excutorlib_executor(self) -> bool: def _clean_wrapped_executorlib_executor_cache(self) -> None: self._wrapped_executorlib_cache_file.unlink() - if Path(CacheOverride.override_cache_file_name).is_dir(): + if pathlib.Path(CacheOverride.override_cache_file_name).is_dir(): shutil.rmtree(CacheOverride.override_cache_file_name) self.clean_path() From 17e41844a936336014068f8f53fe48dfd4e52ae9 Mon Sep 17 00:00:00 2001 From: liamhuber Date: Fri, 11 Jul 2025 14:13:48 -0700 Subject: [PATCH 14/32] Add local file executor From @jan-janssen in [this comment](https://github.com/pyiron/executorlib/issues/708#issue-3222362792) Co-authored-by: Jan Janssen Signed-off-by: liamhuber --- .../executors/wrapped_executorlib.py | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/pyiron_workflow/executors/wrapped_executorlib.py b/pyiron_workflow/executors/wrapped_executorlib.py index 08660c635..37607812d 100644 --- a/pyiron_workflow/executors/wrapped_executorlib.py +++ b/pyiron_workflow/executors/wrapped_executorlib.py @@ -55,3 +55,54 @@ class CacheSingleNodeExecutor(SingleNodeExecutor, CacheOverride): ... class CacheSlurmClusterExecutor(SlurmClusterExecutor, CacheOverride): ... 
+ + +from typing import Callable, Optional + +from executorlib.executor.base import BaseExecutor +from executorlib.task_scheduler.file.subprocess_spawner import execute_in_subprocess +from executorlib.task_scheduler.file.task_scheduler import FileTaskScheduler + + +class LocalFileExecutor(BaseExecutor): + def __init__( + self, + max_workers: Optional[int] = None, + cache_directory: Optional[str] = None, + max_cores: Optional[int] = None, + resource_dict: Optional[dict] = None, + hostname_localhost: Optional[bool] = None, + block_allocation: bool = False, + init_function: Optional[Callable] = None, + disable_dependencies: bool = False, + refresh_rate: float = 0.01, + ): + default_resource_dict: dict = { + "cores": 1, + "threads_per_core": 1, + "gpus_per_core": 0, + "cwd": None, + "openmpi_oversubscribe": False, + "slurm_cmd_args": [], + } + if cache_directory is None: + default_resource_dict["cache_directory"] = "executorlib_cache" + else: + default_resource_dict["cache_directory"] = cache_directory + if resource_dict is None: + resource_dict = {} + resource_dict.update( + {k: v for k, v in default_resource_dict.items() if k not in resource_dict} + ) + super().__init__( + executor=FileTaskScheduler( + resource_dict=resource_dict, + pysqa_config_directory=None, + backend=None, + disable_dependencies=disable_dependencies, + execute_function=execute_in_subprocess, + ) + ) + + +class CacheLocalFileExecutor(LocalFileExecutor, CacheOverride): ... From b624ef8370bb1e1adc6dc528f8e665731c6e32df Mon Sep 17 00:00:00 2001 From: liamhuber Date: Fri, 11 Jul 2025 14:17:13 -0700 Subject: [PATCH 15/32] lint Signed-off-by: liamhuber --- .../executors/wrapped_executorlib.py | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/pyiron_workflow/executors/wrapped_executorlib.py b/pyiron_workflow/executors/wrapped_executorlib.py index 37607812d..30856f351 100644 --- a/pyiron_workflow/executors/wrapped_executorlib.py +++ b/pyiron_workflow/executors/wrapped_executorlib.py @@ -1,7 +1,10 @@ import inspect +from collections.abc import Callable from typing import ClassVar from executorlib import BaseExecutor, SingleNodeExecutor, SlurmClusterExecutor +from executorlib.task_scheduler.file.subprocess_spawner import execute_in_subprocess +from executorlib.task_scheduler.file.task_scheduler import FileTaskScheduler from pyiron_workflow.mixin import lexical, run @@ -57,23 +60,16 @@ class CacheSingleNodeExecutor(SingleNodeExecutor, CacheOverride): ... class CacheSlurmClusterExecutor(SlurmClusterExecutor, CacheOverride): ... 
-from typing import Callable, Optional
-
-from executorlib.executor.base import BaseExecutor
-from executorlib.task_scheduler.file.subprocess_spawner import execute_in_subprocess
-from executorlib.task_scheduler.file.task_scheduler import FileTaskScheduler
-
-
 class LocalFileExecutor(BaseExecutor):
     def __init__(
         self,
-        max_workers: Optional[int] = None,
-        cache_directory: Optional[str] = None,
-        max_cores: Optional[int] = None,
-        resource_dict: Optional[dict] = None,
-        hostname_localhost: Optional[bool] = None,
+        max_workers: int | None = None,
+        cache_directory: str | None = None,
+        max_cores: int | None = None,
+        resource_dict: dict | None = None,
+        hostname_localhost: bool | None = None,
         block_allocation: bool = False,
-        init_function: Optional[Callable] = None,
+        init_function: Callable | None = None,
         disable_dependencies: bool = False,
         refresh_rate: float = 0.01,
     ):

From 48ecfa789eb56296fb915a70d9549baacf124dec Mon Sep 17 00:00:00 2001
From: liamhuber
Date: Fri, 11 Jul 2025 14:17:26 -0700
Subject: [PATCH 16/32] Test both local executors

Signed-off-by: liamhuber
---
 tests/integration/test_wrapped_executorlib.py | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/tests/integration/test_wrapped_executorlib.py b/tests/integration/test_wrapped_executorlib.py
index 75817c92b..4972cf282 100644
--- a/tests/integration/test_wrapped_executorlib.py
+++ b/tests/integration/test_wrapped_executorlib.py
@@ -2,12 +2,15 @@
 import unittest
 
 import pyiron_workflow as pwf
-from pyiron_workflow.executors.wrapped_executorlib import CacheSingleNodeExecutor
+from pyiron_workflow.executors.wrapped_executorlib import (
+    CacheLocalFileExecutor,
+    CacheSingleNodeExecutor,
+)
 
 
 class TestWrappedExecutorlib(unittest.TestCase):
 
-    def test_cache(self):
+    def _test_cache(self, executor_class):
         t_sleep = 1
 
         wf = pwf.Workflow("passive_run")
@@ -20,7 +23,7 @@ def test_cache(self):
         wf.n2.use_cache = False
         wf.n2._remove_executorlib_cache = False
 
-        wf.n2.executor = (CacheSingleNodeExecutor, (), {})
+        wf.n2.executor = (executor_class, (), {})
 
         t0 = time.perf_counter()
         out1 = wf.run()
@@ -76,3 +79,8 @@ def _test_cache(self, executor_class):
             wf.n2._wrapped_executorlib_cache_file.is_file(),
             msg="The cached result should be cleaned up",
         )
+
+    def test_cache(self):
+        for executor_class in [CacheSingleNodeExecutor, CacheLocalFileExecutor]:
+            with self.subTest(executor_class.__name__):
+                self._test_cache(executor_class)

From 2bafc68f7d57539fc7eeb8b1f372d88d85f8c27f Mon Sep 17 00:00:00 2001
From: liamhuber
Date: Mon, 14 Jul 2025 16:40:56 -0700
Subject: [PATCH 17/32] Add prefix to cleaning directory

Signed-off-by: liamhuber
---
 pyiron_workflow/node.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/pyiron_workflow/node.py b/pyiron_workflow/node.py
index 8736205cb..cfb6dadd3 100644
--- a/pyiron_workflow/node.py
+++ b/pyiron_workflow/node.py
@@ -407,8 +407,9 @@ def _is_using_wrapped_excutorlib_executor(self) -> bool:
 
     def _clean_wrapped_executorlib_executor_cache(self) -> None:
         self._wrapped_executorlib_cache_file.unlink()
-        if pathlib.Path(CacheOverride.override_cache_file_name).is_dir():
-            shutil.rmtree(CacheOverride.override_cache_file_name)
+        cache_subdir = self.as_path() / CacheOverride.override_cache_file_name
+        if pathlib.Path(cache_subdir).is_dir():
+            shutil.rmtree(cache_subdir)
         self.clean_path()
 
     @property

From fed80d388c8a653778db6c386079a03d12e8a1d4 Mon Sep 17 00:00:00 2001
From: liamhuber
Date: Tue, 15 Jul 2025 10:26:16 -0700
Subject: [PATCH 18/32] Use test executor

The local file executor
got directly included in executorlib as a testing tool. Signed-off-by: liamhuber --- .../executors/wrapped_executorlib.py | 47 +------------------ tests/integration/test_wrapped_executorlib.py | 4 +- 2 files changed, 4 insertions(+), 47 deletions(-) diff --git a/pyiron_workflow/executors/wrapped_executorlib.py b/pyiron_workflow/executors/wrapped_executorlib.py index 30856f351..3258ab16b 100644 --- a/pyiron_workflow/executors/wrapped_executorlib.py +++ b/pyiron_workflow/executors/wrapped_executorlib.py @@ -1,10 +1,8 @@ import inspect -from collections.abc import Callable from typing import ClassVar from executorlib import BaseExecutor, SingleNodeExecutor, SlurmClusterExecutor -from executorlib.task_scheduler.file.subprocess_spawner import execute_in_subprocess -from executorlib.task_scheduler.file.task_scheduler import FileTaskScheduler +from executorlib.api import TestClusterExecutor from pyiron_workflow.mixin import lexical, run @@ -60,45 +58,4 @@ class CacheSingleNodeExecutor(SingleNodeExecutor, CacheOverride): ... class CacheSlurmClusterExecutor(SlurmClusterExecutor, CacheOverride): ... -class LocalFileExecutor(BaseExecutor): - def __init__( - self, - max_workers: int | None = None, - cache_directory: str | None = None, - max_cores: int | None = None, - resource_dict: dict | None = None, - hostname_localhost: bool | None = None, - block_allocation: bool = False, - init_function: Callable | None = None, - disable_dependencies: bool = False, - refresh_rate: float = 0.01, - ): - default_resource_dict: dict = { - "cores": 1, - "threads_per_core": 1, - "gpus_per_core": 0, - "cwd": None, - "openmpi_oversubscribe": False, - "slurm_cmd_args": [], - } - if cache_directory is None: - default_resource_dict["cache_directory"] = "executorlib_cache" - else: - default_resource_dict["cache_directory"] = cache_directory - if resource_dict is None: - resource_dict = {} - resource_dict.update( - {k: v for k, v in default_resource_dict.items() if k not in resource_dict} - ) - super().__init__( - executor=FileTaskScheduler( - resource_dict=resource_dict, - pysqa_config_directory=None, - backend=None, - disable_dependencies=disable_dependencies, - execute_function=execute_in_subprocess, - ) - ) - - -class CacheLocalFileExecutor(LocalFileExecutor, CacheOverride): ... +class _CacheTestClusterExecutor(TestClusterExecutor, CacheOverride): ... 
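(An illustrative aside, not part of the patch, mirroring the integration test that
follows: with the fixed class-level file name from patch 10, the executorlib artifacts
a node leaves behind are fully determined by its lexical path. The path shown in the
comment is an assumption about what `as_path()` produces.)

    import pyiron_workflow as pwf
    from pyiron_workflow.executors.wrapped_executorlib import _CacheTestClusterExecutor

    wf = pwf.Workflow("passive_run")
    wf.n2 = pwf.std.UserInput(1)
    wf.n2.executor = (_CacheTestClusterExecutor, (), {})
    wf.run()
    # Expected to point at something like passive_run/n2/executorlib_cache_o.h5;
    # it is unlinked after the run unless wf.n2._remove_executorlib_cache is False
    print(wf.n2._wrapped_executorlib_cache_file)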
diff --git a/tests/integration/test_wrapped_executorlib.py b/tests/integration/test_wrapped_executorlib.py index 4972cf282..7d8230647 100644 --- a/tests/integration/test_wrapped_executorlib.py +++ b/tests/integration/test_wrapped_executorlib.py @@ -3,8 +3,8 @@ import pyiron_workflow as pwf from pyiron_workflow.executors.wrapped_executorlib import ( - CacheLocalFileExecutor, CacheSingleNodeExecutor, + _CacheTestClusterExecutor, ) @@ -81,6 +81,6 @@ def _test_cache(self, executor_class): ) def test_cache(self): - for executor_class in [CacheSingleNodeExecutor, CacheLocalFileExecutor]: + for executor_class in [CacheSingleNodeExecutor, _CacheTestClusterExecutor]: with self.subTest(executor_class.__name__): self._test_cache(executor_class) From 5cd3413d1f7488b50acfd4cd04a8882eae52262e Mon Sep 17 00:00:00 2001 From: liamhuber Date: Tue, 15 Jul 2025 11:08:52 -0700 Subject: [PATCH 19/32] Validate executor at assignment Signed-off-by: liamhuber --- pyiron_workflow/mixin/run.py | 26 +++++++++++++++++++++++--- pyiron_workflow/nodes/composite.py | 2 +- tests/unit/mixin/test_run.py | 4 +--- 3 files changed, 25 insertions(+), 7 deletions(-) diff --git a/pyiron_workflow/mixin/run.py b/pyiron_workflow/mixin/run.py index 35c16896b..552c5b30f 100644 --- a/pyiron_workflow/mixin/run.py +++ b/pyiron_workflow/mixin/run.py @@ -60,7 +60,7 @@ def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.running: bool = False self.failed: bool = False - self.executor: InterpretableAsExecutor | None = None + self._executor: InterpretableAsExecutor | None = None # We call it an executor, but it can also be instructions on making one self.future: None | Future = None self._thread_pool_sleep_time: float = 1e-6 @@ -78,6 +78,26 @@ def run_args(self) -> tuple[tuple, dict]: Any data needed for :meth:`on_run`, will be passed as (*args, **kwargs). """ + @property + def executor(self) -> InterpretableAsExecutor | None: + return self._executor + + @executor.setter + def executor(self, executor: InterpretableAsExecutor | None): + if not ( + isinstance(executor, StdLibExecutor | type(None)) + or ( + callable(executor[0]) + and isinstance(executor[1], tuple) + and isinstance(executor[2], dict) + ) + ): + raise TypeError( + f"Expected an instance of {StdLibExecutor}, or a tuple of such a " + f"class, a tuple of args, and a dict of kwargs -- but got {executor}." + ) + self._executor = executor + def process_run_result(self, run_output: Any) -> Any: """ What to _do_ with the results of :meth:`on_run` once you have them. 
@@ -367,8 +387,8 @@ def __getstate__(self): # the simple pyiron_workflow.executors.CloudpickleProcessPoolExecutor, but for # the more complex executorlib.Executor we're getting: # TypeError: cannot pickle '_thread.RLock' object - if isinstance(self.executor, StdLibExecutor): - state["executor"] = None + if isinstance(self._executor, StdLibExecutor): + state["_executor"] = None # Don't pass actual executors, they have an unserializable thread lock on them # _but_ if the user is just passing instructions on how to _build_ an executor, # we'll trust that those serialize OK (this way we can, hopefully, eventually diff --git a/pyiron_workflow/nodes/composite.py b/pyiron_workflow/nodes/composite.py index fcd93de79..4d48d73c9 100644 --- a/pyiron_workflow/nodes/composite.py +++ b/pyiron_workflow/nodes/composite.py @@ -281,7 +281,7 @@ def _parse_remotely_executed_self(self, other_self): def _get_state_from_remote_other(self, other_self): state = other_self.__getstate__() - state.pop("executor") # Got overridden to None for __getstate__, so keep local + state.pop("_executor") # Got overridden to None for __getstate__, so keep local state.pop("_parent") # Got overridden to None for __getstate__, so keep local return state diff --git a/tests/unit/mixin/test_run.py b/tests/unit/mixin/test_run.py index bfa32ece4..7457a4e68 100644 --- a/tests/unit/mixin/test_run.py +++ b/tests/unit/mixin/test_run.py @@ -154,9 +154,7 @@ def maybe_get_executor(get_executor): msg="Expected the result, including post-processing 'bar' value", ) - with self.assertRaises( - NotImplementedError, msg="That's not an executor at all" - ): + with self.assertRaises(TypeError, msg="That's not an executor at all"): runnable.executor = 42 runnable.run() From cc5651267ea8ba18f9c5f8b08778fc8236feb6d8 Mon Sep 17 00:00:00 2001 From: liamhuber Date: Tue, 15 Jul 2025 11:09:59 -0700 Subject: [PATCH 20/32] Delay executor tuple parsing And always with-execute tuples since there is only ever one instance of this executor. If we have already been assigned an executor _instance_ then we trust the user to be managing its state and submit directly rather than wrapping in a with-clause Signed-off-by: liamhuber --- pyiron_workflow/mixin/run.py | 108 ++++++++++++++++------------------- pyiron_workflow/node.py | 3 - 2 files changed, 48 insertions(+), 63 deletions(-) diff --git a/pyiron_workflow/mixin/run.py b/pyiron_workflow/mixin/run.py index 552c5b30f..0f2af1e36 100644 --- a/pyiron_workflow/mixin/run.py +++ b/pyiron_workflow/mixin/run.py @@ -180,13 +180,8 @@ def _none_to_dict(inp: dict | None) -> dict: if stop_early: return result - executor = ( - None if self.executor is None else self._parse_executor(self.executor) - ) - self.running = True return self._run( - executor=executor, raise_run_exceptions=raise_run_exceptions, run_exception_kwargs=run_exception_kwargs, run_finally_kwargs=run_finally_kwargs, @@ -222,7 +217,6 @@ def _before_run( def _run( self, /, - executor: StdLibExecutor | None, raise_run_exceptions: bool, run_exception_kwargs: dict, run_finally_kwargs: dict, @@ -251,7 +245,7 @@ def _run( f"first positional argument passed to :meth:`on_run`." 
) - if executor is None: + if self.executor is None: try: run_output = self.on_run(*on_run_args, **on_run_kwargs) except (Exception, KeyboardInterrupt) as e: @@ -269,25 +263,57 @@ def _run( **finish_run_kwargs, ) else: - if isinstance(executor, ThreadPoolExecutor): - self.future = executor.submit( - self._thread_pool_run, *on_run_args, **on_run_kwargs + if isinstance(self.executor, StdLibExecutor): + self.future = self._send_to_executor( + self.executor, + on_run_args, + on_run_kwargs, + raise_run_exceptions, + run_exception_kwargs, + run_finally_kwargs, + finish_run_kwargs, ) else: - self.future = executor.submit( - self.on_run, *on_run_args, **on_run_kwargs - ) - self.future.add_done_callback( - partial( - self._finish_run, - raise_run_exceptions=raise_run_exceptions, - run_exception_kwargs=run_exception_kwargs, - run_finally_kwargs=run_finally_kwargs, - **finish_run_kwargs, - ) - ) + creator, args, kwargs = self.executor + with creator(*args, **kwargs) as executor: + self.future = self._send_to_executor( + executor, + on_run_args, + on_run_kwargs, + raise_run_exceptions, + run_exception_kwargs, + run_finally_kwargs, + finish_run_kwargs, + ) return self.future + def _send_to_executor( + self, + executor: StdLibExecutor, + on_run_args: tuple, + on_run_kwargs: dict, + raise_run_exceptions: bool, + run_exception_kwargs: dict, + run_finally_kwargs: dict, + finish_run_kwargs: dict, + ) -> Future: + submit_function = ( + self._thread_pool_run + if isinstance(executor, ThreadPoolExecutor) + else self.on_run + ) + future = executor.submit(submit_function, *on_run_args, **on_run_kwargs) + future.add_done_callback( + partial( + self._finish_run, + raise_run_exceptions=raise_run_exceptions, + run_exception_kwargs=run_exception_kwargs, + run_finally_kwargs=run_finally_kwargs, + **finish_run_kwargs, + ) + ) + return future + def _run_exception(self, /, *args, **kwargs): """ What to do if an exception is encountered inside :meth:`_run` or @@ -342,44 +368,6 @@ def _readiness_error_message(self) -> str: f"should be neither running nor failed.\n" + self.readiness_report ) - @staticmethod - def _parse_executor( - executor: InterpretableAsExecutor, - ) -> StdLibExecutor: - """ - If you've already got an executor, you're done. But if you get callable and - some args and kwargs, turn them into an executor! - - This is because executors can't be serialized, but you might want to use an - executor on the far side of serialization. The most straightforward example is - to simply pass an executor class and its args and kwargs, but in a more - sophisticated case perhaps you want some function that accesses the _same_ - executor on multiple invocations such that multiple nodes are sharing the same - executor. The functionality here isn't intended to hold your hand for this, but - should be flexible enough that you _can_ do it if you want to. - """ - if isinstance(executor, StdLibExecutor): - return executor - elif ( - isinstance(executor, tuple) - and callable(executor[0]) - and isinstance(executor[1], tuple) - and isinstance(executor[2], dict) - ): - executor = executor[0](*executor[1], **executor[2]) - if not isinstance(executor, StdLibExecutor): - raise TypeError( - f"Executor parsing got a callable and expected it to return a " - f"`concurrent.futures.Executor` instance, but instead got " - f"{executor}." - ) - return executor - else: - raise NotImplementedError( - f"Expected an instance of {StdLibExecutor}, or a tuple of such a class, " - f"a tuple of args, and a dict of kwargs -- but got {executor}." 
- ) - def __getstate__(self): state = super().__getstate__() state["future"] = None diff --git a/pyiron_workflow/node.py b/pyiron_workflow/node.py index cfb6dadd3..413d3eb20 100644 --- a/pyiron_workflow/node.py +++ b/pyiron_workflow/node.py @@ -36,7 +36,6 @@ ) if TYPE_CHECKING: - from concurrent.futures import Executor from pathlib import Path import graphviz @@ -586,7 +585,6 @@ def clear_cache(self): def _run( self, - executor: Executor | None, raise_run_exceptions: bool, run_exception_kwargs: dict, run_finally_kwargs: dict, @@ -595,7 +593,6 @@ def _run( if self.parent is not None and self.parent.running: self.parent.register_child_starting(self) return super()._run( - executor=executor, raise_run_exceptions=raise_run_exceptions, run_exception_kwargs=run_exception_kwargs, run_finally_kwargs=run_finally_kwargs, From dfc50af7210bf190e74b98da8b815d279c976b36 Mon Sep 17 00:00:00 2001 From: liamhuber Date: Tue, 15 Jul 2025 11:13:39 -0700 Subject: [PATCH 21/32] Decrease improvement expectation Recent changes threw off the balance of times in the first vs second run, so rather compare to what you actually care about: that the second run is bypassing the sleep call. Signed-off-by: liamhuber --- tests/integration/test_wrapped_executorlib.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_wrapped_executorlib.py b/tests/integration/test_wrapped_executorlib.py index 7d8230647..65fd01327 100644 --- a/tests/integration/test_wrapped_executorlib.py +++ b/tests/integration/test_wrapped_executorlib.py @@ -54,7 +54,7 @@ def _test_cache(self, executor_class): ) self.assertLess( t_second_run, - (t_first_run) / 10.0, + 0.5 * t_sleep, msg="The second run should allow executorlib to find the cached result, " "and be much faster than the sleep time", ) From 9cd86bb96097a40c5c10331c053f547078179b2a Mon Sep 17 00:00:00 2001 From: liamhuber Date: Tue, 15 Jul 2025 12:03:06 -0700 Subject: [PATCH 22/32] Use explicit shutdown Instead of a with-clause. This way the executor is still permitted to release the thread before the job is done, but we still guarantee that executors created by bespoke instructions get shutdown at the end of their one-future lifetime. Signed-off-by: liamhuber --- pyiron_workflow/mixin/run.py | 86 ++++++++++++++++++------------------ 1 file changed, 42 insertions(+), 44 deletions(-) diff --git a/pyiron_workflow/mixin/run.py b/pyiron_workflow/mixin/run.py index 0f2af1e36..f91a3c95f 100644 --- a/pyiron_workflow/mixin/run.py +++ b/pyiron_workflow/mixin/run.py @@ -264,55 +264,53 @@ def _run( ) else: if isinstance(self.executor, StdLibExecutor): - self.future = self._send_to_executor( - self.executor, - on_run_args, - on_run_kwargs, - raise_run_exceptions, - run_exception_kwargs, - run_finally_kwargs, - finish_run_kwargs, - ) + executor = self.executor + unique_executor = False else: creator, args, kwargs = self.executor - with creator(*args, **kwargs) as executor: - self.future = self._send_to_executor( - executor, - on_run_args, - on_run_kwargs, - raise_run_exceptions, - run_exception_kwargs, - run_finally_kwargs, - finish_run_kwargs, + executor = creator(*args, **kwargs) + if not isinstance(executor, StdLibExecutor): + raise TypeError( + f"Expected an instance of {StdLibExecutor}, but got " + f"{type(executor)} from executor creation instructions " + f"{self.executor}." 
) - return self.future + unique_executor = True - def _send_to_executor( - self, - executor: StdLibExecutor, - on_run_args: tuple, - on_run_kwargs: dict, - raise_run_exceptions: bool, - run_exception_kwargs: dict, - run_finally_kwargs: dict, - finish_run_kwargs: dict, - ) -> Future: - submit_function = ( - self._thread_pool_run - if isinstance(executor, ThreadPoolExecutor) - else self.on_run - ) - future = executor.submit(submit_function, *on_run_args, **on_run_kwargs) - future.add_done_callback( - partial( - self._finish_run, - raise_run_exceptions=raise_run_exceptions, - run_exception_kwargs=run_exception_kwargs, - run_finally_kwargs=run_finally_kwargs, - **finish_run_kwargs, + submit_function = ( + self._thread_pool_run + if isinstance(executor, ThreadPoolExecutor) + else self.on_run ) - ) - return future + self.future = executor.submit( + submit_function, *on_run_args, **on_run_kwargs + ) + self.future.add_done_callback( + partial( + self._finish_run, + raise_run_exceptions=raise_run_exceptions, + run_exception_kwargs=run_exception_kwargs, + run_finally_kwargs=run_finally_kwargs, + **finish_run_kwargs, + ) + ) + + if unique_executor: + self.future.add_done_callback( + partial(self._shutdown_executor_callback, executor=executor) + ) + + return self.future + + @staticmethod + def _shutdown_executor_callback( + _future: Future, /, executor: StdLibExecutor + ) -> None: + try: + executor.shutdown() + except RuntimeError as e: + if str(e) != "cannot join current thread": + raise e def _run_exception(self, /, *args, **kwargs): """ From 8b03ca620f9b76fbebd9db6d78498515a4d0631f Mon Sep 17 00:00:00 2001 From: liamhuber Date: Tue, 15 Jul 2025 12:03:18 -0700 Subject: [PATCH 23/32] Clean up written file Signed-off-by: liamhuber --- tests/integration/test_parallel_speedup.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/integration/test_parallel_speedup.py b/tests/integration/test_parallel_speedup.py index 67e229597..489dcba34 100644 --- a/tests/integration/test_parallel_speedup.py +++ b/tests/integration/test_parallel_speedup.py @@ -97,6 +97,8 @@ def test_executor_instructions(self): "constructors should survive (de)serialization", ) + reloaded.delete_storage() + if __name__ == "__main__": unittest.main() From c46b98b193b2f66045faa2de27891833c4479de6 Mon Sep 17 00:00:00 2001 From: liamhuber Date: Tue, 15 Jul 2025 12:19:31 -0700 Subject: [PATCH 24/32] Don't wait There was necessarily only the one future, so don't wait at shutdown. This removes the need for accepting the runtime error and prevents the wrapped executorlib executors from hanging indefinitely. 
Signed-off-by: liamhuber --- pyiron_workflow/mixin/run.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pyiron_workflow/mixin/run.py b/pyiron_workflow/mixin/run.py index f91a3c95f..33efe1ce5 100644 --- a/pyiron_workflow/mixin/run.py +++ b/pyiron_workflow/mixin/run.py @@ -306,11 +306,7 @@ def _run( def _shutdown_executor_callback( _future: Future, /, executor: StdLibExecutor ) -> None: - try: - executor.shutdown() - except RuntimeError as e: - if str(e) != "cannot join current thread": - raise e + executor.shutdown(wait=False) def _run_exception(self, /, *args, **kwargs): """ From 4350f57ad79bbf1c9b70621f40defd6b3f780a8c Mon Sep 17 00:00:00 2001 From: liamhuber Date: Thu, 17 Jul 2025 06:19:29 -0700 Subject: [PATCH 25/32] Bump executorlib version Including the lower bound Signed-off-by: liamhuber --- .binder/environment.yml | 2 +- .ci_support/environment.yml | 2 +- .ci_support/lower_bound.yml | 2 +- docs/environment.yml | 2 +- pyproject.toml | 2 +- tests/integration/test_workflow.py | 6 +----- 6 files changed, 6 insertions(+), 10 deletions(-) diff --git a/.binder/environment.yml b/.binder/environment.yml index f94339f93..16a018613 100644 --- a/.binder/environment.yml +++ b/.binder/environment.yml @@ -4,7 +4,7 @@ dependencies: - bagofholding =0.1.2 - bidict =0.23.1 - cloudpickle =3.1.1 -- executorlib =1.5.2 +- executorlib =1.6.0 - graphviz =9.0.0 - pandas =2.3.1 - pint =0.24.4 diff --git a/.ci_support/environment.yml b/.ci_support/environment.yml index f94339f93..16a018613 100644 --- a/.ci_support/environment.yml +++ b/.ci_support/environment.yml @@ -4,7 +4,7 @@ dependencies: - bagofholding =0.1.2 - bidict =0.23.1 - cloudpickle =3.1.1 -- executorlib =1.5.2 +- executorlib =1.6.0 - graphviz =9.0.0 - pandas =2.3.1 - pint =0.24.4 diff --git a/.ci_support/lower_bound.yml b/.ci_support/lower_bound.yml index 15be7d4d4..e843aa2e9 100644 --- a/.ci_support/lower_bound.yml +++ b/.ci_support/lower_bound.yml @@ -6,7 +6,7 @@ dependencies: - bagofholding =0.1.0 - bidict =0.23.1 - cloudpickle =3.0.0 -- executorlib =1.5.0 +- executorlib =1.6.0 - graphviz =9.0.0 - pandas =2.2.2 - pint =0.24 diff --git a/docs/environment.yml b/docs/environment.yml index 2a6acfc50..fe8c3334e 100644 --- a/docs/environment.yml +++ b/docs/environment.yml @@ -10,7 +10,7 @@ dependencies: - bagofholding =0.1.2 - bidict =0.23.1 - cloudpickle =3.1.1 -- executorlib =1.5.2 +- executorlib =1.6.0 - graphviz =9.0.0 - pandas =2.3.1 - pint =0.24.4 diff --git a/pyproject.toml b/pyproject.toml index 649da18a5..fdd3381f8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,7 @@ dependencies = [ "bagofholding==0.1.2", "bidict==0.23.1", "cloudpickle==3.1.1", - "executorlib==1.5.2", + "executorlib==1.6.0", "graphviz==0.21", "pandas==2.3.1", "pint==0.24.4", diff --git a/tests/integration/test_workflow.py b/tests/integration/test_workflow.py index 6a081e319..565d2ed47 100644 --- a/tests/integration/test_workflow.py +++ b/tests/integration/test_workflow.py @@ -160,12 +160,8 @@ def test_executors(self): Workflow.create.ProcessPoolExecutor, Workflow.create.ThreadPoolExecutor, Workflow.create.CloudpickleProcessPoolExecutor, + Workflow.create.executorlib.SingleNodeExecutor, ] - try: - executors.append(Workflow.create.executorlib.SingleNodeExecutor) - except AttributeError: - # executorlib < 0.1 had an Executor with optional backend parameter (defaulting to SingleNodeExecutor) - executors.append(Workflow.create.executorlib.Executor) wf = Workflow("executed") wf.a = Workflow.create.std.UserInput(42) # Regular From 
f5b78f79831c9648584d953f473d3bea7abf1b22 Mon Sep 17 00:00:00 2001 From: liamhuber Date: Thu, 17 Jul 2025 06:36:40 -0700 Subject: [PATCH 26/32] Test application to non-node And debug the error message Signed-off-by: liamhuber --- .../executors/wrapped_executorlib.py | 10 +++++++-- tests/unit/test_wrapped_executorlib.py | 22 +++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) create mode 100644 tests/unit/test_wrapped_executorlib.py diff --git a/pyiron_workflow/executors/wrapped_executorlib.py b/pyiron_workflow/executors/wrapped_executorlib.py index 3258ab16b..8e456f1ad 100644 --- a/pyiron_workflow/executors/wrapped_executorlib.py +++ b/pyiron_workflow/executors/wrapped_executorlib.py @@ -7,6 +7,12 @@ from pyiron_workflow.mixin import lexical, run +class DedicatedExecutorError(TypeError): + """ + To raise when you try to use one of these executors outside the context of a node. + """ + + class CacheOverride(BaseExecutor): override_cache_file_name: ClassVar[str] = "executorlib_cache" @@ -26,8 +32,8 @@ class (a `Node` is, of course, the intended resolution of this demand). "cache_directory": str(fn.__self__.as_path()), } else: - raise TypeError( - f"{self.__name__} is only intended to work with the " + raise DedicatedExecutorError( + f"{self.__class__.__name__} is only intended to work with the " f"on_run method of pyiron_workflow.Node objects, but got {fn}" ) diff --git a/tests/unit/test_wrapped_executorlib.py b/tests/unit/test_wrapped_executorlib.py new file mode 100644 index 000000000..5ab522a39 --- /dev/null +++ b/tests/unit/test_wrapped_executorlib.py @@ -0,0 +1,22 @@ +import unittest + +from pyiron_workflow.executors.wrapped_executorlib import ( + CacheSingleNodeExecutor, + DedicatedExecutorError, +) + + +def foo(x): + return x + 1 + + +class TestWrappedExecutorlib(unittest.TestCase): + def test_application_protection(self): + with ( + CacheSingleNodeExecutor() as exe, + self.assertRaises( + DedicatedExecutorError, + msg="These executors are specialized to work with node runs", + ), + ): + exe.submit(foo, 1) From 9c9ac664238dd29bbe94cd8f060a3fd63aa9f884 Mon Sep 17 00:00:00 2001 From: liamhuber Date: Thu, 17 Jul 2025 07:16:38 -0700 Subject: [PATCH 27/32] Validate resources at init too Since that's the way users will typically interact with this field. I also had to change the inheritance order to make sure we were dealing with the user-facing executor and not the task scheduler, but this doesn't impact the submit loop. Signed-off-by: liamhuber --- .../executors/wrapped_executorlib.py | 50 ++++++++++++------- tests/unit/test_wrapped_executorlib.py | 23 +++++++++ 2 files changed, 56 insertions(+), 17 deletions(-) diff --git a/pyiron_workflow/executors/wrapped_executorlib.py b/pyiron_workflow/executors/wrapped_executorlib.py index 8e456f1ad..15293c033 100644 --- a/pyiron_workflow/executors/wrapped_executorlib.py +++ b/pyiron_workflow/executors/wrapped_executorlib.py @@ -1,5 +1,5 @@ import inspect -from typing import ClassVar +from typing import Any, ClassVar from executorlib import BaseExecutor, SingleNodeExecutor, SlurmClusterExecutor from executorlib.api import TestClusterExecutor @@ -13,9 +13,19 @@ class DedicatedExecutorError(TypeError): """ +class ProtectedResourceError(ValueError): + """ + Raise when a user provides executorlib resources that we need to override. 
+ """ + + class CacheOverride(BaseExecutor): override_cache_file_name: ClassVar[str] = "executorlib_cache" + def __init__(self, *args, **kwargs): + _validate_existing_resource_dict(kwargs) + super().__init__(*args, **kwargs) + def submit(self, fn, /, *args, **kwargs): """ We demand that `fn` be the bound-method `on_run` of a `Lexical`+`Runnable` @@ -37,20 +47,9 @@ class (a `Node` is, of course, the intended resolution of this demand). f"on_run method of pyiron_workflow.Node objects, but got {fn}" ) + _validate_existing_resource_dict(kwargs) + if "resource_dict" in kwargs: - if "cache_key" in kwargs["resource_dict"]: - raise ValueError( - f"pyiron_workflow needs the freedom to specify the cache, so the " - f'requested "cache_directory" ' - f"({kwargs['resource_dict']['cache_key']}) would get overwritten." - ) - if "cache_directory" in kwargs["resource_dict"]: - raise ValueError( - f"pyiron_workflow needs the freedom to specify the cache, so the " - f'requested "cache_directory" ' - f"({kwargs['resource_dict']['cache_directory']})would get " - f"overwritten." - ) kwargs["resource_dict"].update(cache_key_info) else: kwargs["resource_dict"] = cache_key_info @@ -58,10 +57,27 @@ class (a `Node` is, of course, the intended resolution of this demand). return super().submit(fn, *args, **kwargs) -class CacheSingleNodeExecutor(SingleNodeExecutor, CacheOverride): ... +def _validate_existing_resource_dict(kwargs: dict[str, Any]): + if "resource_dict" in kwargs: + if "cache_key" in kwargs["resource_dict"]: + raise ProtectedResourceError( + f"pyiron_workflow needs the freedom to specify the cache, so the " + f'requested "cache_directory" ' + f"({kwargs['resource_dict']['cache_key']}) would get overwritten." + ) + if "cache_directory" in kwargs["resource_dict"]: + raise ProtectedResourceError( + f"pyiron_workflow needs the freedom to specify the cache, so the " + f'requested "cache_directory" ' + f"({kwargs['resource_dict']['cache_directory']})would get " + f"overwritten." + ) + + +class CacheSingleNodeExecutor(CacheOverride, SingleNodeExecutor): ... -class CacheSlurmClusterExecutor(SlurmClusterExecutor, CacheOverride): ... +class CacheSlurmClusterExecutor(CacheOverride, SlurmClusterExecutor): ... -class _CacheTestClusterExecutor(TestClusterExecutor, CacheOverride): ... +class _CacheTestClusterExecutor(CacheOverride, TestClusterExecutor): ... 
diff --git a/tests/unit/test_wrapped_executorlib.py b/tests/unit/test_wrapped_executorlib.py index 5ab522a39..e8e4b520e 100644 --- a/tests/unit/test_wrapped_executorlib.py +++ b/tests/unit/test_wrapped_executorlib.py @@ -3,7 +3,9 @@ from pyiron_workflow.executors.wrapped_executorlib import ( CacheSingleNodeExecutor, DedicatedExecutorError, + ProtectedResourceError, ) +from pyiron_workflow.nodes import standard as std def foo(x): @@ -20,3 +22,24 @@ def test_application_protection(self): ), ): exe.submit(foo, 1) + + def test_resource_protection(self): + protected_resources = ( + {"cache_key": "my_key"}, + {"cache_directory": "my_directory"}, + {"cache_key": "my_key", "cache_directory": "my_directory"}, + ) + for resource_dict in protected_resources: + with ( + self.subTest(msg=f"Init resource dict: {resource_dict}"), + self.assertRaises(ProtectedResourceError), + ): + CacheSingleNodeExecutor(resource_dict=resource_dict) + + with ( + self.subTest(msg=f"Submit resource dict: {resource_dict}"), + CacheSingleNodeExecutor() as exe, + self.assertRaises(ProtectedResourceError), + ): + n = std.UserInput() + exe.submit(n.on_run, 42, resource_dict=resource_dict) From 6c6be03934cdcf26b72ebceb3fb0cf2da407dac9 Mon Sep 17 00:00:00 2001 From: liamhuber Date: Thu, 17 Jul 2025 07:25:16 -0700 Subject: [PATCH 28/32] Test file cleaning Signed-off-by: liamhuber --- tests/integration/test_wrapped_executorlib.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/integration/test_wrapped_executorlib.py b/tests/integration/test_wrapped_executorlib.py index 65fd01327..1a93d2b56 100644 --- a/tests/integration/test_wrapped_executorlib.py +++ b/tests/integration/test_wrapped_executorlib.py @@ -84,3 +84,10 @@ def test_cache(self): for executor_class in [CacheSingleNodeExecutor, _CacheTestClusterExecutor]: with self.subTest(executor_class.__name__): self._test_cache(executor_class) + + def test_automatic_cleaning(self): + n = pwf.std.UserInput(1) + with _CacheTestClusterExecutor() as exe: + n.executor = exe + n.run() + self.assertFalse(n._wrapped_executorlib_cache_file.is_file()) From b5096922b8fc8fa3b454eed695584770f2cc2d67 Mon Sep 17 00:00:00 2001 From: liamhuber Date: Thu, 17 Jul 2025 07:29:19 -0700 Subject: [PATCH 29/32] Test uninterpretable executor setting Signed-off-by: liamhuber --- pyiron_workflow/mixin/run.py | 5 ++++- tests/unit/mixin/test_run.py | 11 ++++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/pyiron_workflow/mixin/run.py b/pyiron_workflow/mixin/run.py index 33efe1ce5..069443441 100644 --- a/pyiron_workflow/mixin/run.py +++ b/pyiron_workflow/mixin/run.py @@ -30,6 +30,9 @@ class ReadinessError(ValueError): readiness_dict: dict[str, bool] # Detailed information on why it is not ready +class NotInterpretableAsExecutorError(TypeError): ... + + class Runnable(UsesState, HasLabel, HasRun, ABC): """ An abstract class for interfacing with executors, etc. @@ -92,7 +95,7 @@ def executor(self, executor: InterpretableAsExecutor | None): and isinstance(executor[2], dict) ) ): - raise TypeError( + raise NotInterpretableAsExecutorError( f"Expected an instance of {StdLibExecutor}, or a tuple of such a " f"class, a tuple of args, and a dict of kwargs -- but got {executor}." 
             )
 
diff --git a/tests/unit/mixin/test_run.py b/tests/unit/mixin/test_run.py
index 7457a4e68..2e687f6f8 100644
--- a/tests/unit/mixin/test_run.py
+++ b/tests/unit/mixin/test_run.py
@@ -4,7 +4,11 @@
 from pyiron_workflow.executors.cloudpickleprocesspool import (
     CloudpickleProcessPoolExecutor,
 )
-from pyiron_workflow.mixin.run import ReadinessError, Runnable
+from pyiron_workflow.mixin.run import (
+    NotInterpretableAsExecutorError,
+    ReadinessError,
+    Runnable,
+)
 
 
 class ConcreteRunnable(Runnable):
@@ -166,6 +170,11 @@ def maybe_get_executor(get_executor):
         runnable.executor = (maybe_get_executor, (False,), {})
         runnable.run()
 
+    def test_executor_interpretation(self):
+        runnable = ConcreteRunnable()
+        with self.assertRaises(NotInterpretableAsExecutorError):
+            runnable.executor = "This is not an executor!"
+
 
 if __name__ == "__main__":
     unittest.main()

From b06b914b700ec565266a34a85a2111757b29da16 Mon Sep 17 00:00:00 2001
From: liamhuber
Date: Thu, 17 Jul 2025 07:31:06 -0700
Subject: [PATCH 30/32] Modify test for coverage

So we pass through the Runnable._shutdown_executor_callback process

Signed-off-by: liamhuber
---
 tests/integration/test_wrapped_executorlib.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/tests/integration/test_wrapped_executorlib.py b/tests/integration/test_wrapped_executorlib.py
index 1a93d2b56..e93253457 100644
--- a/tests/integration/test_wrapped_executorlib.py
+++ b/tests/integration/test_wrapped_executorlib.py
@@ -87,7 +87,6 @@ def test_cache(self):
 
     def test_automatic_cleaning(self):
         n = pwf.std.UserInput(1)
-        with _CacheTestClusterExecutor() as exe:
-            n.executor = exe
-            n.run()
+        n.executor = (_CacheTestClusterExecutor, (), {})
+        n.run()
         self.assertFalse(n._wrapped_executorlib_cache_file.is_file())

From 0182d3a917f7d52188fe4270aefc781dd5d1de64 Mon Sep 17 00:00:00 2001
From: liamhuber
Date: Thu, 17 Jul 2025 07:39:57 -0700
Subject: [PATCH 31/32] Decrease lower bound

Signed-off-by: liamhuber
---
 .ci_support/lower_bound.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.ci_support/lower_bound.yml b/.ci_support/lower_bound.yml
index e843aa2e9..8807788cb 100644
--- a/.ci_support/lower_bound.yml
+++ b/.ci_support/lower_bound.yml
@@ -6,7 +6,7 @@ dependencies:
 - bagofholding =0.1.0
 - bidict =0.23.1
 - cloudpickle =3.0.0
-- executorlib =1.6.0
+- executorlib =1.5.3
 - graphviz =9.0.0
 - pandas =2.2.2
 - pint =0.24

From f579159e12662845b6934c06bc6fc7d6c6db7f1a Mon Sep 17 00:00:00 2001
From: Liam Huber
Date: Fri, 18 Jul 2025 10:53:54 -0700
Subject: [PATCH 32/32] Draft a slurm test (#704)

* Test slurm submission

Signed-off-by: liamhuber

* Don't apply callbacks to cached returns

Signed-off-by: liamhuber

* Only validate submission-time resources

Otherwise we run into trouble where it loads saved executor instructions (that already have what it would use anyhow)

Signed-off-by: liamhuber

* Mark module

Signed-off-by: liamhuber

* Test cached result branch

Signed-off-by: liamhuber

---------

Signed-off-by: liamhuber
---
 .ci_support/environment-cluster.yml    |   5 +
 .github/workflows/push-pull.yml        |  10 ++
 .github/workflows/slurm-test.yml       |  38 ++++++
 .../executors/wrapped_executorlib.py   |   4 -
 pyiron_workflow/workflow.py            |   8 +-
 pyproject.toml                         |   6 +
 tests/cluster/__init__.py              |   0
 tests/cluster/slurm_test.py            | 123 ++++++++++++++++++
 tests/unit/test_workflow.py            |  28 ++++
 tests/unit/test_wrapped_executorlib.py |   6 -
 10 files changed, 216 insertions(+), 12 deletions(-)
 create mode 100644 .ci_support/environment-cluster.yml
 create mode 100644
.github/workflows/slurm-test.yml create mode 100644 tests/cluster/__init__.py create mode 100644 tests/cluster/slurm_test.py diff --git a/.ci_support/environment-cluster.yml b/.ci_support/environment-cluster.yml new file mode 100644 index 000000000..b72979a76 --- /dev/null +++ b/.ci_support/environment-cluster.yml @@ -0,0 +1,5 @@ +channels: +- conda-forge +dependencies: +- pysqa =0.2.7 +- h5py =3.14.0 diff --git a/.github/workflows/push-pull.yml b/.github/workflows/push-pull.yml index ef7f58934..b95957f59 100644 --- a/.github/workflows/push-pull.yml +++ b/.github/workflows/push-pull.yml @@ -26,3 +26,13 @@ jobs: do-codacy: false do-coveralls: false do-mypy: true + + slurm-interruption: + uses: ./.github/workflows/slurm-test.yml + with: + mode: interrupt + + slurm-discovery: + uses: ./.github/workflows/slurm-test.yml + with: + mode: discover diff --git a/.github/workflows/slurm-test.yml b/.github/workflows/slurm-test.yml new file mode 100644 index 000000000..f43a36619 --- /dev/null +++ b/.github/workflows/slurm-test.yml @@ -0,0 +1,38 @@ +# Configure the CI with SLURM, and send a job to the queue via a workflow +# The submission job gets hard-killed, then we follow up by either restarting while the +# slurm job is running (mode = interrupt) or by waiting for it to finish +# (mode = discover) + +name: Slurm Test +on: + workflow_call: + inputs: + mode: + required: true + type: string + +jobs: + slurm_test: + runs-on: ubuntu-latest + services: + mysql: + image: mysql:8.0 + env: + MYSQL_ROOT_PASSWORD: root + ports: + - "8888:3306" + options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3 + steps: + - uses: actions/checkout@v4 + - uses: koesterlab/setup-slurm-action@v1 + timeout-minutes: 5 + - uses: pyiron/actions/cached-miniforge@actions-4.0.8 + with: + python-version: '3.12' + env-files: .ci_support/environment.yml .ci_support/environment-cluster.yml + - name: Test (${{ inputs.mode }}) + shell: bash -l {0} + timeout-minutes: 8 + run: | + python -u tests/cluster/slurm_test.py --submit + python -u tests/cluster/slurm_test.py --${{ inputs.mode }} \ No newline at end of file diff --git a/pyiron_workflow/executors/wrapped_executorlib.py b/pyiron_workflow/executors/wrapped_executorlib.py index 15293c033..2ad69ccb2 100644 --- a/pyiron_workflow/executors/wrapped_executorlib.py +++ b/pyiron_workflow/executors/wrapped_executorlib.py @@ -22,10 +22,6 @@ class ProtectedResourceError(ValueError): class CacheOverride(BaseExecutor): override_cache_file_name: ClassVar[str] = "executorlib_cache" - def __init__(self, *args, **kwargs): - _validate_existing_resource_dict(kwargs) - super().__init__(*args, **kwargs) - def submit(self, fn, /, *args, **kwargs): """ We demand that `fn` be the bound-method `on_run` of a `Lexical`+`Runnable` diff --git a/pyiron_workflow/workflow.py b/pyiron_workflow/workflow.py index 0f1c24000..0208f75a4 100644 --- a/pyiron_workflow/workflow.py +++ b/pyiron_workflow/workflow.py @@ -393,7 +393,7 @@ def run( def run_in_thread( self, *args, check_readiness: bool = True, **kwargs - ) -> futures.Future: + ) -> futures.Future | dict[str, Any]: if self.executor is not None: raise ValueError( f"Workflow {self.label} already has an executor set. 
Running in a " @@ -401,7 +401,11 @@ def run_in_thread( ) self.executor = (futures.ThreadPoolExecutor, (), {}) f = self.run(*args, check_readiness=check_readiness, **kwargs) - f.add_done_callback(lambda _: setattr(self, "executor", None)) + if isinstance(f, futures.Future): + f.add_done_callback(lambda _: setattr(self, "executor", None)) + else: + # We're hitting a cached result + self.executor = None return f def pull(self, run_parent_trees_too=False, **kwargs): diff --git a/pyproject.toml b/pyproject.toml index fdd3381f8..02d89a165 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -56,6 +56,12 @@ Homepage = "https://github.com/pyiron/pyiron_workflow" Documentation = "https://pyiron-workflow.readthedocs.io" Repository = "https://github.com/pyiron/pyiron_workflow" +[project.optional-dependencies] +cluster = [ + "pysqa==0.2.7", + "h5py==3.14.0", +] + [tool.versioneer] VCS = "git" style = "pep440-pre" diff --git a/tests/cluster/__init__.py b/tests/cluster/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/cluster/slurm_test.py b/tests/cluster/slurm_test.py new file mode 100644 index 000000000..b4cb77ad9 --- /dev/null +++ b/tests/cluster/slurm_test.py @@ -0,0 +1,123 @@ +import argparse +import os +import time + +import pyiron_workflow as pwf +from pyiron_workflow.executors.wrapped_executorlib import CacheSlurmClusterExecutor + +t_overhead = 2 +t_sleep = 10 + + +def state_check(wf, expected_wf, expected_n2, expected_result=pwf.api.NOT_DATA): + wf_running, n_running, outputs = ( + wf.running, + wf.n2.running, + wf.outputs.to_value_dict(), + ) + print( + "wf.running, wf.n2.running, wf.outputs.to_value_dict()", + wf_running, + n_running, + outputs, + ) + assert (wf_running, n_running, outputs["n3__user_input"]) == ( + expected_wf, + expected_n2, + expected_result, + ) + + +def submission(): + submission_template = """\ +#!/bin/bash +#SBATCH --output=time.out +#SBATCH --job-name={{job_name}} +#SBATCH --chdir={{working_directory}} +#SBATCH --get-user-env=L +#SBATCH --cpus-per-task={{cores}} + +{{command}} +""" + resource_dict = {"submission_template": submission_template} + + wf = pwf.Workflow("slurm_test") + wf.n1 = pwf.std.UserInput(t_sleep) + wf.n2 = pwf.std.Sleep(wf.n1) + wf.n3 = pwf.std.UserInput(wf.n2) + + print("submitting") + print(time.time()) + wf.n2.executor = (CacheSlurmClusterExecutor, (), {"resource_dict": resource_dict}) + wf.n2.use_cache = False + out = wf.run_in_thread() + print("run return", out) + state_check(wf, True, True) + print("sleeping", t_overhead + t_sleep / 4) + time.sleep(t_overhead + t_sleep / 4) + print("saving") + state_check(wf, True, True) + wf.save() + print("sleeping", t_sleep / 4) + time.sleep(t_sleep / 4) + print("pre-exit state") + state_check(wf, True, True) + print("hard exit at time", time.time()) + os._exit(0) # Hard exit so that we don't wait for the executor + + +def interruption(): + print("loading at time", time.time()) + wf = pwf.Workflow("slurm_test") + state_check(wf, True, True) + wf.executor = None # https://github.com/pyiron/pyiron_workflow/issues/705 + wf.running = False # https://github.com/pyiron/pyiron_workflow/issues/706 + print("re-running") + out = wf.run_in_thread() + print("run return", out) + state_check(wf, True, True) + print("sleeping", t_overhead + t_sleep) + time.sleep(t_overhead + t_sleep) + state_check(wf, False, False, t_sleep) + wf.delete_storage() + + +def discovery(): + print("loading at time", time.time()) + wf = pwf.Workflow("slurm_test") + state_check(wf, True, True) + wf.executor = None # 
https://github.com/pyiron/pyiron_workflow/issues/705 + wf.running = False # https://github.com/pyiron/pyiron_workflow/issues/706 + print("sleeping", t_overhead + t_sleep) + time.sleep(t_overhead + t_sleep) + print("re-running") + out = wf.run_in_thread() + print("run return", out) + state_check(wf, True, True) + print("sleeping", t_sleep / 10) + time.sleep(t_sleep / 10) + print("finally") + state_check(wf, False, False, t_sleep) + wf.delete_storage() + + +def main(): + parser = argparse.ArgumentParser(description="Run workflow stages.") + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument("--submit", action="store_true", help="Run submission stage.") + group.add_argument( + "--interrupt", action="store_true", help="Run interruption stage." + ) + group.add_argument("--discover", action="store_true", help="Run discovery stage.") + args = parser.parse_args() + + if args.submit: + submission() + elif args.interrupt: + interruption() + elif args.discover: + discovery() + + +if __name__ == "__main__": + main() diff --git a/tests/unit/test_workflow.py b/tests/unit/test_workflow.py index 8d899c4d6..7437aa912 100644 --- a/tests/unit/test_workflow.py +++ b/tests/unit/test_workflow.py @@ -501,6 +501,34 @@ def test_pickle(self): wf_out, reloaded.outputs.to_value_dict(), msg="Pickling should work" ) + def test_repeated_thread_runs(self): + wf = Workflow("wf") + wf.n = demo_nodes.AddThree(x=0) + f = wf.run_in_thread() + self.assertTrue(wf.running, msg="Sanity check") + + self.assertEqual(f.result(), wf, msg="Background run should complete") + self.assertEqual(wf.n.outputs.add_three.value, 3, msg="Sanity check") + max_waits = 10 + while wf.executor is not None: + sleep(0.1) + max_waits -= 1 + if max_waits == 0: + raise RuntimeError( + "Executor should be gone by now -- we're just trying to buy a " + "smidgen of time for the callback to finish." + ) + self.assertIsNone( + wf.executor, msg="On-the-fly thread executors should be transient" + ) + cached = wf.run_in_thread() + self.assertDictEqual( + cached, + wf.outputs.to_value_dict(), + msg="On cache hits we don't expect a future back", + ) + self.assertIsNone(wf.executor, msg="No new executor should be set") + if __name__ == "__main__": unittest.main() diff --git a/tests/unit/test_wrapped_executorlib.py b/tests/unit/test_wrapped_executorlib.py index e8e4b520e..2afe905b8 100644 --- a/tests/unit/test_wrapped_executorlib.py +++ b/tests/unit/test_wrapped_executorlib.py @@ -30,12 +30,6 @@ def test_resource_protection(self): {"cache_key": "my_key", "cache_directory": "my_directory"}, ) for resource_dict in protected_resources: - with ( - self.subTest(msg=f"Init resource dict: {resource_dict}"), - self.assertRaises(ProtectedResourceError), - ): - CacheSingleNodeExecutor(resource_dict=resource_dict) - with ( self.subTest(msg=f"Submit resource dict: {resource_dict}"), CacheSingleNodeExecutor() as exe,
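
A note on the executor-setting contract that PATCH 29 pins down and PATCH 30 leans on: a Runnable accepts either a live executor instance or a deferred (constructor, args, kwargs) triple, and anything else now raises the dedicated NotInterpretableAsExecutorError rather than a bare TypeError. A short illustration, assuming the standard node library, with the stdlib ThreadPoolExecutor standing in for any acceptable executor:

    from concurrent import futures

    import pyiron_workflow as pwf
    from pyiron_workflow.mixin.run import NotInterpretableAsExecutorError

    n = pwf.std.UserInput(1)
    n.executor = futures.ThreadPoolExecutor()          # a live instance is accepted
    n.executor = (futures.ThreadPoolExecutor, (), {})  # so is a (class, args, kwargs) triple
    try:
        n.executor = "This is not an executor!"        # anything else is rejected
    except NotInterpretableAsExecutorError:
        pass

Unlike a live executor, the deferred triple can be saved alongside the node (the "saved executor instructions" mentioned in the PATCH 32 notes), which is what lets the slurm test hard-exit, reload, and re-run.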
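
Likewise, after the run_in_thread() change in PATCH 32, callers can no longer assume a Future comes back: on a cache hit the already-computed outputs are returned directly and the transient thread executor is cleared on the spot. A sketch of the resulting calling pattern, mirroring test_repeated_thread_runs (the workflow content here is hypothetical):

    from concurrent import futures

    import pyiron_workflow as pwf

    wf = pwf.Workflow("wf")
    wf.n = pwf.std.UserInput(0)

    out = wf.run_in_thread()
    if isinstance(out, futures.Future):
        out.result()  # first run: wait for the background thread to finish
    else:
        print(out)    # cache hit: this is already wf.outputs.to_value_dict()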