Upgrade to Ray 2.0 (#1635)
- Upgrade to Ray 2.0 and Modin 0.14.1
- Update datasources to the Ray 2.0 API
- Detect an existing cluster, or create a local one otherwise (see the sketch after this list)
- Backward-compatible partition handling
- Fix partition keys bug
- Remove the parallelism arg from load tests
- Remove custom object store size detection; rely on Ray 2.x defaults
- Tutorial: update the 2.0 cluster config
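As a rough illustration of the cluster-detection change (a sketch, not code from this commit): when `RAY_ADDRESS` is set, initialization connects to that cluster; otherwise a local one is started. The address value below is hypothetical.

```python
import os

import ray

# Sketch of the new behaviour, assuming the environment drives the decision:
# connect to an existing cluster if RAY_ADDRESS is set, otherwise start locally.
address = os.environ.get("RAY_ADDRESS")  # e.g. "ray://10.0.0.5:10001" (hypothetical)
if address:
    ray.init(address=address, ignore_reinit_error=True)
else:
    ray.init(address="local", ignore_reinit_error=True)
```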
kukushking authored Sep 28, 2022
1 parent 3a68db6 commit 1073eb9
Showing 12 changed files with 388 additions and 1,448 deletions.
2 changes: 2 additions & 0 deletions .bumpversion.cfg
@@ -25,6 +25,8 @@ first_value = 1

[bumpversion:file:README.md]

[bumpversion:file:CONTRIBUTING.md]

[bumpversion:file:CONTRIBUTING_COMMON_ERRORS.md]

[bumpversion:file:tests/unit/test_metadata.py]
19 changes: 5 additions & 14 deletions CONTRIBUTING.md
@@ -285,7 +285,6 @@ You can choose from three different environments to test your fixes/changes, bas
# The following is an example
cluster_name: ray-cluster
initial_workers: 2
min_workers: 2
max_workers: 2
@@ -303,12 +302,8 @@ available_node_types:
InstanceType: r5n.2xlarge # change instance type as required
IamInstanceProfile:
Arn: arn:aws:iam::{UPDATE YOUR ACCOUNT ID HERE}:instance-profile/ray-cluster-instance-profile
ImageId: ami-0ea510fcb67686b48 # latest ray images -> https://github.com/amzn/amazon-ray#amazon-ray-images
NetworkInterfaces:
- AssociatePublicIpAddress: True
SubnetId: {replace with subnet within above AZs}
Groups: [{ID of group `ray_client_security_group` created by the step above}]
DeviceIndex: 0
ImageId: ami-0ea510fcb67686b48 # latest ray images -> https://github.com/amzn/amazon-ray#amazon-ray-images
SubnetId: {replace with subnet within above AZs}
ray.worker.default:
min_workers: 2
@@ -317,15 +312,11 @@ available_node_types:
InstanceType: r5n.2xlarge
IamInstanceProfile:
Arn: arn:aws:iam::{UPDATE YOUR ACCOUNT ID HERE}:instance-profile/ray-cluster-instance-profile
ImageId: ami-0ea510fcb67686b48 # latest ray images -> https://github.com/amzn/amazon-ray#amazon-ray-images
NetworkInterfaces:
- AssociatePublicIpAddress: True
SubnetId: {replace with subnet within above AZs}
Groups: [{ID of group `ray_client_security_group` created by the step above}]
DeviceIndex: 0
ImageId: ami-0ea510fcb67686b48 # latest ray images -> https://github.com/amzn/amazon-ray#amazon-ray-images
SubnetId: {replace with subnet within above AZs}
setup_commands:
- pip install "awswrangler[distributed]==3.0.0a2"
- pip install "awswrangler[distributed]==3.0.0b1"
- pip install pytest
```
59 changes: 11 additions & 48 deletions awswrangler/distributed/_distributed.py
@@ -1,16 +1,12 @@
"""Distributed Module (PRIVATE)."""
import logging
import multiprocessing
import os
import sys
import warnings
from functools import wraps
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Union

from awswrangler._config import apply_configs, config

if config.distributed or TYPE_CHECKING:
import psutil
import ray # pylint: disable=import-error
from modin.distributed.dataframe.pandas import from_partitions, unwrap_partitions
from modin.pandas import DataFrame as ModinDataFrame
@@ -111,7 +107,7 @@ def initialize_ray(
log_to_driver: Optional[bool] = True,
object_store_memory: Optional[int] = None,
cpu_count: Optional[int] = None,
gpu_count: Optional[int] = 0,
gpu_count: Optional[int] = None,
) -> None:
"""
Connect to an existing Ray cluster or start one and connect to it.
@@ -133,9 +129,15 @@ cpu_count : Optional[int]
cpu_count : Optional[int]
Number of CPUs to assign to each raylet, by default None
gpu_count : Optional[int]
Number of GPUs to assign to each raylet, by default 0
Number of GPUs to assign to each raylet, by default None
"""
if not ray.is_initialized():
# Detect an existing cluster
ray_address = os.environ.get("RAY_ADDRESS")
if not address and ray_address:
_logger.info("Using address %s set in the environment variable RAY_ADDRESS", ray_address)
address = ray_address

if address:
ray.init(
address=address,
@@ -144,28 +146,13 @@ def initialize_ray(
log_to_driver=log_to_driver,
)
else:
if not object_store_memory:
object_store_memory = _get_ray_object_store_memory()

mac_size_limit = getattr(ray.ray_constants, "MAC_DEGRADED_PERF_MMAP_SIZE_LIMIT", None)
if sys.platform == "darwin" and mac_size_limit is not None and object_store_memory > mac_size_limit:
warnings.warn(
"On Macs, Ray's performance is known to degrade with "
+ "object store size greater than "
+ f"{mac_size_limit / 2 ** 30:.4} GiB. Ray by default does "
+ "not allow setting an object store size greater than "
+ "that. This default is overridden to avoid "
+ "spilling to disk more often. To override this "
+ "behavior, you can initialize Ray yourself."
)
os.environ["RAY_ENABLE_MAC_LARGE_OBJECT_STORE"] = "1"

ray_runtime_env_vars = [
"__MODIN_AUTOIMPORT_PANDAS__",
]

ray_init_kwargs = {
"num_cpus": cpu_count or multiprocessing.cpu_count(),
"address": "local",
"num_cpus": cpu_count,
"num_gpus": gpu_count,
"include_dashboard": include_dashboard,
"ignore_reinit_error": ignore_reinit_error,
@@ -177,29 +164,5 @@ def initialize_ray(
"env_vars": {var: os.environ.get(var) for var in ray_runtime_env_vars if os.environ.get(var)}
},
}
_logger.info("Starting a local Ray cluster")
ray.init(**ray_init_kwargs)


def _get_ray_object_store_memory() -> Optional[int]:
virtual_memory = psutil.virtual_memory().total
if sys.platform.startswith("linux"):
shm_fd = os.open("/dev/shm", os.O_RDONLY)
try:
shm_stats = os.fstatvfs(shm_fd)
system_memory = shm_stats.f_bsize * shm_stats.f_bavail
if system_memory / (virtual_memory / 2) < 0.99:
warnings.warn(
f"The size of /dev/shm is too small ({system_memory} bytes). The required size "
+ f"is at least half of RAM ({virtual_memory // 2} bytes). Please, delete files "
+ "in /dev/shm or increase the size with --shm-size in Docker. Alternatively, set the "
+ "memory size for each Ray worker in bytes with the RAY_OBJECT_STORE_MEMORY env var."
)
finally:
os.close(shm_fd)
else:
system_memory = virtual_memory
object_store_memory: Optional[int] = int(0.6 * system_memory // 1e9 * 1e9) # type: ignore
# If the memory pool is smaller than 2GB, just use the default in ray.
if object_store_memory == 0:
object_store_memory = None
return object_store_memory
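For context on the sizing helper removed above: Ray 2.x picks a default object store size on its own, so the wrapper no longer needs to compute one. A minimal sketch of the two remaining options, assuming a plain `ray.init` call (the explicit size is illustrative, not from this commit):

```python
import ray

# Option 1: rely on Ray 2.x's own default object store sizing.
ray.init(address="local")
ray.shutdown()

# Option 2: pin the object store explicitly when a workload needs it (illustrative size).
ray.init(address="local", object_store_memory=4 * 1024**3)  # 4 GiB
```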
4 changes: 2 additions & 2 deletions awswrangler/distributed/_utils.py
@@ -6,8 +6,8 @@
import pandas as pd
import ray
from modin.distributed.dataframe.pandas.partitions import from_partitions
from ray.data.impl.arrow_block import ArrowBlockAccessor, ArrowRow
from ray.data.impl.remote_fn import cached_remote_fn
from ray.data._internal.arrow_block import ArrowBlockAccessor, ArrowRow
from ray.data._internal.remote_fn import cached_remote_fn

from awswrangler._arrow import _table_to_df

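The hunk above tracks Ray's module rename from `ray.data.impl` to `ray.data._internal`. Not part of this commit, but if a codebase had to tolerate both Ray 1.x and 2.x, a guarded import along these lines is one way to bridge the rename (sketch only):

```python
try:
    # Ray >= 2.0
    from ray.data._internal.remote_fn import cached_remote_fn
except ImportError:
    # Ray 1.x
    from ray.data.impl.remote_fn import cached_remote_fn
```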
@@ -3,8 +3,8 @@

import pandas as pd
import pyarrow
from ray.data._internal.pandas_block import PandasBlockAccessor
from ray.data.datasource.file_based_datasource import FileBasedDatasource
from ray.data.impl.pandas_block import PandasBlockAccessor

from awswrangler import _utils
from awswrangler.s3._read_text_core import _read_text_chunked, _read_text_file
191 changes: 73 additions & 118 deletions awswrangler/distributed/datasources/parquet_datasource.py
@@ -3,38 +3,93 @@
import logging
from typing import Any, Callable, Dict, Iterator, List, Optional, Union

import numpy as np
import pyarrow as pa

# fs required to implicitly trigger S3 subsystem initialization
import pyarrow.fs # noqa: F401 pylint: disable=unused-import
import pyarrow.parquet as pq
from ray import cloudpickle # pylint: disable=wrong-import-order,ungrouped-imports
import ray
from ray.data._internal.output_buffer import BlockOutputBuffer
from ray.data._internal.remote_fn import cached_remote_fn
from ray.data.block import Block, BlockAccessor, BlockMetadata
from ray.data.context import DatasetContext
from ray.data.datasource import BlockWritePathProvider, DefaultBlockWritePathProvider
from ray.data.datasource.datasource import ReadTask, WriteResult
from ray.data.datasource import BlockWritePathProvider, DefaultBlockWritePathProvider, Reader
from ray.data.datasource.datasource import WriteResult
from ray.data.datasource.file_based_datasource import (
_resolve_paths_and_filesystem,
_S3FileSystemWrapper,
_wrap_s3_serialization_workaround,
)
from ray.data.datasource.file_meta_provider import DefaultParquetMetadataProvider, ParquetMetadataProvider
from ray.data.datasource.parquet_datasource import (
_deregister_parquet_file_fragment_serialization,
_register_parquet_file_fragment_serialization,
PARQUET_READER_ROW_BATCH_SIZE,
_deserialize_pieces_with_retry,
_ParquetDatasourceReader,
_SerializedPiece,
)
from ray.data.impl.output_buffer import BlockOutputBuffer
from ray.data.impl.remote_fn import cached_remote_fn
from ray.types import ObjectRef

from awswrangler._arrow import _add_table_partitions

_logger: logging.Logger = logging.getLogger(__name__)

# The number of rows to read per batch. This is sized to generate 10MiB batches
# for rows about 1KiB in size.
PARQUET_READER_ROW_BATCH_SIZE = 100000

# Original implementation:
# https://github.com/ray-project/ray/blob/releases/2.0.0/python/ray/data/datasource/parquet_datasource.py
def _read_pieces(
block_udf: Optional[Callable[[Block[Any]], Block[Any]]],
reader_args: Any,
columns: Optional[List[str]],
schema: Optional[Union[type, "pyarrow.lib.Schema"]],
serialized_pieces: List[_SerializedPiece],
) -> Iterator["pyarrow.Table"]:
# This import is necessary to load the tensor extension type.
from ray.data.extensions.tensor_extension import ( # type: ignore # noqa: F401, E501 # pylint: disable=import-outside-toplevel, unused-import
ArrowTensorType,
)

# Deserialize after loading the filesystem class.
pieces: List["pyarrow._dataset.ParquetFileFragment"] = _deserialize_pieces_with_retry(serialized_pieces)

# Ensure that we're reading at least one dataset fragment.
assert len(pieces) > 0

import pyarrow as pa # pylint: disable=import-outside-toplevel

ctx = DatasetContext.get_current()
output_buffer = BlockOutputBuffer(
block_udf=block_udf,
target_max_block_size=ctx.target_max_block_size,
)

_logger.debug("Reading %s parquet pieces", len(pieces))
use_threads = reader_args.pop("use_threads", False)
path_root = reader_args.pop("path_root", None)
for piece in pieces:
batches = piece.to_batches(
use_threads=use_threads,
columns=columns,
schema=schema,
batch_size=PARQUET_READER_ROW_BATCH_SIZE,
**reader_args,
)
for batch in batches:
# Table creation is wrapped inside _add_table_partitions
# to add columns with partition values when dataset=True
# and cast them to categorical
table = _add_table_partitions(
table=pa.Table.from_batches([batch], schema=schema),
path=f"s3://{piece.path}",
path_root=path_root,
)
# If the table is empty, drop it.
if table.num_rows > 0:
output_buffer.add_block(table)
if output_buffer.has_next():
yield output_buffer.next()
output_buffer.finalize()
if output_buffer.has_next():
yield output_buffer.next()


# Patch _read_pieces function
ray.data.datasource.parquet_datasource._read_pieces = _read_pieces # pylint: disable=protected-access


class UserProvidedKeyBlockWritePathProvider(BlockWritePathProvider):
@@ -62,109 +117,9 @@ class ParquetDatasource:
def __init__(self) -> None:
self._write_paths: List[str] = []

# Original: https://github.com/ray-project/ray/blob/releases/1.13.0/python/ray/data/datasource/parquet_datasource.py
def prepare_read(
self,
parallelism: int,
use_threads: Union[bool, int],
paths: Union[str, List[str]],
schema: "pyarrow.lib.Schema",
columns: Optional[List[str]] = None,
coerce_int96_timestamp_unit: Optional[str] = None,
path_root: Optional[str] = None,
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
meta_provider: ParquetMetadataProvider = DefaultParquetMetadataProvider(),
_block_udf: Optional[Callable[..., Any]] = None,
) -> List[ReadTask]:
"""Create and return read tasks for a Parquet file-based datasource."""
paths, filesystem = _resolve_paths_and_filesystem(paths, filesystem)

parquet_dataset = pq.ParquetDataset(
path_or_paths=paths,
filesystem=filesystem,
partitioning=None,
coerce_int96_timestamp_unit=coerce_int96_timestamp_unit,
use_legacy_dataset=False,
)

def read_pieces(serialized_pieces: str) -> Iterator[pa.Table]:
# Deserialize after loading the filesystem class.
try:
_register_parquet_file_fragment_serialization() # type: ignore
pieces = cloudpickle.loads(serialized_pieces)
finally:
_deregister_parquet_file_fragment_serialization() # type: ignore

# Ensure that we're reading at least one dataset fragment.
assert len(pieces) > 0

ctx = DatasetContext.get_current()
output_buffer = BlockOutputBuffer(block_udf=_block_udf, target_max_block_size=ctx.target_max_block_size)

_logger.debug("Reading %s parquet pieces", len(pieces))
for piece in pieces:
batches = piece.to_batches(
use_threads=use_threads,
columns=columns,
schema=schema,
batch_size=PARQUET_READER_ROW_BATCH_SIZE,
)
for batch in batches:
# Table creation is wrapped inside _add_table_partitions
# to add columns with partition values when dataset=True
table = _add_table_partitions(
table=pa.Table.from_batches([batch], schema=schema),
path=f"s3://{piece.path}",
path_root=path_root,
)
# If the table is empty, drop it.
if table.num_rows > 0:
output_buffer.add_block(table)
if output_buffer.has_next():
yield output_buffer.next()

output_buffer.finalize()
if output_buffer.has_next():
yield output_buffer.next()

if _block_udf is not None:
# Try to infer dataset schema by passing dummy table through UDF.
dummy_table = schema.empty_table()
try:
inferred_schema = _block_udf(dummy_table).schema
inferred_schema = inferred_schema.with_metadata(schema.metadata)
except Exception: # pylint: disable=broad-except
_logger.debug(
"Failed to infer schema of dataset by passing dummy table "
"through UDF due to the following exception:",
exc_info=True,
)
inferred_schema = schema
else:
inferred_schema = schema
read_tasks = []
metadata = meta_provider.prefetch_file_metadata(parquet_dataset.pieces) or []
try:
_register_parquet_file_fragment_serialization() # type: ignore
for pieces, metadata in zip( # type: ignore
np.array_split(parquet_dataset.pieces, parallelism),
np.array_split(metadata, parallelism),
):
if len(pieces) <= 0:
continue
serialized_pieces = cloudpickle.dumps(pieces) # type: ignore
input_files = [p.path for p in pieces]
meta = meta_provider(
input_files,
inferred_schema,
pieces=pieces,
prefetched_metadata=metadata,
)
read_tasks.append(ReadTask(lambda p=serialized_pieces: read_pieces(p), meta)) # type: ignore
finally:
_deregister_parquet_file_fragment_serialization() # type: ignore

return read_tasks
def create_reader(self, **kwargs: Dict[str, Any]) -> Reader[Any]:
"""Return a Reader for the given read arguments."""
return _ParquetDatasourceReader(**kwargs) # type: ignore

# Original implementation:
# https://github.com/ray-project/ray/blob/releases/1.13.0/python/ray/data/datasource/file_based_datasource.py
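For orientation, the Ray 2.0 pattern this custom datasource plugs into is `ray.data.read_datasource`, which forwards keyword arguments to `create_reader`. A hedged usage sketch (the bucket/prefix and the exact keyword set are assumptions; real call sites inside awswrangler pass additional arguments):

```python
import ray

from awswrangler.distributed.datasources.parquet_datasource import ParquetDatasource

ds = ray.data.read_datasource(
    ParquetDatasource(),
    paths="s3://my-bucket/my-table/",      # hypothetical dataset location
    path_root="s3://my-bucket/my-table/",  # lets the patched _read_pieces re-attach partition columns
    use_threads=False,
)
df = ds.to_modin()  # continue in Modin/pandas land
```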

