[Data] Add seed for read files #49129

Merged
merged 32 commits on Dec 17, 2024
Changes from all commits
32 commits
cb830b0
Initial commit
bveeramani Nov 22, 2024
1d9dd2e
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Nov 25, 2024
bed2558
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Nov 26, 2024
c40b06d
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Dec 2, 2024
49a8d71
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Dec 2, 2024
cad41d5
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Dec 3, 2024
17df280
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Dec 4, 2024
bb0e778
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Dec 5, 2024
1547ebd
fix seed
Bye-legumes Dec 6, 2024
1b1501e
fix
Bye-legumes Dec 6, 2024
04d31f1
fix
Bye-legumes Dec 6, 2024
2620239
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Dec 9, 2024
4dff29a
Merge branch 'master' of https://github.com/ray-project/ray
bveeramani Dec 11, 2024
5505a9c
Update python/ray/data/read_api.py
Bye-legumes Dec 12, 2024
ed72dfe
Update python/ray/data/read_api.py
Bye-legumes Dec 12, 2024
b51581e
Update python/ray/data/read_api.py
Bye-legumes Dec 12, 2024
a91dd3f
Update python/ray/data/read_api.py
Bye-legumes Dec 12, 2024
60afeb7
Dfix
Bye-legumes Dec 12, 2024
ef73bd1
Dfix
Bye-legumes Dec 12, 2024
5d248a7
fix
Bye-legumes Dec 12, 2024
e8b201d
fix
Bye-legumes Dec 12, 2024
16c9125
Merge branch 'master' into random_read_files
Bye-legumes Dec 12, 2024
65c567b
fix
Bye-legumes Dec 12, 2024
2d78ee0
fix
Bye-legumes Dec 12, 2024
8cf1219
fix
Bye-legumes Dec 16, 2024
54ae10c
Merge branch 'master' into random_read_files
Bye-legumes Dec 16, 2024
6a10e95
Update files
bveeramani Dec 16, 2024
3ca245e
Update some stuff
bveeramani Dec 16, 2024
4958c67
Merge branch 'master' into pr/49129
bveeramani Dec 16, 2024
3052a38
Fix bug
bveeramani Dec 16, 2024
f3c01ca
Add anonymous to S3 path
bveeramani Dec 16, 2024
9777b29
Fix example
bveeramani Dec 17, 2024
9 changes: 9 additions & 0 deletions doc/source/data/api/input_output.rst
@@ -351,3 +351,12 @@ MetadataProvider API
datasource.DefaultFileMetadataProvider
datasource.ParquetMetadataProvider
datasource.FastFileMetadataProvider

Shuffling API
-------------

.. autosummary::
:nosignatures:
:toctree: doc/

FileShuffleConfig
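
For context on the new FileShuffleConfig entry documented above, here is a minimal usage sketch (an editor's illustration, not part of the diff). It reuses the public S3 example path from the docstring added later in this PR. With a fixed seed, repeated reads visit the input files in the same shuffled order; row order within the dataset can still vary unless preserve_order is set (see the note in the docstring below).

import ray
from ray.data import FileShuffleConfig

# Example dataset path taken from the FileShuffleConfig docstring in this PR.
path = "s3://anonymous@ray-example-data/batoidea"

# Same seed -> the file list is shuffled identically on every read.
shuffle = FileShuffleConfig(seed=42)
ds1 = ray.data.read_images(path, shuffle=shuffle)
ds2 = ray.data.read_images(path, shuffle=shuffle)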
2 changes: 2 additions & 0 deletions python/ray/data/__init__.py
@@ -18,6 +18,7 @@
BlockBasedFileDatasink,
Datasink,
Datasource,
FileShuffleConfig,
ReadTask,
RowBasedFileDatasink,
)
@@ -115,6 +116,7 @@
"Datasource",
"ExecutionOptions",
"ExecutionResources",
"FileShuffleConfig",
"NodeIdStr",
"ReadTask",
"RowBasedFileDatasink",
3 changes: 3 additions & 0 deletions python/ray/data/_internal/datasource/parquet_datasource.py
@@ -30,6 +30,7 @@
from ray.data.context import DataContext
from ray.data.datasource import Datasource
from ray.data.datasource.datasource import ReadTask
from ray.data.datasource.file_based_datasource import FileShuffleConfig
from ray.data.datasource.file_meta_provider import (
DefaultFileMetadataProvider,
_handle_read_os_error,
@@ -306,6 +307,8 @@ def __init__(
self._partitioning = partitioning
if shuffle == "files":
self._file_metadata_shuffler = np.random.default_rng()
elif isinstance(shuffle, FileShuffleConfig):
self._file_metadata_shuffler = np.random.default_rng(shuffle.seed)

sample_infos = sample_fragments(
self._pq_fragments,
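
To illustrate the mechanism in the hunk above (an editor's sketch, not part of the diff): seeding np.random.default_rng is what makes the file shuffle reproducible. The real datasource shuffles Parquet fragment metadata; this standalone snippet, with hypothetical file names, only shows that a fixed seed yields the same permutation every time.

import numpy as np

files = ["a.parquet", "b.parquet", "c.parquet", "d.parquet"]

# Two generators built from the same seed produce identical permutations.
order1 = np.random.default_rng(42).permutation(len(files))
order2 = np.random.default_rng(42).permutation(len(files))
assert (order1 == order2).all()

shuffled = [files[i] for i in order1]
print(shuffled)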
2 changes: 2 additions & 0 deletions python/ray/data/datasource/__init__.py
@@ -8,6 +8,7 @@
)
from ray.data.datasource.file_based_datasource import (
FileBasedDatasource,
FileShuffleConfig,
_S3FileSystemWrapper,
)
from ray.data.datasource.file_datasink import (
@@ -43,6 +44,7 @@
"DummyOutputDatasink",
"FastFileMetadataProvider",
"FileBasedDatasource",
"FileShuffleConfig",
"FileMetadataProvider",
"FilenameProvider",
"ParquetMetadataProvider",
45 changes: 42 additions & 3 deletions python/ray/data/datasource/file_based_datasource.py
@@ -1,5 +1,6 @@
import io
import logging
from dataclasses import dataclass
from typing import (
TYPE_CHECKING,
Any,
@@ -61,6 +62,39 @@
OPEN_FILE_MAX_ATTEMPTS = 10


@DeveloperAPI
@dataclass
class FileShuffleConfig:
"""Configuration for file shuffling.

This configuration object controls how files are shuffled while reading file-based
datasets.

.. note::
Even if you provide a seed, you might still observe a non-deterministic row
order. This is because tasks are executed in parallel and their completion
order might vary. If you need to preserve the order of rows, set
`DataContext.get_current().execution_options.preserve_order` to ``True``.

Args:
seed: An optional integer seed for the file shuffler. If provided, Ray Data
shuffles files deterministically based on this seed.

Example:
>>> import ray
>>> from ray.data import FileShuffleConfig
>>> shuffle = FileShuffleConfig(seed=42)
>>> ds = ray.data.read_images("s3://anonymous@ray-example-data/batoidea", shuffle=shuffle)
""" # noqa: E501

seed: Optional[int] = None

def __post_init__(self):
"""Ensure that the seed is either None or an integer."""
if self.seed is not None and not isinstance(self.seed, int):
raise ValueError("Seed must be an integer or None.")


@DeveloperAPI
class FileBasedDatasource(Datasource):
"""File-based datasource for reading files.
@@ -88,7 +122,7 @@ def __init__(
partition_filter: PathPartitionFilter = None,
partitioning: Partitioning = None,
ignore_missing_paths: bool = False,
shuffle: Union[Literal["files"], None] = None,
shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
include_paths: bool = False,
file_extensions: Optional[List[str]] = None,
):
@@ -154,6 +188,9 @@ def __init__(
self._file_metadata_shuffler = None
if shuffle == "files":
self._file_metadata_shuffler = np.random.default_rng()
elif isinstance(shuffle, FileShuffleConfig):
# Create a NumPy random generator with a fixed seed if provided
self._file_metadata_shuffler = np.random.default_rng(shuffle.seed)

# Read tasks serialize `FileBasedDatasource` instances, and the list of paths
# can be large. To avoid slow serialization speeds, we store a reference to
@@ -526,8 +563,10 @@ def _open_file_with_retry(


def _validate_shuffle_arg(shuffle: Optional[str]) -> None:
if shuffle not in [None, "files"]:
if not (
shuffle is None or shuffle == "files" or isinstance(shuffle, FileShuffleConfig)
):
raise ValueError(
f"Invalid value for 'shuffle': {shuffle}. "
"Valid values are None, 'files'."
"Valid values are None, 'files', `FileShuffleConfig`."
)
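
As a quick sanity check of the validation added in this file (an editor's sketch, not part of the diff): FileShuffleConfig.__post_init__ rejects non-integer seeds, and _validate_shuffle_arg now accepts the config object alongside None and "files".

from ray.data import FileShuffleConfig

FileShuffleConfig(seed=7)   # valid: deterministic file shuffle
FileShuffleConfig()         # valid: seed defaults to None (unseeded shuffle)

try:
    FileShuffleConfig(seed="7")  # invalid: raises "Seed must be an integer or None."
except ValueError as err:
    print(err)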
62 changes: 35 additions & 27 deletions python/ray/data/read_api.py
@@ -76,7 +76,9 @@
)
from ray.data.datasource.datasource import Reader
from ray.data.datasource.file_based_datasource import (
FileShuffleConfig,
_unwrap_arrow_serialization_workaround,
_validate_shuffle_arg,
)
from ray.data.datasource.file_meta_provider import (
DefaultFileMetadataProvider,
@@ -608,7 +610,7 @@ def read_parquet(
meta_provider: Optional[ParquetMetadataProvider] = None,
partition_filter: Optional[PathPartitionFilter] = None,
partitioning: Optional[Partitioning] = Partitioning("hive"),
shuffle: Union[Literal["files"], None] = None,
shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
include_paths: bool = False,
file_extensions: Optional[List[str]] = None,
concurrency: Optional[int] = None,
@@ -716,7 +718,8 @@ def read_parquet(
partitioning: A :class:`~ray.data.datasource.partitioning.Partitioning` object
that describes how paths are organized. Defaults to HIVE partitioning.
shuffle: If setting to "files", randomly shuffle input files order before read.
Defaults to not shuffle with ``None``.
If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
shuffle the input files. Defaults to not shuffle with ``None``.
arrow_parquet_args: Other parquet read options to pass to PyArrow. For the full
set of arguments, see the `PyArrow API <https://arrow.apache.org/docs/\
python/generated/pyarrow.dataset.Scanner.html\
@@ -789,7 +792,7 @@ def read_images(
mode: Optional[str] = None,
include_paths: bool = False,
ignore_missing_paths: bool = False,
shuffle: Union[Literal["files"], None] = None,
shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
file_extensions: Optional[List[str]] = ImageDatasource._FILE_EXTENSIONS,
concurrency: Optional[int] = None,
override_num_blocks: Optional[int] = None,
@@ -880,7 +883,8 @@ class string
ignore_missing_paths: If True, ignores any file/directory paths in ``paths``
that are not found. Defaults to False.
shuffle: If setting to "files", randomly shuffle input files order before read.
Defaults to not shuffle with ``None``.
If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
shuffle the input files. Defaults to not shuffle with ``None``.
file_extensions: A list of file extensions to filter files by.
concurrency: The maximum number of Ray tasks to run concurrently. Set this
to control number of tasks to run concurrently. This doesn't change the
@@ -940,7 +944,7 @@ def read_parquet_bulk(
tensor_column_schema: Optional[Dict[str, Tuple[np.dtype, Tuple[int, ...]]]] = None,
meta_provider: Optional[BaseFileMetadataProvider] = None,
partition_filter: Optional[PathPartitionFilter] = None,
shuffle: Union[Literal["files"], None] = None,
shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
include_paths: bool = False,
file_extensions: Optional[List[str]] = ParquetBulkDatasource._FILE_EXTENSIONS,
concurrency: Optional[int] = None,
@@ -1008,7 +1012,8 @@ def read_parquet_bulk(
By default, this filters out any file paths whose file extension does not
match "*.parquet*".
shuffle: If setting to "files", randomly shuffle input files order before read.
Defaults to not shuffle with ``None``.
If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
shuffle the input files. Defaults to not shuffle with ``None``.
arrow_parquet_args: Other parquet read options to pass to PyArrow. For the full
set of arguments, see
the `PyArrow API <https://arrow.apache.org/docs/python/generated/\
@@ -1078,7 +1083,7 @@ def read_json(
partitioning: Partitioning = Partitioning("hive"),
include_paths: bool = False,
ignore_missing_paths: bool = False,
shuffle: Union[Literal["files"], None] = None,
shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
file_extensions: Optional[List[str]] = JSONDatasource._FILE_EXTENSIONS,
concurrency: Optional[int] = None,
override_num_blocks: Optional[int] = None,
@@ -1164,6 +1169,8 @@ def read_json(
ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
found. Defaults to False.
shuffle: If setting to "files", randomly shuffle input files order before read.
If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
shuffle the input files, for example ``FileShuffleConfig(seed=42)``.
Defaults to not shuffle with ``None``.
arrow_json_args: JSON read options to pass to `pyarrow.json.read_json <https://\
arrow.apache.org/docs/python/generated/pyarrow.json.read_json.html#pyarrow.\
@@ -1221,7 +1228,7 @@ def read_csv(
partitioning: Partitioning = Partitioning("hive"),
include_paths: bool = False,
ignore_missing_paths: bool = False,
shuffle: Union[Literal["files"], None] = None,
shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
file_extensions: Optional[List[str]] = None,
concurrency: Optional[int] = None,
override_num_blocks: Optional[int] = None,
@@ -1331,7 +1338,8 @@ def read_csv(
ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
found. Defaults to False.
shuffle: If setting to "files", randomly shuffle input files order before read.
Defaults to not shuffle with ``None``.
If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
shuffle the input files. Defaults to not shuffle with ``None``.
arrow_csv_args: CSV read options to pass to
`pyarrow.csv.open_csv <https://arrow.apache.org/docs/python/generated/\
pyarrow.csv.open_csv.html#pyarrow.csv.open_csv>`_
@@ -1391,7 +1399,7 @@ def read_text(
partitioning: Partitioning = None,
include_paths: bool = False,
ignore_missing_paths: bool = False,
shuffle: Union[Literal["files"], None] = None,
shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
file_extensions: Optional[List[str]] = None,
concurrency: Optional[int] = None,
override_num_blocks: Optional[int] = None,
@@ -1447,7 +1455,8 @@ def read_text(
ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
found. Defaults to False.
shuffle: If setting to "files", randomly shuffle input files order before read.
Defaults to not shuffle with ``None``.
If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
shuffle the input files. Defaults to not shuffle with ``None``.
file_extensions: A list of file extensions to filter files by.
concurrency: The maximum number of Ray tasks to run concurrently. Set this
to control number of tasks to run concurrently. This doesn't change the
@@ -1503,7 +1512,7 @@ def read_avro(
partitioning: Partitioning = None,
include_paths: bool = False,
ignore_missing_paths: bool = False,
shuffle: Union[Literal["files"], None] = None,
shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
file_extensions: Optional[List[str]] = None,
concurrency: Optional[int] = None,
override_num_blocks: Optional[int] = None,
@@ -1558,7 +1567,8 @@ def read_avro(
ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
found. Defaults to False.
shuffle: If setting to "files", randomly shuffle input files order before read.
Defaults to not shuffle with ``None``.
If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
shuffle the input files. Defaults to not shuffle with ``None``.
file_extensions: A list of file extensions to filter files by.
concurrency: The maximum number of Ray tasks to run concurrently. Set this
to control number of tasks to run concurrently. This doesn't change the
@@ -1610,7 +1620,7 @@ def read_numpy(
partitioning: Partitioning = None,
include_paths: bool = False,
ignore_missing_paths: bool = False,
shuffle: Union[Literal["files"], None] = None,
shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
file_extensions: Optional[List[str]] = NumpyDatasource._FILE_EXTENSIONS,
concurrency: Optional[int] = None,
override_num_blocks: Optional[int] = None,
@@ -1655,6 +1665,8 @@ def read_numpy(
ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
found. Defaults to False.
shuffle: If setting to "files", randomly shuffle input files order before read.
If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
shuffle the input files, for example ``FileShuffleConfig(seed=42)``.
Defaults to not shuffle with ``None``.
file_extensions: A list of file extensions to filter files by.
concurrency: The maximum number of Ray tasks to run concurrently. Set this
@@ -1707,7 +1719,7 @@ def read_tfrecords(
include_paths: bool = False,
ignore_missing_paths: bool = False,
tf_schema: Optional["schema_pb2.Schema"] = None,
shuffle: Union[Literal["files"], None] = None,
shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
file_extensions: Optional[List[str]] = None,
concurrency: Optional[int] = None,
override_num_blocks: Optional[int] = None,
@@ -1785,7 +1797,8 @@ def read_tfrecords(
tf_schema: Optional TensorFlow Schema which is used to explicitly set the schema
of the underlying Dataset.
shuffle: If setting to "files", randomly shuffle input files order before read.
Defaults to not shuffle with ``None``.
If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
shuffle the input files. Defaults to not shuffle with ``None``.
file_extensions: A list of file extensions to filter files by.
concurrency: The maximum number of Ray tasks to run concurrently. Set this
to control number of tasks to run concurrently. This doesn't change the
@@ -1875,7 +1888,7 @@ def read_webdataset(
filerename: Optional[Union[list, callable]] = None,
suffixes: Optional[Union[list, callable]] = None,
verbose_open: bool = False,
shuffle: Union[Literal["files"], None] = None,
shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
include_paths: bool = False,
file_extensions: Optional[List[str]] = None,
concurrency: Optional[int] = None,
@@ -1906,6 +1919,8 @@ def read_webdataset(
suffixes: A function or list of suffixes to select for creating samples.
verbose_open: Whether to print the file names as they are opened.
shuffle: If setting to "files", randomly shuffle input files order before read.
If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
shuffle the input files, for example ``FileShuffleConfig(seed=42)``.
Defaults to not shuffle with ``None``.
include_paths: If ``True``, include the path to each file. File paths are
stored in the ``'path'`` column.
@@ -1971,7 +1986,7 @@ def read_binary_files(
partition_filter: Optional[PathPartitionFilter] = None,
partitioning: Partitioning = None,
ignore_missing_paths: bool = False,
shuffle: Union[Literal["files"], None] = None,
shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
file_extensions: Optional[List[str]] = None,
concurrency: Optional[int] = None,
override_num_blocks: Optional[int] = None,
@@ -2034,7 +2049,8 @@ def read_binary_files(
ignore_missing_paths: If True, ignores any file paths in ``paths`` that are not
found. Defaults to False.
shuffle: If setting to "files", randomly shuffle input files order before read.
Defaults to not shuffle with ``None``.
If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
shuffle the input files. Defaults to not shuffle with ``None``.
file_extensions: A list of file extensions to filter files by.
concurrency: The maximum number of Ray tasks to run concurrently. Set this
to control number of tasks to run concurrently. This doesn't change the
@@ -3410,14 +3426,6 @@ def _get_num_output_blocks(
return parallelism


def _validate_shuffle_arg(shuffle: Optional[str]) -> None:
if shuffle not in [None, "files"]:
raise ValueError(
f"Invalid value for 'shuffle': {shuffle}. "
"Valid values are None, 'files'."
)


def _emit_meta_provider_deprecation_warning(
meta_provider: Optional[BaseFileMetadataProvider],
) -> None:
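
Finally, an editor's end-to-end sketch tying the new option to the docstring note about row order (not part of the diff): the seed fixes the order in which files are read, and setting preserve_order in the execution options keeps task outputs in submission order, so the resulting row order is reproducible as well.

import ray
from ray.data import DataContext, FileShuffleConfig

# Keep blocks in task-submission order so row order matches across runs.
DataContext.get_current().execution_options.preserve_order = True

shuffle = FileShuffleConfig(seed=42)
ds = ray.data.read_images("s3://anonymous@ray-example-data/batoidea", shuffle=shuffle)
print(ds.take(1))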