35 changes: 25 additions & 10 deletions cubed/core/ops.py
@@ -17,13 +17,14 @@
from cubed.backend_array_api import IS_IMMUTABLE_ARRAY, numpy_array_to_backend_array
from cubed.backend_array_api import namespace as nxp
from cubed.core.array import CoreArray, check_array_specs, compute, gensym
from cubed.core.plan import Plan, new_temp_path
from cubed.core.plan import Plan, context_dir_path
from cubed.primitive.blockwise import blockwise as primitive_blockwise
from cubed.primitive.blockwise import general_blockwise as primitive_general_blockwise
from cubed.primitive.memory import get_buffer_copies
from cubed.primitive.rechunk import rechunk as primitive_rechunk
from cubed.spec import spec_from_config
from cubed.storage.backend import open_backend_array
from cubed.storage.zarr import lazy_zarr_array
from cubed.types import T_RegularChunks, T_Shape
from cubed.utils import (
array_memory,
@@ -157,6 +158,14 @@ def store(sources: Union["Array", Sequence["Array"]], targets, executor=None, **
for source, target in zip(sources, targets):
identity = lambda a: a
ind = tuple(range(source.ndim))

if target is not None and not isinstance(target, zarr.Array):
target = lazy_zarr_array(
target,
shape=source.shape,
dtype=source.dtype,
chunks=source.chunksize,
)
array = blockwise(
identity,
ind,
@@ -192,6 +201,14 @@ def to_zarr(x: "Array", store, path=None, executor=None, **kwargs):
# by map fusion (if it was produced with a blockwise operation).
identity = lambda a: a
ind = tuple(range(x.ndim))
if store is not None and not isinstance(store, zarr.Array):
store = lazy_zarr_array(
store,
shape=x.shape,
dtype=x.dtype,
chunks=x.chunksize,
path=path,
)
out = blockwise(
identity,
ind,
@@ -200,7 +217,6 @@
dtype=x.dtype,
align_arrays=False,
target_store=store,
target_path=path,
)
out.compute(executor=executor, _return_in_memory_array=False, **kwargs)

@@ -298,7 +314,7 @@ def blockwise(
spec = check_array_specs(arrays)
buffer_copies = get_buffer_copies(spec)
if target_store is None:
target_store = new_temp_path(name=name, spec=spec)
target_store = context_dir_path(spec=spec)
op = primitive_blockwise(
func,
out_ind,
@@ -452,14 +468,14 @@ def _general_blockwise(
if isinstance(target_stores, list): # multiple outputs
name = [gensym() for _ in range(len(target_stores))]
target_stores = [
ts if ts is not None else new_temp_path(name=n, spec=spec)
for n, ts in zip(name, target_stores)
ts if ts is not None else context_dir_path(spec=spec)
for ts in target_stores
]
target_names = name
else: # single output
name = gensym()
if target_stores is None:
target_stores = [new_temp_path(name=name, spec=spec)]
target_stores = [context_dir_path(spec=spec)]
target_names = [name]

op = primitive_general_blockwise(
@@ -886,18 +902,17 @@ def rechunk(x, chunks, *, target_store=None, min_mem=None, use_new_impl=True):
name = gensym()
spec = x.spec
if target_store is None:
target_store = new_temp_path(name=name, spec=spec)
target_store = context_dir_path(spec=spec)
name_int = f"{name}-int"
temp_store = new_temp_path(name=name_int, spec=spec)
ops = primitive_rechunk(
x._zarray,
source_array_name=name,
source_array_name=x.name,
int_array_name=name_int,
target_array_name=name,
target_chunks=target_chunks,
allowed_mem=spec.allowed_mem,
reserved_mem=spec.reserved_mem,
target_store=target_store,
temp_store=temp_store,
storage_options=spec.storage_options,
)

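In short, `store` and `to_zarr` now wrap a plain target (a path or URL) in a `lazy_zarr_array` up front, so `blockwise` no longer needs a separate `target_path` argument. A minimal sketch of the resulting user-facing call — the array, store path and group path below are illustrative, not taken from this PR:

```python
import cubed
import cubed.array_api as xp

x = xp.ones((4, 4), chunks=(2, 2), spec=cubed.Spec(allowed_mem="100MB"))

# "out.zarr" is wrapped in a lazy_zarr_array (with x's shape, dtype and
# chunksize) before the identity blockwise is built; path="data" becomes
# the array's location inside the store rather than a target_path kwarg.
cubed.to_zarr(x, store="out.zarr", path="data")
```
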
15 changes: 10 additions & 5 deletions cubed/core/plan.py
@@ -557,17 +557,22 @@ def arrays_to_plan(*arrays):
return plans[0].arrays_to_plan(*arrays)


def context_dir_path(spec=None):
work_dir = spec.work_dir if spec is not None else None
if work_dir is None:
work_dir = tempfile.gettempdir()
context_dir = join_path(work_dir, CONTEXT_ID)
delete_on_exit(context_dir)
return context_dir


def new_temp_path(name, suffix=".zarr", spec=None):
"""Return a string path for a temporary file path, which may be local or remote.

Note that this function does not create the file or any directories (and they
may never be created, if for example the file doesn't need to be materialized).
"""
work_dir = spec.work_dir if spec is not None else None
if work_dir is None:
work_dir = tempfile.gettempdir()
context_dir = join_path(work_dir, CONTEXT_ID)
delete_on_exit(context_dir)
context_dir = context_dir_path(spec)
return join_path(context_dir, f"{name}{suffix}")


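Roughly, `context_dir_path` now owns the per-context directory (and its delete-on-exit registration), while `new_temp_path` just appends a file name to it. A small sketch under assumed values — the work directory and the shape of the context ID are placeholders:

```python
from cubed.core.plan import context_dir_path, new_temp_path
from cubed.spec import Spec

spec = Spec(work_dir="/scratch", allowed_mem="1GB")

context_dir = context_dir_path(spec)               # e.g. "/scratch/<CONTEXT_ID>"
temp_path = new_temp_path("array-003", spec=spec)  # e.g. "/scratch/<CONTEXT_ID>/array-003.zarr"

# Neither call creates files or directories; the paths are only used if the
# corresponding arrays are actually materialized.
```
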
4 changes: 2 additions & 2 deletions cubed/primitive/blockwise.py
@@ -365,15 +365,15 @@ def general_blockwise(
f"All outputs must have matching number of blocks in each dimension. Chunks specified: {chunkss}"
)
ta: Union[zarr.Array, LazyZarrArray]
if isinstance(target_store, zarr.Array):
if isinstance(target_store, (zarr.Array, LazyZarrArray)):
ta = target_store
else:
ta = lazy_zarr_array(
target_store,
shapes[i],
dtype=dtypes[i],
chunks=target_chunks_ or chunksize,
path=target_paths[i] if target_paths is not None else None,
path=target_names[i], # use array name for path within store
storage_options=storage_options,
compressor=compressor,
)
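The practical effect in `general_blockwise`: a target passed in as a `zarr.Array` or `LazyZarrArray` is used as-is, while a plain store is opened lazily with the output array's name as its path inside the store. A hypothetical illustration of the latter case (the store, shape and name are made up):

```python
from cubed.storage.zarr import lazy_zarr_array

ta = lazy_zarr_array(
    "/scratch/cubed-context",  # target_store: the shared context directory
    (3, 3),                    # output shape
    dtype="float64",
    chunks=(2, 2),
    path="array-007",          # output array name doubles as its path in the store
)
```
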
20 changes: 12 additions & 8 deletions cubed/primitive/rechunk.py
@@ -24,11 +24,11 @@ def rechunk(
source: T_ZarrArray,
source_array_name: str,
int_array_name: str,
target_array_name: str,
target_chunks: T_RegularChunks,
allowed_mem: int,
reserved_mem: int,
target_store: T_Store,
temp_store: Optional[T_Store] = None,
storage_options: Optional[Dict[str, Any]] = None,
) -> List[PrimitiveOperation]:
"""Change the chunking of an array, without changing its shape or dtype.
@@ -44,8 +44,6 @@
The memory reserved on a worker for non-data use when running a task, in bytes
target_store : str
Path to output Zarr store.
temp_store : str, optional
Path to temporary store for intermediate data.

Returns
-------
@@ -61,10 +59,11 @@

read_proxy, int_proxy, write_proxy = _setup_array_rechunk(
source_array=source,
int_array_name=int_array_name,
target_array_name=target_array_name,
target_chunks=target_chunks,
max_mem=rechunker_max_mem,
target_store=target_store,
temp_store=temp_store,
storage_options=storage_options,
)

@@ -118,10 +117,11 @@ def rechunk(
# from rechunker, but simpler since it only has to handle Zarr arrays
def _setup_array_rechunk(
source_array: T_ZarrArray,
int_array_name: str,
target_array_name: str,
target_chunks: T_RegularChunks,
max_mem: int,
target_store: T_Store,
temp_store: Optional[T_Store] = None,
storage_options: Optional[Dict[str, Any]] = None,
) -> Tuple[CubedArrayProxy, Optional[CubedArrayProxy], CubedArrayProxy]:
shape = source_array.shape
@@ -148,17 +148,21 @@ def _setup_array_rechunk(
shape,
dtype,
chunks=target_chunks,
path=target_array_name, # use array name for path within store
storage_options=storage_options,
)

if read_chunks == write_chunks:
int_array = None
else:
# do intermediate store
if temp_store is None:
raise ValueError("A temporary store location must be provided.")
int_array = lazy_zarr_array(
temp_store, shape, dtype, chunks=int_chunks, storage_options=storage_options
target_store,
shape,
dtype,
chunks=int_chunks,
path=int_array_name, # use array name for path within store
storage_options=storage_options,
)

read_proxy = CubedArrayProxy(source_array, read_chunks)
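With `temp_store` gone, both the intermediate array (when read and write chunks differ) and the target array are created inside the same `target_store`, distinguished only by paths taken from their array names. A rough sketch of the new call, using placeholder names and a throwaway store:

```python
import zarr
from cubed.primitive.rechunk import rechunk

source = zarr.ones((4, 4), chunks=(2, 2), store="source.zarr")

ops = rechunk(
    source,
    source_array_name="array-001",
    int_array_name="array-001-int",
    target_array_name="array-002",
    target_chunks=(4, 1),
    allowed_mem=1_000_000,
    reserved_mem=0,
    target_store="rechunk-store",
)
# Any intermediate array lands in "rechunk-store" at path "array-001-int";
# the rechunked result is at path "array-002" in the same store.
```
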
10 changes: 5 additions & 5 deletions cubed/tests/primitive/test_blockwise.py
@@ -67,7 +67,7 @@ def test_blockwise(tmp_path, executor, reserved_mem):

execute_pipeline(op.pipeline, executor=executor)

res = open_backend_array(target_store, mode="r")
res = open_backend_array(target_store, mode="r", path="target")
assert_array_equal(res[:], np.outer([0, 1, 2], [10, 50, 100]))


@@ -132,7 +132,7 @@ def test_blockwise_with_args(tmp_path, executor):

execute_pipeline(op.pipeline, executor=executor)

res = open_backend_array(target_store, mode="r")
res = open_backend_array(target_store, mode="r", path="target")
assert_array_equal(
res[:], np.transpose(np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]), axes=(1, 0))
)
@@ -225,7 +225,7 @@ def key_function(out_key):

execute_pipeline(op.pipeline, executor=executor)

res = open_backend_array(target_store, mode="r")
res = open_backend_array(target_store, mode="r", path="target")
assert_array_equal(res[:], np.arange(20))


@@ -284,10 +284,10 @@ def block_function(out_key):

input = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])

res1 = open_backend_array(target_store1, mode="r")
res1 = open_backend_array(target_store1, mode="r", path="target1")
assert_array_equal(res1[:], np.sqrt(input))

res2 = open_backend_array(target_store2, mode="r")
res2 = open_backend_array(target_store2, mode="r", path="target2")
assert_array_equal(res2[:], -np.sqrt(input))


8 changes: 3 additions & 5 deletions cubed/tests/primitive/test_rechunk.py
@@ -70,17 +70,16 @@ def test_rechunk(
):
source = zarr.ones(shape, chunks=source_chunks, store=tmp_path / "source.zarr")
target_store = tmp_path / "target.zarr"
temp_store = tmp_path / "temp.zarr"

ops = rechunk(
source,
source_array_name="source-array",
int_array_name="int-array",
target_array_name="target-array",
target_chunks=target_chunks,
allowed_mem=allowed_mem,
reserved_mem=reserved_mem,
target_store=target_store,
temp_store=temp_store,
)

assert len(ops) == len(expected_num_tasks)
Expand All @@ -103,7 +102,7 @@ def test_rechunk(
for op in ops:
execute_pipeline(op.pipeline, executor=executor)

res = open_backend_array(target_store, mode="r")
res = open_backend_array(target_store, mode="r", path="target-array")
assert_array_equal(res[:], np.ones(shape))
assert res.chunks == target_chunks

@@ -112,7 +111,6 @@ def test_rechunk_allowed_mem_exceeded(tmp_path):
source = zarr.ones((4, 4), chunks=(2, 2), store=tmp_path / "source.zarr")
allowed_mem = 16
target_store = tmp_path / "target.zarr"
temp_store = tmp_path / "temp.zarr"

# cubed's allowed_mem is reduced by a factor of 4 for rechunker's max_mem from 16 to 4
with pytest.raises(
@@ -122,9 +120,9 @@
source,
source_array_name="source-array",
int_array_name="int-array",
target_array_name="target-array",
target_chunks=(4, 1),
allowed_mem=allowed_mem,
reserved_mem=0,
target_store=target_store,
temp_store=temp_store,
)