From 1d89054e7678417cce11251dc2b86699bffcc7bd Mon Sep 17 00:00:00 2001 From: Tom White Date: Thu, 26 Jun 2025 12:40:06 +0100 Subject: [PATCH 1/3] Use array name for path within store --- cubed/core/ops.py | 28 +++++++++++++++++++------ cubed/core/plan.py | 15 ++++++++----- cubed/primitive/blockwise.py | 4 ++-- cubed/tests/primitive/test_blockwise.py | 10 ++++----- 4 files changed, 39 insertions(+), 18 deletions(-) diff --git a/cubed/core/ops.py b/cubed/core/ops.py index 7e6a1763..885c72f9 100644 --- a/cubed/core/ops.py +++ b/cubed/core/ops.py @@ -17,13 +17,14 @@ from cubed.backend_array_api import IS_IMMUTABLE_ARRAY, numpy_array_to_backend_array from cubed.backend_array_api import namespace as nxp from cubed.core.array import CoreArray, check_array_specs, compute, gensym -from cubed.core.plan import Plan, new_temp_path +from cubed.core.plan import Plan, context_dir_path, new_temp_path from cubed.primitive.blockwise import blockwise as primitive_blockwise from cubed.primitive.blockwise import general_blockwise as primitive_general_blockwise from cubed.primitive.memory import get_buffer_copies from cubed.primitive.rechunk import rechunk as primitive_rechunk from cubed.spec import spec_from_config from cubed.storage.backend import open_backend_array +from cubed.storage.zarr import lazy_zarr_array from cubed.types import T_RegularChunks, T_Shape from cubed.utils import ( array_memory, @@ -157,6 +158,14 @@ def store(sources: Union["Array", Sequence["Array"]], targets, executor=None, ** for source, target in zip(sources, targets): identity = lambda a: a ind = tuple(range(source.ndim)) + + if target is not None and not isinstance(target, zarr.Array): + target = lazy_zarr_array( + target, + shape=source.shape, + dtype=source.dtype, + chunks=source.chunksize, + ) array = blockwise( identity, ind, @@ -192,6 +201,14 @@ def to_zarr(x: "Array", store, path=None, executor=None, **kwargs): # by map fusion (if it was produced with a blockwise operation). identity = lambda a: a ind = tuple(range(x.ndim)) + if store is not None and not isinstance(store, zarr.Array): + store = lazy_zarr_array( + store, + shape=x.shape, + dtype=x.dtype, + chunks=x.chunksize, + path=path, + ) out = blockwise( identity, ind, @@ -200,7 +217,6 @@ def to_zarr(x: "Array", store, path=None, executor=None, **kwargs): dtype=x.dtype, align_arrays=False, target_store=store, - target_path=path, ) out.compute(executor=executor, _return_in_memory_array=False, **kwargs) @@ -298,7 +314,7 @@ def blockwise( spec = check_array_specs(arrays) buffer_copies = get_buffer_copies(spec) if target_store is None: - target_store = new_temp_path(name=name, spec=spec) + target_store = context_dir_path(spec=spec) op = primitive_blockwise( func, out_ind, @@ -452,14 +468,14 @@ def _general_blockwise( if isinstance(target_stores, list): # multiple outputs name = [gensym() for _ in range(len(target_stores))] target_stores = [ - ts if ts is not None else new_temp_path(name=n, spec=spec) - for n, ts in zip(name, target_stores) + ts if ts is not None else context_dir_path(spec=spec) + for ts in target_stores ] target_names = name else: # single output name = gensym() if target_stores is None: - target_stores = [new_temp_path(name=name, spec=spec)] + target_stores = [context_dir_path(spec=spec)] target_names = [name] op = primitive_general_blockwise( diff --git a/cubed/core/plan.py b/cubed/core/plan.py index 12c0cbdb..8ef02121 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -557,17 +557,22 @@ def arrays_to_plan(*arrays): return plans[0].arrays_to_plan(*arrays) +def context_dir_path(spec=None): + work_dir = spec.work_dir if spec is not None else None + if work_dir is None: + work_dir = tempfile.gettempdir() + context_dir = join_path(work_dir, CONTEXT_ID) + delete_on_exit(context_dir) + return context_dir + + def new_temp_path(name, suffix=".zarr", spec=None): """Return a string path for a temporary file path, which may be local or remote. Note that this function does not create the file or any directories (and they may never be created, if for example the file doesn't need to be materialized). """ - work_dir = spec.work_dir if spec is not None else None - if work_dir is None: - work_dir = tempfile.gettempdir() - context_dir = join_path(work_dir, CONTEXT_ID) - delete_on_exit(context_dir) + context_dir = context_dir_path(spec) return join_path(context_dir, f"{name}{suffix}") diff --git a/cubed/primitive/blockwise.py b/cubed/primitive/blockwise.py index 0ed55b17..354f8f60 100644 --- a/cubed/primitive/blockwise.py +++ b/cubed/primitive/blockwise.py @@ -365,7 +365,7 @@ def general_blockwise( f"All outputs must have matching number of blocks in each dimension. Chunks specified: {chunkss}" ) ta: Union[zarr.Array, LazyZarrArray] - if isinstance(target_store, zarr.Array): + if isinstance(target_store, Union[zarr.Array, LazyZarrArray]): ta = target_store else: ta = lazy_zarr_array( @@ -373,7 +373,7 @@ def general_blockwise( shapes[i], dtype=dtypes[i], chunks=target_chunks_ or chunksize, - path=target_paths[i] if target_paths is not None else None, + path=target_names[i], # use array name for path within store storage_options=storage_options, compressor=compressor, ) diff --git a/cubed/tests/primitive/test_blockwise.py b/cubed/tests/primitive/test_blockwise.py index 758ac475..4f90cd78 100644 --- a/cubed/tests/primitive/test_blockwise.py +++ b/cubed/tests/primitive/test_blockwise.py @@ -67,7 +67,7 @@ def test_blockwise(tmp_path, executor, reserved_mem): execute_pipeline(op.pipeline, executor=executor) - res = open_backend_array(target_store, mode="r") + res = open_backend_array(target_store, mode="r", path="target") assert_array_equal(res[:], np.outer([0, 1, 2], [10, 50, 100])) @@ -132,7 +132,7 @@ def test_blockwise_with_args(tmp_path, executor): execute_pipeline(op.pipeline, executor=executor) - res = open_backend_array(target_store, mode="r") + res = open_backend_array(target_store, mode="r", path="target") assert_array_equal( res[:], np.transpose(np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]), axes=(1, 0)) ) @@ -225,7 +225,7 @@ def key_function(out_key): execute_pipeline(op.pipeline, executor=executor) - res = open_backend_array(target_store, mode="r") + res = open_backend_array(target_store, mode="r", path="target") assert_array_equal(res[:], np.arange(20)) @@ -284,10 +284,10 @@ def block_function(out_key): input = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]) - res1 = open_backend_array(target_store1, mode="r") + res1 = open_backend_array(target_store1, mode="r", path="target1") assert_array_equal(res1[:], np.sqrt(input)) - res2 = open_backend_array(target_store2, mode="r") + res2 = open_backend_array(target_store2, mode="r", path="target2") assert_array_equal(res2[:], -np.sqrt(input)) From f6909916052fcae9e2877d9666f4bff8bad0adab Mon Sep 17 00:00:00 2001 From: Tom White Date: Thu, 26 Jun 2025 11:50:27 +0100 Subject: [PATCH 2/3] Use array name for path within store for rechunk --- cubed/core/ops.py | 9 ++++----- cubed/primitive/rechunk.py | 20 ++++++++++++-------- cubed/tests/primitive/test_rechunk.py | 8 +++----- 3 files changed, 19 insertions(+), 18 deletions(-) diff --git a/cubed/core/ops.py b/cubed/core/ops.py index 885c72f9..20d9edab 100644 --- a/cubed/core/ops.py +++ b/cubed/core/ops.py @@ -17,7 +17,7 @@ from cubed.backend_array_api import IS_IMMUTABLE_ARRAY, numpy_array_to_backend_array from cubed.backend_array_api import namespace as nxp from cubed.core.array import CoreArray, check_array_specs, compute, gensym -from cubed.core.plan import Plan, context_dir_path, new_temp_path +from cubed.core.plan import Plan, context_dir_path from cubed.primitive.blockwise import blockwise as primitive_blockwise from cubed.primitive.blockwise import general_blockwise as primitive_general_blockwise from cubed.primitive.memory import get_buffer_copies @@ -902,18 +902,17 @@ def rechunk(x, chunks, *, target_store=None, min_mem=None, use_new_impl=True): name = gensym() spec = x.spec if target_store is None: - target_store = new_temp_path(name=name, spec=spec) + target_store = context_dir_path(spec=spec) name_int = f"{name}-int" - temp_store = new_temp_path(name=name_int, spec=spec) ops = primitive_rechunk( x._zarray, - source_array_name=name, + source_array_name=x.name, int_array_name=name_int, + target_array_name=name, target_chunks=target_chunks, allowed_mem=spec.allowed_mem, reserved_mem=spec.reserved_mem, target_store=target_store, - temp_store=temp_store, storage_options=spec.storage_options, ) diff --git a/cubed/primitive/rechunk.py b/cubed/primitive/rechunk.py index 1187cd98..96c054ed 100644 --- a/cubed/primitive/rechunk.py +++ b/cubed/primitive/rechunk.py @@ -24,11 +24,11 @@ def rechunk( source: T_ZarrArray, source_array_name: str, int_array_name: str, + target_array_name: str, target_chunks: T_RegularChunks, allowed_mem: int, reserved_mem: int, target_store: T_Store, - temp_store: Optional[T_Store] = None, storage_options: Optional[Dict[str, Any]] = None, ) -> List[PrimitiveOperation]: """Change the chunking of an array, without changing its shape or dtype. @@ -44,8 +44,6 @@ def rechunk( The memory reserved on a worker for non-data use when running a task, in bytes target_store : str Path to output Zarr store. - temp_store : str, optional - Path to temporary store for intermediate data. Returns ------- @@ -61,10 +59,11 @@ def rechunk( read_proxy, int_proxy, write_proxy = _setup_array_rechunk( source_array=source, + int_array_name=int_array_name, + target_array_name=target_array_name, target_chunks=target_chunks, max_mem=rechunker_max_mem, target_store=target_store, - temp_store=temp_store, storage_options=storage_options, ) @@ -118,10 +117,11 @@ def rechunk( # from rechunker, but simpler since it only has to handle Zarr arrays def _setup_array_rechunk( source_array: T_ZarrArray, + int_array_name: str, + target_array_name: str, target_chunks: T_RegularChunks, max_mem: int, target_store: T_Store, - temp_store: Optional[T_Store] = None, storage_options: Optional[Dict[str, Any]] = None, ) -> Tuple[CubedArrayProxy, Optional[CubedArrayProxy], CubedArrayProxy]: shape = source_array.shape @@ -148,6 +148,7 @@ def _setup_array_rechunk( shape, dtype, chunks=target_chunks, + path=target_array_name, # use array name for path within store storage_options=storage_options, ) @@ -155,10 +156,13 @@ def _setup_array_rechunk( int_array = None else: # do intermediate store - if temp_store is None: - raise ValueError("A temporary store location must be provided.") int_array = lazy_zarr_array( - temp_store, shape, dtype, chunks=int_chunks, storage_options=storage_options + target_store, + shape, + dtype, + chunks=int_chunks, + path=int_array_name, # use array name for path within store + storage_options=storage_options, ) read_proxy = CubedArrayProxy(source_array, read_chunks) diff --git a/cubed/tests/primitive/test_rechunk.py b/cubed/tests/primitive/test_rechunk.py index f1b1971b..2894e4c1 100644 --- a/cubed/tests/primitive/test_rechunk.py +++ b/cubed/tests/primitive/test_rechunk.py @@ -70,17 +70,16 @@ def test_rechunk( ): source = zarr.ones(shape, chunks=source_chunks, store=tmp_path / "source.zarr") target_store = tmp_path / "target.zarr" - temp_store = tmp_path / "temp.zarr" ops = rechunk( source, source_array_name="source-array", int_array_name="int-array", + target_array_name="target-array", target_chunks=target_chunks, allowed_mem=allowed_mem, reserved_mem=reserved_mem, target_store=target_store, - temp_store=temp_store, ) assert len(ops) == len(expected_num_tasks) @@ -103,7 +102,7 @@ def test_rechunk( for op in ops: execute_pipeline(op.pipeline, executor=executor) - res = open_backend_array(target_store, mode="r") + res = open_backend_array(target_store, mode="r", path="target-array") assert_array_equal(res[:], np.ones(shape)) assert res.chunks == target_chunks @@ -112,7 +111,6 @@ def test_rechunk_allowed_mem_exceeded(tmp_path): source = zarr.ones((4, 4), chunks=(2, 2), store=tmp_path / "source.zarr") allowed_mem = 16 target_store = tmp_path / "target.zarr" - temp_store = tmp_path / "temp.zarr" # cubed's allowed_mem is reduced by a factor of 4 for rechunker's max_mem from 16 to 4 with pytest.raises( @@ -122,9 +120,9 @@ def test_rechunk_allowed_mem_exceeded(tmp_path): source, source_array_name="source-array", int_array_name="int-array", + target_array_name="target-array", target_chunks=(4, 1), allowed_mem=allowed_mem, reserved_mem=0, target_store=target_store, - temp_store=temp_store, ) From 84a0b89f1840410da809da1917d43c3eeb28f681 Mon Sep 17 00:00:00 2001 From: Tom White Date: Wed, 13 Aug 2025 10:09:34 +0100 Subject: [PATCH 3/3] Fix mypy --- cubed/primitive/blockwise.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cubed/primitive/blockwise.py b/cubed/primitive/blockwise.py index 354f8f60..e010116e 100644 --- a/cubed/primitive/blockwise.py +++ b/cubed/primitive/blockwise.py @@ -365,7 +365,7 @@ def general_blockwise( f"All outputs must have matching number of blocks in each dimension. Chunks specified: {chunkss}" ) ta: Union[zarr.Array, LazyZarrArray] - if isinstance(target_store, Union[zarr.Array, LazyZarrArray]): + if isinstance(target_store, (zarr.Array, LazyZarrArray)): ta = target_store else: ta = lazy_zarr_array(