diff --git a/docs/source/quickstart.rst b/docs/source/quickstart.rst index 448b3101f5..0b1f3e8a68 100644 --- a/docs/source/quickstart.rst +++ b/docs/source/quickstart.rst @@ -1,7 +1,7 @@ Quickstart ========== -KvikIO can be used inplace of Python's built-in `open() <https://docs.python.org/3/library/functions.html#open>`_ function with the caveat that a file is always opened in binary (``"b"``) mode. +KvikIO can be used in place of Python's built-in `open() <https://docs.python.org/3/library/functions.html#open>`_ function with the caveat that a file is always opened in binary (``"b"``) mode. In order to open a file, use KvikIO's filehandle :py:meth:`kvikio.cufile.CuFile`. .. code-block:: python diff --git a/python/kvikio/numcodecs.py b/python/kvikio/numcodecs.py new file mode 100644 index 0000000000..40f62be1de --- /dev/null +++ b/python/kvikio/numcodecs.py @@ -0,0 +1,64 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. +# See file LICENSE for terms. + +""" +This module implements CUDA compression and transformation codecs for Numcodecs. +See <https://numcodecs.readthedocs.io/en/stable/> +""" + +from __future__ import annotations + +from abc import abstractmethod +from typing import Any, Optional, Union + +import cupy.typing +import numpy.typing +from numcodecs.abc import Codec + +# TODO: replace `Any` with `collections.abc.Buffer` from PEP-688 +# when it becomes available. +BufferLike = Union[cupy.typing.NDArray, numpy.typing.ArrayLike, Any] + + +class CudaCodec(Codec): + """Abstract base class for CUDA codecs""" + + @abstractmethod + def encode(self, buf: BufferLike) -> cupy.typing.NDArray: + """Encode `buf` using CUDA. + + This method should support both device and host buffers. + + Parameters + ---------- + buf + A numpy array like object such as numpy.ndarray, cupy.ndarray, + or any object exporting a buffer interface. + + Returns + ------- + The compressed buffer wrapped in a CuPy array + """ + + @abstractmethod + def decode(self, buf: BufferLike, out: Optional[BufferLike] = None) -> BufferLike: + """Decode `buf` using CUDA. + + This method should support both device and host buffers. 
+ + Parameters + ---------- + buf + A numpy array like object such as numpy.ndarray, cupy.ndarray, + or any object exporting a buffer interface. + out + A numpy array like object such as numpy.ndarray, cupy.ndarray, + or any object exporting a buffer interface. If provided, this buffer must + be exactly the right size to store the decoded data. + + Returns + ------- + Decoded data, which is either host or device memory based on the type + of `out`. If `out` is None, the type of `buf` determines the return buffer + type. + """ diff --git a/python/kvikio/nvcomp_codec.py b/python/kvikio/nvcomp_codec.py index 7869d8478c..dc60d9c7dc 100644 --- a/python/kvikio/nvcomp_codec.py +++ b/python/kvikio/nvcomp_codec.py @@ -4,13 +4,14 @@ from typing import Any, Mapping, Optional, Sequence import cupy as cp -from numcodecs.abc import Codec +import cupy.typing from numcodecs.compat import ensure_contiguous_ndarray_like from kvikio._lib.libnvcomp_ll import SUPPORTED_ALGORITHMS +from kvikio.numcodecs import BufferLike, CudaCodec -class NvCompBatchCodec(Codec): +class NvCompBatchCodec(CudaCodec): """Codec that uses batch algorithms from nvCOMP library. An algorithm is selected using `algorithm` parameter. @@ -48,21 +49,7 @@ def __init__( # Use default stream, if needed. self._stream = stream if stream is not None else cp.cuda.Stream.ptds - def encode(self, buf): - """Encode data in `buf` using nvCOMP. - - Parameters - ---------- - buf : buffer-like - Data to be encoded. May be any object supporting the new-style - buffer protocol. - - Returns - ------- - enc : buffer-like - Encoded data. May be any object supporting the new-style buffer - protocol. 
- """ + def encode(self, buf: BufferLike) -> cupy.typing.NDArray: return self.encode_batch([buf])[0] def encode_batch(self, bufs: Sequence[Any]) -> Sequence[Any]: @@ -127,24 +114,7 @@ def encode_batch(self, bufs: Sequence[Any]) -> Sequence[Any]: res.append(comp_chunks[i, : comp_chunk_sizes[i]].tobytes()) return res - def decode(self, buf, out=None): - """Decode data in `buf` using nvCOMP. - - Parameters - ---------- - buf : buffer-like - Encoded data. May be any object supporting the new-style buffer - protocol. - out : buffer-like, optional - Writeable buffer to store decoded data. N.B. if provided, this buffer must - be exactly the right size to store the decoded data. - - Returns - ------- - dec : buffer-like - Decoded data. May be any object supporting the new-style - buffer protocol. - """ + def decode(self, buf: BufferLike, out: Optional[BufferLike] = None) -> BufferLike: return self.decode_batch([buf], [out])[0] def decode_batch( diff --git a/python/kvikio/zarr.py b/python/kvikio/zarr.py index 1c030f96ae..c718c413a7 100644 --- a/python/kvikio/zarr.py +++ b/python/kvikio/zarr.py @@ -9,12 +9,15 @@ from typing import Any, Literal, Mapping, Optional, Sequence, Union import cupy +import cupy.typing import numcodecs import numpy import numpy as np import zarr import zarr.creation +import zarr.errors import zarr.storage +import zarr.util from numcodecs.abc import Codec from numcodecs.compat import ensure_contiguous_ndarray_like from numcodecs.registry import register_codec @@ -24,6 +27,7 @@ import kvikio.nvcomp import kvikio.nvcomp_codec import kvikio.zarr +from kvikio.numcodecs import BufferLike, CudaCodec from kvikio.nvcomp_codec import NvCompBatchCodec MINIMUM_ZARR_VERSION = "2.15" @@ -171,7 +175,7 @@ def getitems( return ret -class NVCompCompressor(Codec): +class NVCompCompressor(CudaCodec): """Abstract base class for nvCOMP compressors The derived classes must set `codec_id` and implement @@ -197,41 +201,11 @@ def get_nvcomp_manager(self) -> 
kvikio.nvcomp.nvCompManager: """ pass # TODO: cache Manager - def encode(self, buf) -> cupy.ndarray: - """Compress using `get_nvcomp_manager()` - - Parameters - ---------- - buf : buffer-like - The buffer to compress. Accepts both host and device memory. - - Returns - ------- - cupy.ndarray - The compressed buffer wrapped in a CuPy array - """ + def encode(self, buf: BufferLike) -> cupy.typing.NDArray: buf = cupy.asarray(ensure_contiguous_ndarray_like(buf)) return self.get_nvcomp_manager().compress(buf) - def decode(self, buf, out=None): - """Decompress using `get_nvcomp_manager()` - - Parameters - ---------- - buf : buffer-like - The buffer to decompress. Accepts both host and device memory. - out : buffer-like, optional - Writeable buffer to store decoded data. N.B. if provided, this buffer must - be exactly the right size to store the decoded data. Accepts both host and - device memory. - - Returns - ------- - buffer-like - Decompress data, which is either host or device memory based on the type - of `out`. If `out` is None, the type of `buf` determines the return buffer - type. 
- """ + def decode(self, buf: BufferLike, out: Optional[BufferLike] = None) -> BufferLike: buf = ensure_contiguous_ndarray_like(buf) is_host_buffer = not hasattr(buf, "__cuda_array_interface__") if is_host_buffer: @@ -302,7 +276,7 @@ def get_nvcomp_manager(self): class CompatCompressor: """A pair of compatible compressors one using the CPU and one using the GPU""" - def __init__(self, cpu: Codec, gpu: Codec) -> None: + def __init__(self, cpu: Codec, gpu: CudaCodec) -> None: self.cpu = cpu self.gpu = gpu @@ -359,28 +333,46 @@ def open_cupy_array( if not hasattr(meta_array, "__cuda_array_interface__"): raise ValueError("meta_array must implement __cuda_array_interface__") - if mode in ("r", "r+"): - ret = zarr.open_array( - store=kvikio.zarr.GDSStore(path=store), - mode=mode, - meta_array=meta_array, - **kwargs, - ) - # If we are reading a LZ4-CPU compressed file, we overwrite the metadata - # on-the-fly to make Zarr use LZ4-GPU for both compression and decompression. - compat_lz4 = CompatCompressor.lz4() - if ret.compressor == compat_lz4.cpu: + if mode in ("r", "r+", "a"): + # In order to handle "a", we start by trying to open the file in read mode. + try: ret = zarr.open_array( - store=kvikio.zarr.GDSStore( - path=store, - compressor_config_overwrite=compat_lz4.cpu.get_config(), - decompressor_config_overwrite=compat_lz4.gpu.get_config(), - ), - mode=mode, + store=kvikio.zarr.GDSStore(path=store), + mode="r+", meta_array=meta_array, **kwargs, ) - return ret + except (zarr.errors.ContainsGroupError, zarr.errors.ArrayNotFoundError): + # If we are reading, this is a genuine error. + if mode in ("r", "r+"): + raise + else: + # If we are reading a LZ4-CPU compressed file, we overwrite the + # metadata on-the-fly to make Zarr use LZ4-GPU for both compression + # and decompression. 
+ compat_lz4 = CompatCompressor.lz4() + if ret.compressor == compat_lz4.cpu: + ret = zarr.open_array( + store=kvikio.zarr.GDSStore( + path=store, + compressor_config_overwrite=compat_lz4.cpu.get_config(), + decompressor_config_overwrite=compat_lz4.gpu.get_config(), + ), + mode=mode, + meta_array=meta_array, + **kwargs, + ) + elif not isinstance(ret.compressor, CudaCodec): + raise ValueError( + "The Zarr file was written using a non-CUDA compatible " + f"compressor, {ret.compressor}, please use something " + "like kvikio.zarr.CompatCompressor" + ) + return ret + + # At this point, we know that we are writing a new array + if mode not in ("w", "w-", "a"): + raise ValueError(f"Unknown mode: {mode}") if isinstance(compressor, CompatCompressor): compressor_config_overwrite = compressor.cpu.get_config() diff --git a/python/tests/test_zarr.py b/python/tests/test_zarr.py index f909559eea..33e3b4588f 100644 --- a/python/tests/test_zarr.py +++ b/python/tests/test_zarr.py @@ -212,11 +212,13 @@ def test_compressor_config_overwrite(tmp_path, xp, algo): numpy.testing.assert_array_equal(z[:], range(10)) -def test_open_cupy_array(tmp_path): +@pytest.mark.parametrize("write_mode", ["w", "w-", "a"]) +@pytest.mark.parametrize("read_mode", ["r", "r+", "a"]) +def test_open_cupy_array(tmp_path, write_mode, read_mode): a = cupy.arange(10) z = kvikio_zarr.open_cupy_array( tmp_path, - mode="w", + mode=write_mode, shape=a.shape, dtype=a.dtype, chunks=(2,), @@ -231,7 +233,7 @@ def test_open_cupy_array(tmp_path): z = kvikio_zarr.open_cupy_array( tmp_path, - mode="r", + mode=read_mode, ) assert a.shape == z.shape assert a.dtype == z.dtype @@ -239,9 +241,29 @@ def test_open_cupy_array(tmp_path): assert z.compressor == kvikio_nvcomp_codec.NvCompBatchCodec("lz4") cupy.testing.assert_array_equal(a, z[:]) - z = zarr.open_array(tmp_path, mode="r") + z = zarr.open_array(tmp_path, mode=read_mode) assert a.shape == z.shape assert a.dtype == z.dtype assert isinstance(z[:], numpy.ndarray) assert 
z.compressor == kvikio_zarr.CompatCompressor.lz4().cpu numpy.testing.assert_array_equal(a.get(), z[:]) + + +@pytest.mark.parametrize("mode", ["r", "r+", "a"]) +def test_open_cupy_array_incompatible_compressor(tmp_path, mode): + zarr.create((10,), store=tmp_path, compressor=numcodecs.Blosc()) + + with pytest.raises(ValueError, match="non-CUDA compatible compressor"): + kvikio_zarr.open_cupy_array(tmp_path, mode=mode) + + +def test_open_cupy_array_unknown_mode(tmp_path): + a = cupy.arange(10) + with pytest.raises(ValueError, match="Unknown mode: x"): + kvikio_zarr.open_cupy_array( + tmp_path, + mode="x", + shape=a.shape, + dtype=a.dtype, + chunks=(2,), + )