Using pyiron/executorlib#676, I can exploit executorlib's capability to hard-code the cache key in order to manually control re-accessing cached executorlib results. This is necessary because the automatically generated cache label is over-specific for our case and fails to find the cache hits we want.
However, doing this means we are taking full responsibility for guaranteeing that the existence of a cache file implies that the cache hit is valid. By combining the lexical path of the node using the executor, that node's "running" state, and careful cleaning of retrieved cache results, this should be robustly possible -- it's just a matter of writing the book-keeping.
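For what that book-keeping might look like, here is a purely illustrative sketch. The helper names, the `running`-state rule, and the assumption that executorlib's cache file names start with the cache key are mine, not existing pyiron_workflow or executorlib API:

```python
from pathlib import Path

from pyiron_workflow.node import Node


def cache_hit_is_valid(node: Node) -> bool:
    """
    Illustrative rule only: a cache file for this node may be trusted when the
    node is still flagged as "running", i.e. it was submitted earlier (e.g.
    before a kernel reboot) and has not been reset or re-parameterized since.
    """
    return node.running


def clean_retrieved_cache(node: Node, cache_directory: str | Path) -> None:
    """
    Illustrative only: once a cached result has been pulled back into the
    node, remove the corresponding cache file so it cannot be reused after
    the node's inputs change. Assumes cache file names start with the same
    cache key the wrapper below constructs from the lexical path.
    """
    cache_key = "_".join(node.lexical_path.split(node.lexical_delimiter)[2:])
    for cache_file in Path(cache_directory).glob(f"{cache_key}*"):
        cache_file.unlink()
```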
In the meantime, here is a simple proof-of-concept demonstrating how to wrap executorlib so that the cache gets leveraged in this way:
```python
import inspect

from executorlib import SingleNodeExecutor

from pyiron_workflow.node import Node
from pyiron_workflow.nodes.composite import Composite


class SingleNodeNodeExecutor(SingleNodeExecutor):
    """A SingleNodeExecutor explicitly designed to work with pyiron_workflow.Node objects."""

    def __init__(self, root_node: Composite, *args, **kwargs):
        """Force the cache directory to be the location of the root node."""
        if "cache_directory" in kwargs:
            raise ValueError(
                f"{type(self).__name__} uses the root node to define the cache "
                f"directory, but we additionally received the kwarg "
                f"cache_directory={kwargs['cache_directory']}"
            )
        # For the real thing I probably rather want root_node XOR cache_directory
        super().__init__(*args, cache_directory=str(root_node.as_path()), **kwargs)

    def submit(self, fn, /, *args, **kwargs):
        """We demand that `fn` be the bound method `on_run` of a `Node` instance."""
        if (
            inspect.ismethod(fn)
            and fn.__name__ == "on_run"
            and isinstance(fn.__self__, Node)
        ):
            # Build the cache key from the node's lexical path (relative to the
            # root, whose location already serves as the cache directory)
            # cache_key_info = {"cache_key": fn.__self__.as_path(root=self.info["cache_directory"])}
            cache_key_info = {
                "cache_key": "_".join(
                    fn.__self__.lexical_path.split(fn.__self__.lexical_delimiter)[2:]
                )
            }
            print(cache_key_info)  # just for demonstration
        else:
            raise TypeError(
                f"{type(self).__name__} is only intended to work with the "
                f"on_run method of pyiron_workflow.Node objects, but got {fn}"
            )
        if "resource_dict" in kwargs:
            if "cache_key" in kwargs["resource_dict"]:
                raise ValueError("pyiron_workflow needs the freedom to specify the cache")
            kwargs["resource_dict"].update(cache_key_info)
        else:
            kwargs["resource_dict"] = cache_key_info
        return super().submit(fn, *args, **kwargs)
```
And here is a usage example:
```python
import pyiron_workflow as pwf

wf = pwf.Workflow("executed")
wf.n1 = pwf.std.UserInput(5)
wf.n2 = pwf.std.Sleep(wf.n1)
wf.n3 = pwf.std.UserInput(wf.n2)
wf.n2.use_cache = False

with SingleNodeNodeExecutor(root_node=wf) as exe:
    wf.n2.executor = exe
    print(wf())
```
This doesn't handle the cache cleaning, but it gets me the cache file I want, in the location I want, in such a way that I retrieve the cached result after rebooting the kernel. I still need to wire up that additional book-keeping, generalize the wrapper to the other executorlib executors (one possible shape for that generalization is sketched below), and test on the cluster with a real SLURM executor where I can kill the kernel before the job finishes. Down the road, I'll want to explore nesting the executors too.
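One way the generalization might go is to pull the wrapping into a mixin that can be combined with any executorlib executor class. This is only a sketch of that idea; the SLURM class name in the trailing comment is an assumption and should be checked against what executorlib actually exports:

```python
import inspect

from executorlib import SingleNodeExecutor

from pyiron_workflow.node import Node
from pyiron_workflow.nodes.composite import Composite


class NodeCacheMixin:
    """Factors the node-aware caching out of SingleNodeNodeExecutor (sketch only)."""

    def __init__(self, root_node: Composite, *args, **kwargs):
        if "cache_directory" in kwargs:
            raise ValueError(
                f"{type(self).__name__} derives the cache directory from the root node, "
                f"but additionally received cache_directory={kwargs['cache_directory']}"
            )
        super().__init__(*args, cache_directory=str(root_node.as_path()), **kwargs)

    def submit(self, fn, /, *args, **kwargs):
        if not (
            inspect.ismethod(fn)
            and fn.__name__ == "on_run"
            and isinstance(fn.__self__, Node)
        ):
            raise TypeError(
                f"{type(self).__name__} only accepts the on_run method of "
                f"pyiron_workflow.Node objects, but got {fn}"
            )
        resource_dict = kwargs.setdefault("resource_dict", {})
        if "cache_key" in resource_dict:
            raise ValueError("pyiron_workflow needs the freedom to specify the cache")
        # Same lexical-path-based key as in the proof-of-concept above
        resource_dict["cache_key"] = "_".join(
            fn.__self__.lexical_path.split(fn.__self__.lexical_delimiter)[2:]
        )
        return super().submit(fn, *args, **kwargs)


class SingleNodeNodeExecutor(NodeCacheMixin, SingleNodeExecutor):
    """Same behaviour as the proof-of-concept above, expressed via the mixin."""


# A cluster variant would then be a one-liner too, e.g. (class name assumed,
# check what executorlib exports):
# from executorlib import SlurmClusterExecutor
# class SlurmClusterNodeExecutor(NodeCacheMixin, SlurmClusterExecutor): ...
```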
@jan-janssen, there is then more work to do on the pyiron_workflow side, but SingleNodeNodeExecutor demonstrates the very thin layer of wrapping now necessary to get executorlib executors caching at pyiron_workflow-informed locations 🚀 Thanks for the quick modifications to executorlib to allow this!