Skip to content

Commit

Permalink
Unify the CUDA Codecs (#298)
Browse files Browse the repository at this point in the history
It this PR we introduce `CudaCodec`, which is a base class for all CUDA Condecs/Compressors.
This makes it possible to detect if an user tries to open a Zarr file using an incompatible compressor (see #297). 

Additionally, `kvikio.zarr.open_cupy_array()` now handles `mode="a"`

Authors:
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #298
  • Loading branch information
madsbk authored Oct 23, 2023
1 parent 822a944 commit c11f774
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 92 deletions.
2 changes: 1 addition & 1 deletion docs/source/quickstart.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Quickstart
==========

KvikIO can be used inplace of Python's built-in `open() <https://docs.python.org/3/library/functions.html#open>`_ function with the caveat that a file is always opened in binary (``"b"``) mode.
KvikIO can be used in place of Python's built-in `open() <https://docs.python.org/3/library/functions.html#open>`_ function with the caveat that a file is always opened in binary (``"b"``) mode.
In order to open a file, use KvikIO's filehandle :py:meth:`kvikio.cufile.CuFile`.

.. code-block:: python
Expand Down
64 changes: 64 additions & 0 deletions python/kvikio/numcodecs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

"""
This module implements CUDA compression and transformation codecs for Numcodecs.
See <https://numcodecs.readthedocs.io/en/stable/>
"""

from __future__ import annotations

from abc import abstractmethod
from typing import Any, Optional, Union

import cupy.typing
import numpy.typing
from numcodecs.abc import Codec

# TODO: replace `ANY` with `collections.abc.Buffer` from PEP-688
# when it becomes available.
BufferLike = Union[cupy.typing.NDArray, numpy.typing.ArrayLike, Any]


class CudaCodec(Codec):
"""Abstract base class for CUDA codecs"""

@abstractmethod
def encode(self, buf: BufferLike) -> cupy.typing.NDArray:
"""Encode `buf` using CUDA.
This method should support both device and host buffers.
Parameters
----------
buf
A numpy array like object such as numpy.ndarray, cupy.ndarray,
or any object exporting a buffer interface.
Returns
-------
The compressed buffer wrapped in a CuPy array
"""

@abstractmethod
def decode(self, buf: BufferLike, out: Optional[BufferLike] = None) -> BufferLike:
"""Decode `buf` using CUDA.
This method should support both device and host buffers.
Parameters
----------
buf
A numpy array like object such as numpy.ndarray, cupy.ndarray,
or any object exporting a buffer interface.
out
A numpy array like object such as numpy.ndarray, cupy.ndarray,
or any object exporting a buffer interface. If provided, this buffer must
be exactly the right size to store the decoded data.
Returns
-------
Decoded data, which is either host or device memory based on the type
of `out`. If `out` is None, the type of `buf` determines the return buffer
type.
"""
40 changes: 5 additions & 35 deletions python/kvikio/nvcomp_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
from typing import Any, Mapping, Optional, Sequence

import cupy as cp
from numcodecs.abc import Codec
import cupy.typing
from numcodecs.compat import ensure_contiguous_ndarray_like

from kvikio._lib.libnvcomp_ll import SUPPORTED_ALGORITHMS
from kvikio.numcodecs import BufferLike, CudaCodec


class NvCompBatchCodec(Codec):
class NvCompBatchCodec(CudaCodec):
"""Codec that uses batch algorithms from nvCOMP library.
An algorithm is selected using `algorithm` parameter.
Expand Down Expand Up @@ -48,21 +49,7 @@ def __init__(
# Use default stream, if needed.
self._stream = stream if stream is not None else cp.cuda.Stream.ptds

def encode(self, buf):
"""Encode data in `buf` using nvCOMP.
Parameters
----------
buf : buffer-like
Data to be encoded. May be any object supporting the new-style
buffer protocol.
Returns
-------
enc : buffer-like
Encoded data. May be any object supporting the new-style buffer
protocol.
"""
def encode(self, buf: BufferLike) -> cupy.typing.NDArray:
return self.encode_batch([buf])[0]

def encode_batch(self, bufs: Sequence[Any]) -> Sequence[Any]:
Expand Down Expand Up @@ -127,24 +114,7 @@ def encode_batch(self, bufs: Sequence[Any]) -> Sequence[Any]:
res.append(comp_chunks[i, : comp_chunk_sizes[i]].tobytes())
return res

def decode(self, buf, out=None):
"""Decode data in `buf` using nvCOMP.
Parameters
----------
buf : buffer-like
Encoded data. May be any object supporting the new-style buffer
protocol.
out : buffer-like, optional
Writeable buffer to store decoded data. N.B. if provided, this buffer must
be exactly the right size to store the decoded data.
Returns
-------
dec : buffer-like
Decoded data. May be any object supporting the new-style
buffer protocol.
"""
def decode(self, buf: BufferLike, out: Optional[BufferLike] = None) -> BufferLike:
return self.decode_batch([buf], [out])[0]

def decode_batch(
Expand Down
96 changes: 44 additions & 52 deletions python/kvikio/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@
from typing import Any, Literal, Mapping, Optional, Sequence, Union

import cupy
import cupy.typing
import numcodecs
import numpy
import numpy as np
import zarr
import zarr.creation
import zarr.errors
import zarr.storage
import zarr.util
from numcodecs.abc import Codec
from numcodecs.compat import ensure_contiguous_ndarray_like
from numcodecs.registry import register_codec
Expand All @@ -24,6 +27,7 @@
import kvikio.nvcomp
import kvikio.nvcomp_codec
import kvikio.zarr
from kvikio.numcodecs import BufferLike, CudaCodec
from kvikio.nvcomp_codec import NvCompBatchCodec

MINIMUM_ZARR_VERSION = "2.15"
Expand Down Expand Up @@ -171,7 +175,7 @@ def getitems(
return ret


class NVCompCompressor(Codec):
class NVCompCompressor(CudaCodec):
"""Abstract base class for nvCOMP compressors
The derived classes must set `codec_id` and implement
Expand All @@ -197,41 +201,11 @@ def get_nvcomp_manager(self) -> kvikio.nvcomp.nvCompManager:
"""
pass # TODO: cache Manager

def encode(self, buf) -> cupy.ndarray:
"""Compress using `get_nvcomp_manager()`
Parameters
----------
buf : buffer-like
The buffer to compress. Accepts both host and device memory.
Returns
-------
cupy.ndarray
The compressed buffer wrapped in a CuPy array
"""
def encode(self, buf: BufferLike) -> cupy.typing.NDArray:
buf = cupy.asarray(ensure_contiguous_ndarray_like(buf))
return self.get_nvcomp_manager().compress(buf)

def decode(self, buf, out=None):
"""Decompress using `get_nvcomp_manager()`
Parameters
----------
buf : buffer-like
The buffer to decompress. Accepts both host and device memory.
out : buffer-like, optional
Writeable buffer to store decoded data. N.B. if provided, this buffer must
be exactly the right size to store the decoded data. Accepts both host and
device memory.
Returns
-------
buffer-like
Decompress data, which is either host or device memory based on the type
of `out`. If `out` is None, the type of `buf` determines the return buffer
type.
"""
def decode(self, buf: BufferLike, out: Optional[BufferLike] = None) -> BufferLike:
buf = ensure_contiguous_ndarray_like(buf)
is_host_buffer = not hasattr(buf, "__cuda_array_interface__")
if is_host_buffer:
Expand Down Expand Up @@ -302,7 +276,7 @@ def get_nvcomp_manager(self):
class CompatCompressor:
"""A pair of compatible compressors one using the CPU and one using the GPU"""

def __init__(self, cpu: Codec, gpu: Codec) -> None:
def __init__(self, cpu: Codec, gpu: CudaCodec) -> None:
self.cpu = cpu
self.gpu = gpu

Expand Down Expand Up @@ -359,28 +333,46 @@ def open_cupy_array(
if not hasattr(meta_array, "__cuda_array_interface__"):
raise ValueError("meta_array must implement __cuda_array_interface__")

if mode in ("r", "r+"):
ret = zarr.open_array(
store=kvikio.zarr.GDSStore(path=store),
mode=mode,
meta_array=meta_array,
**kwargs,
)
# If we are reading a LZ4-CPU compressed file, we overwrite the metadata
# on-the-fly to make Zarr use LZ4-GPU for both compression and decompression.
compat_lz4 = CompatCompressor.lz4()
if ret.compressor == compat_lz4.cpu:
if mode in ("r", "r+", "a"):
# In order to handle "a", we start by trying to open the file in read mode.
try:
ret = zarr.open_array(
store=kvikio.zarr.GDSStore(
path=store,
compressor_config_overwrite=compat_lz4.cpu.get_config(),
decompressor_config_overwrite=compat_lz4.gpu.get_config(),
),
mode=mode,
store=kvikio.zarr.GDSStore(path=store),
mode="r+",
meta_array=meta_array,
**kwargs,
)
return ret
except (zarr.errors.ContainsGroupError, zarr.errors.ArrayNotFoundError):
# If we are reading, this is a genuine error.
if mode in ("r", "r+"):
raise
else:
# If we are reading a LZ4-CPU compressed file, we overwrite the
# metadata on-the-fly to make Zarr use LZ4-GPU for both compression
# and decompression.
compat_lz4 = CompatCompressor.lz4()
if ret.compressor == compat_lz4.cpu:
ret = zarr.open_array(
store=kvikio.zarr.GDSStore(
path=store,
compressor_config_overwrite=compat_lz4.cpu.get_config(),
decompressor_config_overwrite=compat_lz4.gpu.get_config(),
),
mode=mode,
meta_array=meta_array,
**kwargs,
)
elif not isinstance(ret.compressor, CudaCodec):
raise ValueError(
"The Zarr file was written using a non-CUDA compatible "
f"compressor, {ret.compressor}, please use something "
"like kvikio.zarr.CompatCompressor"
)
return ret

# At this point, we known that we are writing a new array
if mode not in ("w", "w-", "a"):
raise ValueError(f"Unknown mode: {mode}")

if isinstance(compressor, CompatCompressor):
compressor_config_overwrite = compressor.cpu.get_config()
Expand Down
30 changes: 26 additions & 4 deletions python/tests/test_zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,13 @@ def test_compressor_config_overwrite(tmp_path, xp, algo):
numpy.testing.assert_array_equal(z[:], range(10))


def test_open_cupy_array(tmp_path):
@pytest.mark.parametrize("write_mode", ["w", "w-", "a"])
@pytest.mark.parametrize("read_mode", ["r", "r+", "a"])
def test_open_cupy_array(tmp_path, write_mode, read_mode):
a = cupy.arange(10)
z = kvikio_zarr.open_cupy_array(
tmp_path,
mode="w",
mode=write_mode,
shape=a.shape,
dtype=a.dtype,
chunks=(2,),
Expand All @@ -231,17 +233,37 @@ def test_open_cupy_array(tmp_path):

z = kvikio_zarr.open_cupy_array(
tmp_path,
mode="r",
mode=read_mode,
)
assert a.shape == z.shape
assert a.dtype == z.dtype
assert isinstance(z[:], type(a))
assert z.compressor == kvikio_nvcomp_codec.NvCompBatchCodec("lz4")
cupy.testing.assert_array_equal(a, z[:])

z = zarr.open_array(tmp_path, mode="r")
z = zarr.open_array(tmp_path, mode=read_mode)
assert a.shape == z.shape
assert a.dtype == z.dtype
assert isinstance(z[:], numpy.ndarray)
assert z.compressor == kvikio_zarr.CompatCompressor.lz4().cpu
numpy.testing.assert_array_equal(a.get(), z[:])


@pytest.mark.parametrize("mode", ["r", "r+", "a"])
def test_open_cupy_array_incompatible_compressor(tmp_path, mode):
zarr.create((10,), store=tmp_path, compressor=numcodecs.Blosc())

with pytest.raises(ValueError, match="non-CUDA compatible compressor"):
kvikio_zarr.open_cupy_array(tmp_path, mode=mode)


def test_open_cupy_array_unknown_mode(tmp_path):
a = cupy.arange(10)
with pytest.raises(ValueError, match="Unknown mode: x"):
kvikio_zarr.open_cupy_array(
tmp_path,
mode="x",
shape=a.shape,
dtype=a.dtype,
chunks=(2,),
)

0 comments on commit c11f774

Please sign in to comment.