Wrap executorlib executors #678


Merged
36 commits, merged Jul 18, 2025

Commits
246626d
Wrap executorlib executors
liamhuber Jun 19, 2025
1acffcf
Be kinder to fstrings
liamhuber Jun 19, 2025
afa60b2
Remove prints
liamhuber Jun 19, 2025
b5191ad
Black
liamhuber Jun 19, 2025
4f08434
Bump lower bound of executorlib
liamhuber Jun 19, 2025
e90d55f
Wrap SlurmClusterExecutor
liamhuber Jun 19, 2025
579a157
Merge branch 'main' into executor
liamhuber Jul 9, 2025
a8f8f60
Merge branch 'main' into executor
liamhuber Jul 10, 2025
15cf97a
Don't re-parse executor tuples
liamhuber Jul 10, 2025
4ec33bf
Exploit lexical path cleaning
liamhuber Jul 10, 2025
9d053ff
Move cache cleaning into the finally method
liamhuber Jul 10, 2025
f7dfc3d
Update executorlib syntax
liamhuber Jul 10, 2025
ad3356c
Test the single node executor
liamhuber Jul 10, 2025
154b323
Clean the associated cache subdirectory
liamhuber Jul 11, 2025
1c471f7
Clean up the slurm stuff too
liamhuber Jul 11, 2025
17e4184
Add local file executor
liamhuber Jul 11, 2025
b624ef8
lint
liamhuber Jul 11, 2025
48ecfa7
Test local both executors
liamhuber Jul 11, 2025
e3cf308
Merge branch 'main' into executor
liamhuber Jul 14, 2025
2bafc68
Add prefix to cleaning directory
liamhuber Jul 14, 2025
fed80d3
Use test executor
liamhuber Jul 15, 2025
5cd3413
Validate executor at assignment
liamhuber Jul 15, 2025
cc56512
Delay executor tuple parsing
liamhuber Jul 15, 2025
dfc50af
Decrease improvement expectation
liamhuber Jul 15, 2025
9cd86bb
Use explicit shutdown
liamhuber Jul 15, 2025
8b03ca6
Clean up written file
liamhuber Jul 15, 2025
c46b98b
Don't wait
liamhuber Jul 15, 2025
4350f57
Bump executorlib version
liamhuber Jul 17, 2025
9ec4512
Merge branch 'main' into executor
liamhuber Jul 17, 2025
f5b78f7
Test application to non-node
liamhuber Jul 17, 2025
9c9ac66
Validate resources at init too
liamhuber Jul 17, 2025
6c6be03
Test file cleaning
liamhuber Jul 17, 2025
b509692
Test uninterpretable executor setting
liamhuber Jul 17, 2025
b06b914
Modify test for coverage
liamhuber Jul 17, 2025
0182d3a
Decrease lower bound
liamhuber Jul 17, 2025
f579159
Draft a slurm test (#704)
liamhuber Jul 18, 2025
2 changes: 1 addition & 1 deletion .binder/environment.yml
@@ -4,7 +4,7 @@ dependencies:
 - bagofholding =0.1.2
 - bidict =0.23.1
 - cloudpickle =3.1.1
-- executorlib =1.5.2
+- executorlib =1.6.0
 - graphviz =9.0.0
 - pandas =2.3.1
 - pint =0.24.4
5 changes: 5 additions & 0 deletions .ci_support/environment-cluster.yml
@@ -0,0 +1,5 @@
+channels:
+- conda-forge
+dependencies:
+- pysqa =0.2.7
+- h5py =3.14.0
2 changes: 1 addition & 1 deletion .ci_support/environment.yml
@@ -4,7 +4,7 @@ dependencies:
 - bagofholding =0.1.2
 - bidict =0.23.1
 - cloudpickle =3.1.1
-- executorlib =1.5.2
+- executorlib =1.6.0
 - graphviz =9.0.0
 - pandas =2.3.1
 - pint =0.24.4
2 changes: 1 addition & 1 deletion .ci_support/lower_bound.yml
@@ -6,7 +6,7 @@ dependencies:
 - bagofholding =0.1.0
 - bidict =0.23.1
 - cloudpickle =3.0.0
-- executorlib =0.0.1
+- executorlib =1.5.3
 - graphviz =9.0.0
 - pandas =2.2.2
 - pint =0.24
10 changes: 10 additions & 0 deletions .github/workflows/push-pull.yml
@@ -26,3 +26,13 @@ jobs:
       do-codacy: false
       do-coveralls: false
       do-mypy: true
+
+  slurm-interruption:
+    uses: ./.github/workflows/slurm-test.yml
+    with:
+      mode: interrupt
+
+  slurm-discovery:
+    uses: ./.github/workflows/slurm-test.yml
+    with:
+      mode: discover
38 changes: 38 additions & 0 deletions .github/workflows/slurm-test.yml
@@ -0,0 +1,38 @@
+# Configure the CI with SLURM, and send a job to the queue via a workflow
+# The submission job gets hard-killed, then we follow up by either restarting while the
+# slurm job is running (mode = interrupt) or by waiting for it to finish
+# (mode = discover)
+
+name: Slurm Test
+on:
+  workflow_call:
+    inputs:
+      mode:
+        required: true
+        type: string
+
+jobs:
+  slurm_test:
+    runs-on: ubuntu-latest
+    services:
+      mysql:
+        image: mysql:8.0
+        env:
+          MYSQL_ROOT_PASSWORD: root
+        ports:
+          - "8888:3306"
+        options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=5s --health-retries=3
+    steps:
+      - uses: actions/checkout@v4
+      - uses: koesterlab/setup-slurm-action@v1
+        timeout-minutes: 5
+      - uses: pyiron/actions/cached-miniforge@actions-4.0.8
+        with:
+          python-version: '3.12'
+          env-files: .ci_support/environment.yml .ci_support/environment-cluster.yml
+      - name: Test (${{ inputs.mode }})
+        shell: bash -l {0}
+        timeout-minutes: 8
+        run: |
+          python -u tests/cluster/slurm_test.py --submit
+          python -u tests/cluster/slurm_test.py --${{ inputs.mode }}
2 changes: 1 addition & 1 deletion docs/environment.yml
@@ -10,7 +10,7 @@ dependencies:
 - bagofholding =0.1.2
 - bidict =0.23.1
 - cloudpickle =3.1.1
-- executorlib =1.5.2
+- executorlib =1.6.0
 - graphviz =9.0.0
 - pandas =2.3.1
 - pint =0.24.4
79 changes: 79 additions & 0 deletions pyiron_workflow/executors/wrapped_executorlib.py
@@ -0,0 +1,79 @@
+import inspect
+from typing import Any, ClassVar
+
+from executorlib import BaseExecutor, SingleNodeExecutor, SlurmClusterExecutor
+from executorlib.api import TestClusterExecutor
+
+from pyiron_workflow.mixin import lexical, run
+
+
+class DedicatedExecutorError(TypeError):
+    """
+    To raise when you try to use one of these executors outside the context of a node.
+    """
+
+
+class ProtectedResourceError(ValueError):
+    """
+    Raise when a user provides executorlib resources that we need to override.
+    """
+
+
+class CacheOverride(BaseExecutor):
+    override_cache_file_name: ClassVar[str] = "executorlib_cache"
+
+    def submit(self, fn, /, *args, **kwargs):
+        """
+        We demand that `fn` be the bound-method `on_run` of a `Lexical`+`Runnable`
+        class (a `Node` is, of course, the intended resolution of this demand).
+        """
+        if (
+            inspect.ismethod(fn)
+            and fn.__name__ == "on_run"
+            and isinstance(fn.__self__, lexical.Lexical)  # provides .as_path
+            and isinstance(fn.__self__, run.Runnable)  # provides .on_run
+        ):
+            cache_key_info = {
+                "cache_key": self.override_cache_file_name,
+                "cache_directory": str(fn.__self__.as_path()),
+            }
+        else:
+            raise DedicatedExecutorError(
+                f"{self.__class__.__name__} is only intended to work with the "
+                f"on_run method of pyiron_workflow.Node objects, but got {fn}"
+            )
+
+        _validate_existing_resource_dict(kwargs)
+
+        if "resource_dict" in kwargs:
+            kwargs["resource_dict"].update(cache_key_info)
+        else:
+            kwargs["resource_dict"] = cache_key_info
+
+        return super().submit(fn, *args, **kwargs)
+
+
+def _validate_existing_resource_dict(kwargs: dict[str, Any]):
+    if "resource_dict" in kwargs:
+        if "cache_key" in kwargs["resource_dict"]:
+            raise ProtectedResourceError(
+                f"pyiron_workflow needs the freedom to specify the cache, so the "
+                f'requested "cache_key" '
+                f"({kwargs['resource_dict']['cache_key']}) would get overwritten."
+            )
+        if "cache_directory" in kwargs["resource_dict"]:
+            raise ProtectedResourceError(
+                f"pyiron_workflow needs the freedom to specify the cache, so the "
+                f'requested "cache_directory" '
+                f"({kwargs['resource_dict']['cache_directory']}) would get "
+                f"overwritten."
+            )
+
+
+class CacheSingleNodeExecutor(CacheOverride, SingleNodeExecutor): ...
+
+
+class CacheSlurmClusterExecutor(CacheOverride, SlurmClusterExecutor): ...
+
+
+class _CacheTestClusterExecutor(CacheOverride, TestClusterExecutor): ...
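The gate in `CacheOverride.submit` can be sketched without executorlib installed. In the snippet below, `FakeNode` and `build_cache_info` are hypothetical stand-ins for a `Node` and the cache-key construction, reproducing only the bound-method check (the real class additionally verifies the `Lexical` and `Runnable` mixins):

```python
import inspect


class FakeNode:
    """A stand-in for a pyiron_workflow Node (hypothetical, for illustration)."""

    def as_path(self):
        return "my_workflow/my_node"

    def on_run(self):
        return 42


def build_cache_info(fn, cache_file_name="executorlib_cache"):
    # Mirror of the gate in CacheOverride.submit: only a bound `on_run`
    # method is accepted; its owner supplies the cache directory.
    if inspect.ismethod(fn) and fn.__name__ == "on_run":
        return {
            "cache_key": cache_file_name,
            "cache_directory": str(fn.__self__.as_path()),
        }
    raise TypeError(f"Only bound on_run methods are accepted, got {fn}")


info = build_cache_info(FakeNode().on_run)
print(info["cache_directory"])  # my_workflow/my_node
```

Anything else passed as `fn` (a plain function, a builtin, an unrelated method) is rejected up front rather than failing later inside the executor.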
115 changes: 60 additions & 55 deletions pyiron_workflow/mixin/run.py
@@ -30,6 +30,9 @@ class ReadinessError(ValueError):
     readiness_dict: dict[str, bool]  # Detailed information on why it is not ready
 
 
+class NotInterpretableAsExecutorError(TypeError): ...
+
+
 class Runnable(UsesState, HasLabel, HasRun, ABC):
     """
     An abstract class for interfacing with executors, etc.
@@ -60,7 +63,7 @@ def __init__(self, *args, **kwargs) -> None:
         super().__init__(*args, **kwargs)
         self.running: bool = False
         self.failed: bool = False
-        self.executor: InterpretableAsExecutor | None = None
+        self._executor: InterpretableAsExecutor | None = None
         # We call it an executor, but it can also be instructions on making one
         self.future: None | Future = None
         self._thread_pool_sleep_time: float = 1e-6
@@ -78,6 +81,26 @@ def run_args(self) -> tuple[tuple, dict]:
         Any data needed for :meth:`on_run`, will be passed as (*args, **kwargs).
         """
 
+    @property
+    def executor(self) -> InterpretableAsExecutor | None:
+        return self._executor
+
+    @executor.setter
+    def executor(self, executor: InterpretableAsExecutor | None):
+        if not (
+            isinstance(executor, StdLibExecutor | type(None))
+            or (
+                callable(executor[0])
+                and isinstance(executor[1], tuple)
+                and isinstance(executor[2], dict)
+            )
+        ):
+            raise NotInterpretableAsExecutorError(
+                f"Expected an instance of {StdLibExecutor}, or a tuple of such a "
+                f"class, a tuple of args, and a dict of kwargs -- but got {executor}."
+            )
+        self._executor = executor
+
     def process_run_result(self, run_output: Any) -> Any:
         """
         What to _do_ with the results of :meth:`on_run` once you have them.
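The `(callable, args, kwargs)` triple the setter accepts can be exercised with the standard library alone; this sketch shows how such instructions are later turned into a live executor and used:

```python
from concurrent.futures import Executor, ThreadPoolExecutor

# An "interpretable as executor" value, per the setter above: either a live
# concurrent.futures.Executor, or a (callable, args, kwargs) triple describing
# how to build one later (e.g. on the far side of serialization).
instructions = (ThreadPoolExecutor, (), {"max_workers": 2})

creator, args, kwargs = instructions
executor = creator(*args, **kwargs)
assert isinstance(executor, Executor)  # the same check _run performs

future = executor.submit(pow, 2, 5)
print(future.result())  # 32
executor.shutdown()
```

The instructions tuple stays picklable where a live executor would not, which is exactly why the deferred form is supported.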
@@ -160,13 +183,8 @@ def _none_to_dict(inp: dict | None) -> dict:
         if stop_early:
             return result
 
-        executor = (
-            None if self.executor is None else self._parse_executor(self.executor)
-        )
-
         self.running = True
         return self._run(
-            executor=executor,
             raise_run_exceptions=raise_run_exceptions,
             run_exception_kwargs=run_exception_kwargs,
             run_finally_kwargs=run_finally_kwargs,
@@ -202,7 +220,6 @@ def _before_run(
     def _run(
         self,
         /,
-        executor: StdLibExecutor | None,
         raise_run_exceptions: bool,
         run_exception_kwargs: dict,
         run_finally_kwargs: dict,
@@ -231,7 +248,7 @@ def _run(
                 f"first positional argument passed to :meth:`on_run`."
             )
 
-        if executor is None:
+        if self.executor is None:
             try:
                 run_output = self.on_run(*on_run_args, **on_run_kwargs)
             except (Exception, KeyboardInterrupt) as e:
@@ -249,14 +266,28 @@
                 **finish_run_kwargs,
             )
         else:
-            if isinstance(executor, ThreadPoolExecutor):
-                self.future = executor.submit(
-                    self._thread_pool_run, *on_run_args, **on_run_kwargs
-                )
+            if isinstance(self.executor, StdLibExecutor):
+                executor = self.executor
+                unique_executor = False
             else:
-                self.future = executor.submit(
-                    self.on_run, *on_run_args, **on_run_kwargs
-                )
+                creator, args, kwargs = self.executor
+                executor = creator(*args, **kwargs)
+                if not isinstance(executor, StdLibExecutor):
+                    raise TypeError(
+                        f"Expected an instance of {StdLibExecutor}, but got "
+                        f"{type(executor)} from executor creation instructions "
+                        f"{self.executor}."
+                    )
+                unique_executor = True
+
+            submit_function = (
+                self._thread_pool_run
+                if isinstance(executor, ThreadPoolExecutor)
+                else self.on_run
+            )
+            self.future = executor.submit(
+                submit_function, *on_run_args, **on_run_kwargs
+            )
             self.future.add_done_callback(
                 partial(
                     self._finish_run,
@@ -266,8 +297,20 @@
                     **finish_run_kwargs,
                 )
             )
+
+            if unique_executor:
+                self.future.add_done_callback(
+                    partial(self._shutdown_executor_callback, executor=executor)
+                )
+
             return self.future
 
+    @staticmethod
+    def _shutdown_executor_callback(
+        _future: Future, /, executor: StdLibExecutor
+    ) -> None:
+        executor.shutdown(wait=False)
+
     def _run_exception(self, /, *args, **kwargs):
         """
         What to do if an exception is encountered inside :meth:`_run` or
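The one-shot "unique executor" pattern above, where an executor built from instructions is shut down as soon as its future completes, can be reproduced with the standard library alone; `shutdown_callback` below is a stand-in for `_shutdown_executor_callback`:

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def shutdown_callback(_future, /, executor):
    # Free the one-shot executor's resources as soon as its job is done.
    executor.shutdown(wait=False)


executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(sum, (1, 2, 3))
future.add_done_callback(partial(shutdown_callback, executor=executor))
print(future.result())  # 6
```

Because the callback fires only after the future holds its result, calling `shutdown(wait=False)` there is safe: `future.result()` still returns normally afterwards.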
@@ -322,53 +365,15 @@ def _readiness_error_message(self) -> str:
             f"should be neither running nor failed.\n" + self.readiness_report
         )
 
-    @staticmethod
-    def _parse_executor(
-        executor: InterpretableAsExecutor,
-    ) -> StdLibExecutor:
-        """
-        If you've already got an executor, you're done. But if you get callable and
-        some args and kwargs, turn them into an executor!
-
-        This is because executors can't be serialized, but you might want to use an
-        executor on the far side of serialization. The most straightforward example is
-        to simply pass an executor class and its args and kwargs, but in a more
-        sophisticated case perhaps you want some function that accesses the _same_
-        executor on multiple invocations such that multiple nodes are sharing the same
-        executor. The functionality here isn't intended to hold your hand for this, but
-        should be flexible enough that you _can_ do it if you want to.
-        """
-        if isinstance(executor, StdLibExecutor):
-            return executor
-        elif (
-            isinstance(executor, tuple)
-            and callable(executor[0])
-            and isinstance(executor[1], tuple)
-            and isinstance(executor[2], dict)
-        ):
-            executor = executor[0](*executor[1], **executor[2])
-            if not isinstance(executor, StdLibExecutor):
-                raise TypeError(
-                    f"Executor parsing got a callable and expected it to return a "
-                    f"`concurrent.futures.Executor` instance, but instead got "
-                    f"{executor}."
-                )
-            return executor
-        else:
-            raise NotImplementedError(
-                f"Expected an instance of {StdLibExecutor}, or a tuple of such a class, "
-                f"a tuple of args, and a dict of kwargs -- but got {executor}."
-            )
-
     def __getstate__(self):
         state = super().__getstate__()
         state["future"] = None
         # Don't pass the future -- with the future in the state things work fine for
         # the simple pyiron_workflow.executors.CloudpickleProcessPoolExecutor, but for
         # the more complex executorlib.Executor we're getting:
         # TypeError: cannot pickle '_thread.RLock' object
-        if isinstance(self.executor, StdLibExecutor):
-            state["executor"] = None
+        if isinstance(self._executor, StdLibExecutor):
+            state["_executor"] = None
         # Don't pass actual executors, they have an unserializable thread lock on them
         # _but_ if the user is just passing instructions on how to _build_ an executor,
         # we'll trust that those serialize OK (this way we can, hopefully, eventually
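The serialization constraint the comments describe is easy to demonstrate with the standard library: a live executor fails to pickle because of the locks it holds internally, while build-instructions for one serialize fine (a sketch; the real code nulls the executor out in `__getstate__` for the same reason):

```python
import pickle
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)
try:
    pickle.dumps(executor)  # fails: executors carry thread locks/queues
    lock_error = None
except TypeError as e:
    lock_error = e
finally:
    executor.shutdown()

# Build-instructions, by contrast, round-trip through pickle unharmed,
# since the class itself is pickled by reference.
instructions = (ThreadPoolExecutor, (), {"max_workers": 1})
roundtrip = pickle.loads(pickle.dumps(instructions))
print(lock_error is not None, roundtrip == instructions)
```

This is the trade the `__getstate__` change encodes: live executors are dropped from the state, instruction tuples are trusted to survive it.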