
Commit 8e1acd8

Wrap executorlib executors (#678)
* Wrap executorlib executors: exploit the new caching interface so that nodes can rely on their running state and lexical path to access previously executed results. Locally with the SingleNodeExecutor everything is looking good, but that doesn't natively support terminating the process that submits the job. I'd like to play around with this using the SlurmClusterExecutor on the cluster before making further changes.
* Be kinder to fstrings
* Remove prints
* Black
* Bump lower bound of executorlib, since we now depend explicitly on a new feature
* Wrap SlurmClusterExecutor
* Don't re-parse executor tuples: it causes a weird hang that blocks observability.
* Exploit lexical path cleaning, and make the expected file independently accessible
* Move cache cleaning into the finally method, and hide it behind its own boolean flag for testing
* Update executorlib syntax, and make the file name fixed and accessible at the class level
* Test the single node executor
* Clean the associated cache subdirectory: the slurm executor populates this with a submission script, etc.
* Clean up the slurm stuff too
* Add local file executor, from @jan-janssen in [this comment](pyiron/executorlib#708 (comment))
* lint
* Test both local executors
* Add prefix to cleaning directory
* Use test executor: the local file executor got directly included in executorlib as a testing tool.
* Validate executor at assignment
* Delay executor tuple parsing, and always with-execute tuples, since there is only ever one instance of this executor. If we have already been assigned an executor _instance_, then we trust the user to be managing its state and submit directly rather than wrapping in a with-clause.
* Decrease improvement expectation: recent changes threw off the balance of times in the first vs. second run, so rather compare to what you actually care about: that the second run is bypassing the sleep call.
* Use explicit shutdown, instead of a with-clause. This way the executor is still permitted to release the thread before the job is done, but we still guarantee that executors created by bespoke instructions get shut down at the end of their one-future lifetime.
* Clean up written file
* Don't wait: there was necessarily only the one future, so don't wait at shutdown. This removes the need for accepting the runtime error and prevents the wrapped executorlib executors from hanging indefinitely.
* Bump executorlib version, including the lower bound
* Test application to non-node, and debug the error message
* Validate resources at init too, since that's the way users will typically interact with this field. I also had to change the inheritance order to make sure we were dealing with the user-facing executor and not the task scheduler, but this doesn't impact the submit loop.
* Test file cleaning
* Test uninterpretable executor setting
* Modify test for coverage, so we pass through the Runnable._shutdown_executor_callback process
* Decrease lower bound
* Draft a slurm test (#704):
  * Test slurm submission
  * Don't apply callbacks to cached returns
  * Only validate submission-time resources: otherwise we run into trouble where it loads saved executor instructions (that already have what it would use anyhow)
  * Mark module
  * Test cached result branch

Signed-off-by: liamhuber <liamhuber@greyhavensolutions.com>
Co-authored-by: Jan Janssen
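The "Validate executor at assignment", "Delay executor tuple parsing", and "Use explicit shutdown" items above amount to two ways of attaching one of the wrapped executors to a node. Below is a minimal sketch of that intent, not taken from this commit: the import path of the new executor classes is assumed (the new file's name is not captured in this page), the node-creation decorator is pyiron_workflow's general-purpose API rather than anything added here, and default executor construction arguments are assumed to suffice.

from pyiron_workflow import Workflow
from pyiron_workflow.executors.wrapped_executorlib import CacheSingleNodeExecutor  # assumed path


@Workflow.wrap.as_function_node
def AddOne(x: int) -> int:
    y = x + 1
    return y


node = AddOne(0)

# Option 1: build instructions (callable, args-tuple, kwargs-dict). The node creates
# the executor itself, submits its single on_run future, and shuts the executor down
# (wait=False) from a done-callback once that future completes.
node.executor = (CacheSingleNodeExecutor, (), {})
node.run()

# Option 2: a live instance. The node submits directly and trusts the caller to
# manage the executor's lifetime -- no automatic shutdown.
with CacheSingleNodeExecutor() as exe:
    node.executor = exe
    node.run()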
1 parent 4841e34 commit 8e1acd8

21 files changed: +546 -75 lines

.binder/environment.yml

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@ dependencies:
 - bagofholding =0.1.2
 - bidict =0.23.1
 - cloudpickle =3.1.1
-- executorlib =1.5.2
+- executorlib =1.6.0
 - graphviz =9.0.0
 - pandas =2.3.1
 - pint =0.24.4

.ci_support/environment-cluster.yml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+channels:
+- conda-forge
+dependencies:
+- pysqa =0.2.7
+- h5py =3.14.0

.ci_support/environment.yml

Lines changed: 1 addition & 1 deletion
@@ -4,7 +4,7 @@ dependencies:
 - bagofholding =0.1.2
 - bidict =0.23.1
 - cloudpickle =3.1.1
-- executorlib =1.5.2
+- executorlib =1.6.0
 - graphviz =9.0.0
 - pandas =2.3.1
 - pint =0.24.4

.ci_support/lower_bound.yml

Lines changed: 1 addition & 1 deletion
@@ -6,7 +6,7 @@ dependencies:
 - bagofholding =0.1.0
 - bidict =0.23.1
 - cloudpickle =3.0.0
-- executorlib =0.0.1
+- executorlib =1.5.3
 - graphviz =9.0.0
 - pandas =2.2.2
 - pint =0.24

.github/workflows/push-pull.yml

Lines changed: 10 additions & 0 deletions
@@ -26,3 +26,13 @@ jobs:
       do-codacy: false
       do-coveralls: false
       do-mypy: true
+
+  slurm-interruption:
+    uses: ./.github/workflows/slurm-test.yml
+    with:
+      mode: interrupt
+
+  slurm-discovery:
+    uses: ./.github/workflows/slurm-test.yml
+    with:
+      mode: discover

.github/workflows/slurm-test.yml

Lines changed: 38 additions & 0 deletions
@@ -0,0 +1,38 @@
+# Configure the CI with SLURM, and send a job to the queue via a workflow
+# The submission job gets hard-killed, then we follow up by either restarting while the
+# slurm job is running (mode = interrupt) or by waiting for it to finish
+# (mode = discover)
+
+name: Slurm Test
+on:
+  workflow_call:
+    inputs:
+      mode:
+        required: true
+        type: string
+
+jobs:
+  slurm_test:
+    runs-on: ubuntu-latest
+    services:
+      mysql:
+        image: mysql:8.0
+        env:
+          MYSQL_ROOT_PASSWORD: root
+        ports:
+          - "8888:3306"
+        options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3
+    steps:
+      - uses: actions/checkout@v4
+      - uses: koesterlab/setup-slurm-action@v1
+        timeout-minutes: 5
+      - uses: pyiron/actions/cached-miniforge@actions-4.0.8
+        with:
+          python-version: '3.12'
+          env-files: .ci_support/environment.yml .ci_support/environment-cluster.yml
+      - name: Test (${{ inputs.mode }})
+        shell: bash -l {0}
+        timeout-minutes: 8
+        run: |
+          python -u tests/cluster/slurm_test.py --submit
+          python -u tests/cluster/slurm_test.py --${{ inputs.mode }}

docs/environment.yml

Lines changed: 1 addition & 1 deletion
@@ -10,7 +10,7 @@ dependencies:
 - bagofholding =0.1.2
 - bidict =0.23.1
 - cloudpickle =3.1.1
-- executorlib =1.5.2
+- executorlib =1.6.0
 - graphviz =9.0.0
 - pandas =2.3.1
 - pint =0.24.4

Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
+import inspect
+from typing import Any, ClassVar
+
+from executorlib import BaseExecutor, SingleNodeExecutor, SlurmClusterExecutor
+from executorlib.api import TestClusterExecutor
+
+from pyiron_workflow.mixin import lexical, run
+
+
+class DedicatedExecutorError(TypeError):
+    """
+    To raise when you try to use one of these executors outside the context of a node.
+    """
+
+
+class ProtectedResourceError(ValueError):
+    """
+    Raise when a user provides executorlib resources that we need to override.
+    """
+
+
+class CacheOverride(BaseExecutor):
+    override_cache_file_name: ClassVar[str] = "executorlib_cache"
+
+    def submit(self, fn, /, *args, **kwargs):
+        """
+        We demand that `fn` be the bound-method `on_run` of a `Lexical`+`Runnable`
+        class (a `Node` is, of course, the intended resolution of this demand).
+        """
+        if (
+            inspect.ismethod(fn)
+            and fn.__name__ == "on_run"
+            and isinstance(fn.__self__, lexical.Lexical)  # provides .as_path
+            and isinstance(fn.__self__, run.Runnable)  # provides .on_run
+        ):
+            cache_key_info = {
+                "cache_key": self.override_cache_file_name,
+                "cache_directory": str(fn.__self__.as_path()),
+            }
+        else:
+            raise DedicatedExecutorError(
+                f"{self.__class__.__name__} is only intended to work with the "
+                f"on_run method of pyiron_workflow.Node objects, but got {fn}"
+            )
+
+        _validate_existing_resource_dict(kwargs)
+
+        if "resource_dict" in kwargs:
+            kwargs["resource_dict"].update(cache_key_info)
+        else:
+            kwargs["resource_dict"] = cache_key_info
+
+        return super().submit(fn, *args, **kwargs)
+
+
+def _validate_existing_resource_dict(kwargs: dict[str, Any]):
+    if "resource_dict" in kwargs:
+        if "cache_key" in kwargs["resource_dict"]:
+            raise ProtectedResourceError(
+                f"pyiron_workflow needs the freedom to specify the cache, so the "
+                f'requested "cache_key" '
+                f"({kwargs['resource_dict']['cache_key']}) would get overwritten."
+            )
+        if "cache_directory" in kwargs["resource_dict"]:
+            raise ProtectedResourceError(
+                f"pyiron_workflow needs the freedom to specify the cache, so the "
+                f'requested "cache_directory" '
+                f"({kwargs['resource_dict']['cache_directory']}) would get "
+                f"overwritten."
+            )
+
+
+class CacheSingleNodeExecutor(CacheOverride, SingleNodeExecutor): ...
+
+
+class CacheSlurmClusterExecutor(CacheOverride, SlurmClusterExecutor): ...
+
+
+class _CacheTestClusterExecutor(CacheOverride, TestClusterExecutor): ...
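
A quick, pure-Python check of the guard rails defined above; only the import path is an assumption here, since the new file's name is not captured in this page.

from pyiron_workflow.executors.wrapped_executorlib import (  # assumed module path
    ProtectedResourceError,
    _validate_existing_resource_dict,
)

# Untouched resource keys pass straight through...
_validate_existing_resource_dict({"resource_dict": {"cores": 2}})

# ...but cache_key and cache_directory are reserved for the wrapper itself.
try:
    _validate_existing_resource_dict({"resource_dict": {"cache_key": "mine"}})
except ProtectedResourceError as error:
    print(error)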

pyiron_workflow/mixin/run.py

Lines changed: 60 additions & 55 deletions
@@ -30,6 +30,9 @@ class ReadinessError(ValueError):
     readiness_dict: dict[str, bool]  # Detailed information on why it is not ready
 
 
+class NotInterpretableAsExecutorError(TypeError): ...
+
+
 class Runnable(UsesState, HasLabel, HasRun, ABC):
     """
     An abstract class for interfacing with executors, etc.
@@ -60,7 +63,7 @@ def __init__(self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
         self.running: bool = False
         self.failed: bool = False
-        self.executor: InterpretableAsExecutor | None = None
+        self._executor: InterpretableAsExecutor | None = None
         # We call it an executor, but it can also be instructions on making one
         self.future: None | Future = None
         self._thread_pool_sleep_time: float = 1e-6
@@ -78,6 +81,26 @@ def run_args(self) -> tuple[tuple, dict]:
         Any data needed for :meth:`on_run`, will be passed as (*args, **kwargs).
         """
 
+    @property
+    def executor(self) -> InterpretableAsExecutor | None:
+        return self._executor
+
+    @executor.setter
+    def executor(self, executor: InterpretableAsExecutor | None):
+        if not (
+            isinstance(executor, StdLibExecutor | type(None))
+            or (
+                callable(executor[0])
+                and isinstance(executor[1], tuple)
+                and isinstance(executor[2], dict)
+            )
+        ):
+            raise NotInterpretableAsExecutorError(
+                f"Expected an instance of {StdLibExecutor}, or a tuple of such a "
+                f"class, a tuple of args, and a dict of kwargs -- but got {executor}."
+            )
+        self._executor = executor
+
     def process_run_result(self, run_output: Any) -> Any:
         """
         What to _do_ with the results of :meth:`on_run` once you have them.
@@ -160,13 +183,8 @@ def _none_to_dict(inp: dict | None) -> dict:
         if stop_early:
             return result
 
-        executor = (
-            None if self.executor is None else self._parse_executor(self.executor)
-        )
-
         self.running = True
         return self._run(
-            executor=executor,
            raise_run_exceptions=raise_run_exceptions,
            run_exception_kwargs=run_exception_kwargs,
            run_finally_kwargs=run_finally_kwargs,
@@ -202,7 +220,6 @@ def _before_run(
     def _run(
         self,
         /,
-        executor: StdLibExecutor | None,
         raise_run_exceptions: bool,
         run_exception_kwargs: dict,
         run_finally_kwargs: dict,
@@ -231,7 +248,7 @@ def _run(
                 f"first positional argument passed to :meth:`on_run`."
             )
 
-        if executor is None:
+        if self.executor is None:
             try:
                 run_output = self.on_run(*on_run_args, **on_run_kwargs)
             except (Exception, KeyboardInterrupt) as e:
@@ -249,14 +266,28 @@ def _run(
                 **finish_run_kwargs,
             )
         else:
-            if isinstance(executor, ThreadPoolExecutor):
-                self.future = executor.submit(
-                    self._thread_pool_run, *on_run_args, **on_run_kwargs
-                )
+            if isinstance(self.executor, StdLibExecutor):
+                executor = self.executor
+                unique_executor = False
             else:
-                self.future = executor.submit(
-                    self.on_run, *on_run_args, **on_run_kwargs
-                )
+                creator, args, kwargs = self.executor
+                executor = creator(*args, **kwargs)
+                if not isinstance(executor, StdLibExecutor):
+                    raise TypeError(
+                        f"Expected an instance of {StdLibExecutor}, but got "
+                        f"{type(executor)} from executor creation instructions "
+                        f"{self.executor}."
+                    )
+                unique_executor = True
+
+            submit_function = (
+                self._thread_pool_run
+                if isinstance(executor, ThreadPoolExecutor)
+                else self.on_run
+            )
+            self.future = executor.submit(
+                submit_function, *on_run_args, **on_run_kwargs
+            )
             self.future.add_done_callback(
                 partial(
                     self._finish_run,
@@ -266,8 +297,20 @@ def _run(
                     **finish_run_kwargs,
                 )
             )
+
+            if unique_executor:
+                self.future.add_done_callback(
+                    partial(self._shutdown_executor_callback, executor=executor)
+                )
+
         return self.future
 
+    @staticmethod
+    def _shutdown_executor_callback(
+        _future: Future, /, executor: StdLibExecutor
+    ) -> None:
+        executor.shutdown(wait=False)
+
     def _run_exception(self, /, *args, **kwargs):
         """
         What to do if an exception is encountered inside :meth:`_run` or
@@ -322,53 +365,15 @@ def _readiness_error_message(self) -> str:
             f"should be neither running nor failed.\n" + self.readiness_report
         )
 
-    @staticmethod
-    def _parse_executor(
-        executor: InterpretableAsExecutor,
-    ) -> StdLibExecutor:
-        """
-        If you've already got an executor, you're done. But if you get callable and
-        some args and kwargs, turn them into an executor!
-
-        This is because executors can't be serialized, but you might want to use an
-        executor on the far side of serialization. The most straightforward example is
-        to simply pass an executor class and its args and kwargs, but in a more
-        sophisticated case perhaps you want some function that accesses the _same_
-        executor on multiple invocations such that multiple nodes are sharing the same
-        executor. The functionality here isn't intended to hold your hand for this, but
-        should be flexible enough that you _can_ do it if you want to.
-        """
-        if isinstance(executor, StdLibExecutor):
-            return executor
-        elif (
-            isinstance(executor, tuple)
-            and callable(executor[0])
-            and isinstance(executor[1], tuple)
-            and isinstance(executor[2], dict)
-        ):
-            executor = executor[0](*executor[1], **executor[2])
-            if not isinstance(executor, StdLibExecutor):
-                raise TypeError(
-                    f"Executor parsing got a callable and expected it to return a "
-                    f"`concurrent.futures.Executor` instance, but instead got "
-                    f"{executor}."
-                )
-            return executor
-        else:
-            raise NotImplementedError(
-                f"Expected an instance of {StdLibExecutor}, or a tuple of such a class, "
-                f"a tuple of args, and a dict of kwargs -- but got {executor}."
-            )
-
     def __getstate__(self):
         state = super().__getstate__()
         state["future"] = None
         # Don't pass the future -- with the future in the state things work fine for
         # the simple pyiron_workflow.executors.CloudpickleProcessPoolExecutor, but for
         # the more complex executorlib.Executor we're getting:
         # TypeError: cannot pickle '_thread.RLock' object
-        if isinstance(self.executor, StdLibExecutor):
-            state["executor"] = None
+        if isinstance(self._executor, StdLibExecutor):
+            state["_executor"] = None
         # Don't pass actual executors, they have an unserializable thread lock on them
         # _but_ if the user is just passing instructions on how to _build_ an executor,
         # we'll trust that those serialize OK (this way we can, hopefully, eventually
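
The one-future lifetime management added to `_run` above is just the standard library's done-callback mechanism. A standalone sketch of that pattern, with a plain ThreadPoolExecutor standing in for an executor built from (creator, args, kwargs) instructions:

from concurrent.futures import Executor, Future, ThreadPoolExecutor
from functools import partial


def _shutdown_executor_callback(_future: Future, /, executor: Executor) -> None:
    # Release the executor without blocking; its single future is already done.
    executor.shutdown(wait=False)


creator, args, kwargs = ThreadPoolExecutor, (), {"max_workers": 1}
executor = creator(*args, **kwargs)  # the "unique" executor built from instructions
future = executor.submit(sum, [1, 2, 3])
future.add_done_callback(partial(_shutdown_executor_callback, executor=executor))
print(future.result())  # 6 -- and the bespoke executor is shut down by the callback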
