From 2f526cc34df07894a376ece0773e4cb14572eb7e Mon Sep 17 00:00:00 2001 From: simonsays1980 Date: Fri, 8 Nov 2024 14:47:50 +0100 Subject: [PATCH 01/12] Optimized RLUnplugged benchmark example and changed name. Furthermore, removed default read arguments in 'OfflineData' to always use Ray Data's optimized reads (specifically on the product). Moved call to schema to debug logging in 'OfflineData' to avoid any further overhead when loading a dataset. Signed-off-by: simonsays1980 --- rllib/offline/offline_data.py | 28 ++- ...=> benchmark_rlunplugged_atari_pong_bc.py} | 163 +++++++----------- 2 files changed, 79 insertions(+), 112 deletions(-) rename rllib/tuned_examples/bc/{benchmark_atari_pong_bc.py => benchmark_rlunplugged_atari_pong_bc.py} (60%) diff --git a/rllib/offline/offline_data.py b/rllib/offline/offline_data.py index 1f4628e6b4630..1f526ebf8bb87 100644 --- a/rllib/offline/offline_data.py +++ b/rllib/offline/offline_data.py @@ -33,9 +33,7 @@ def __init__(self, config: AlgorithmConfig): # Use `read_parquet` as default data read method. self.data_read_method = self.config.input_read_method # Override default arguments for the data read method. - self.data_read_method_kwargs = ( - self.default_read_method_kwargs | self.config.input_read_method_kwargs - ) + self.data_read_method_kwargs = self.config.input_read_method_kwargs # In case `EpisodeType` or `BatchType` batches are read the size # could differ from the final `train_batch_size_per_learner`. self.data_read_batch_size = self.config.input_read_batch_size @@ -75,11 +73,12 @@ def __init__(self, config: AlgorithmConfig): "'gcs' for GCS, 's3' for S3, or 'abs'" ) # Add the filesystem object to the write method kwargs. - self.data_read_method_kwargs.update( - { - "filesystem": self.filesystem_object, - } - ) + if self.filesystem_object: + self.data_read_method_kwargs.update( + { + "filesystem": self.filesystem_object, + } + ) try: # Load the dataset. @@ -90,9 +89,12 @@ def __init__(self, config: AlgorithmConfig): if self.materialize_data: self.data = self.data.materialize() stop_time = time.perf_counter() - logger.debug(f"Time for loading dataset: {stop_time - start_time}s.") + logger.debug( + "===> [OfflineData] - Time for loading dataset: " + f"{stop_time - start_time}s." + ) logger.info("Reading data from {}".format(self.path)) - logger.info(self.data.schema()) + logger.debug(self.data.schema()) except Exception as e: logger.error(e) # Avoids reinstantiating the batch iterator each time we sample. @@ -220,12 +222,6 @@ def sample( num_shards=num_shards, ) - @property - def default_read_method_kwargs(self): - return { - "override_num_blocks": max(self.config.num_learners * 2, 2), - } - @property def default_map_batches_kwargs(self): return { diff --git a/rllib/tuned_examples/bc/benchmark_atari_pong_bc.py b/rllib/tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py similarity index 60% rename from rllib/tuned_examples/bc/benchmark_atari_pong_bc.py rename to rllib/tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py index 3b4281abddf6f..538dbcc9f929b 100644 --- a/rllib/tuned_examples/bc/benchmark_atari_pong_bc.py +++ b/rllib/tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py @@ -10,30 +10,28 @@ d_t: float } """ - -# from google.cloud import storage +import ale_py import gymnasium as gym import io import numpy as np +import os -# from pathlib import Path from PIL import Image -import tree from typing import Optional +from ray import tune -import pyarrow.fs - -from ray.data import TFXReadOptions from ray.rllib.algorithms.bc import BCConfig from ray.rllib.connectors.connector_v2 import ConnectorV2 from ray.rllib.core.columns import Columns -from ray.rllib.core.rl_module.default_model_config import DefaultModelConfig from ray.rllib.env.wrappers.atari_wrappers import wrap_atari_for_new_api_stack from ray.rllib.utils.annotations import override from ray.rllib.utils.test_utils import ( add_rllib_example_script_args, ) -from ray import tune + + +# Register all ALE environments with gymnasium. +gym.register_envs(ale_py) class DecodeObservations(ConnectorV2): @@ -84,22 +82,24 @@ def __call__( ): # Map encoded PNGs into arrays of shape (84, 84, 4). def _map_fn(s): - return np.concatenate( - [ - np.array(Image.open(io.BytesIO(s[i]))).reshape(84, 84, 1) - for i in range(4) - ], - axis=2, - ) + construct = [ + np.array(Image.open(io.BytesIO(s[i]))).reshape(84, 84, 1) + for i in range(4) + ] + result = np.concatenate(construct, axis=2) + return result # Add the observations for t. self.add_n_batch_items( batch=data, column=Columns.OBS, - items_to_add=tree.map_structure( - _map_fn, - sa_episode.get_observations(slice(0, len(sa_episode))), - ), + # Ensure, we pass in a list, otherwise it is considered + # an already batched array. + items_to_add=[ + _map_fn( + sa_episode.get_observations(slice(0, len(sa_episode)))[0], + ).astype(np.float32) + ], num_items=len(sa_episode), single_agent_episode=sa_episode, ) @@ -107,10 +107,11 @@ def _map_fn(s): self.add_n_batch_items( batch=data, column=Columns.NEXT_OBS, - items_to_add=tree.map_structure( - _map_fn, - sa_episode.get_observations(slice(1, len(sa_episode) + 1)), - ), + items_to_add=[ + _map_fn( + sa_episode.get_observations(slice(1, len(sa_episode) + 1))[0], + ).astype(np.float32) + ], num_items=len(sa_episode), single_agent_episode=sa_episode, ) @@ -138,63 +139,36 @@ def _env_creator(cfg): # Register the wrapped environment to `tune`. tune.register_env("WrappedALE/Pong-v5", _env_creator) -parser = add_rllib_example_script_args() # Use `parser` to add your own custom command line options to this script -# and (if needed) use their values to set up `config` below. +# and (if needed) use their values toset up `config` below. +parser = add_rllib_example_script_args( + default_reward=float("inf"), + default_timesteps=3000000, + default_iters=100000000000, +) args = parser.parse_args() -# RLUnplugged GCS bucket. This bucket contains for each set of environments -# (e.g. Atari) a directory and for each environment within. For each -# environment multiple runs were collected. -rlunplugged_base_path = "rl_unplugged/atari" +# Anyscale RLUnplugged storage bucket. The bucket contains from the +# original `RLUnplugged` bucket only the first `atari/Pong` run. +anyscale_storage_bucket = os.environ["ANYSCALE_ARTIFACT_STORAGE"] +anyscale_rlunplugged_atari_path = anyscale_storage_bucket + "/rllib/rl_unplugged/atari" # We only use the Atari game `Pong` here. Users can choose other Atari # games and set here the name. This will download `TfRecords` dataset from GCS. game = "Pong" # Path to the directory with all runs from Atari Pong. -rlunplugged_path = rlunplugged_base_path + f"/{game}" - -# Set up the GCS file system. -filesystem = pyarrow.fs.GcsFileSystem(anonymous=True) -# There are many run numbers, we choose the first one for demonstration. This -# can be chosen by users. To use all data use a list of file paths (see -# `num_shards`) and its usage further below. -# run_number = 1 - -# Get all file infos to calculate an optimal number of blocks for reading. -all_file_infos = filesystem.get_file_info(pyarrow.fs.FileSelector(rlunplugged_path)) -# Get the total file size -total_file_size_mb = sum(fi.size for fi in all_file_infos) -# A block is defined to be 128MB. -num_blocks = int(total_file_size_mb / 128) - 1 - -# # num_shards = 1 - - -# # Make the temporary directory for the downloaded data. -# tmp_path = "/tmp/atari" -# Path(tmp_path).joinpath(game).mkdir(exist_ok=True, parents=True) -# destination_file_name = f"{tmp_path}/{game}/run_{run_number}-00000-of-00001" - -# # If the file is not downloaded, yet, download it here. -# if not Path(destination_file_name).exists(): -# # Define the bucket and source file. -# bucket_name = "rl_unplugged" -# source_blob_name = f"atari/{game}/run_{run_number}-00000-of-00100" - -# # Download the data from the bucket. -# storage_client = storage.Client.create_anonymous_client() -# bucket = storage_client.bucket(bucket_name) -# blob = bucket.blob(source_blob_name) -# blob.download_to_filename(destination_file_name) +anyscale_rlunplugged_atari_pong_path = anyscale_rlunplugged_atari_path + f"/{game}" +print( + "Streaming RLUnplugged Atari Pong data from path: " + f"{anyscale_rlunplugged_atari_pong_path}" +) # Define the config for Behavior Cloning. config = ( BCConfig() .environment( env="WrappedALE/Pong-v5", - # TODO (sven): Does this have any influence in connectors? clip_rewards=True, ) # Use the new API stack that makes directly use of `ray.data`. @@ -209,29 +183,17 @@ def _env_creator(cfg): evaluation_duration=5, evaluation_parallel_to_training=True, ) + .learners( + num_learners=args.num_learners if args.num_learners > 1 else 0, + num_gpus_per_learner=args.num_gpus_per_learner, + ) # Note, the `input_` argument is the major argument for the # new offline API. Via the `input_read_method_kwargs` the # arguments for the `ray.data.Dataset` read method can be # configured. The read method needs at least as many blocks # as remote learners. .offline_data( - input_=["rl_unplugged/atari/Pong"], # destination_file_name, - input_read_method="read_tfrecords", - input_read_method_kwargs={ - # Note, `TFRecords` datasets in `rl_unplugged` are GZIP - # compressed and Arrow needs to decompress them. - "arrow_open_stream_args": {"compression": "gzip"}, - # Use enough reading blocks to scale well. - "override_num_blocks": num_blocks, - # Read in parallel with these many actors. - "concurrency": 80, - # TFX improves performance extensively. `tfx-bsl` needs to be - # installed for this. - "tfx_read_options": TFXReadOptions( - # batch_size=2000, - ), - }, - input_filesystem=filesystem, + input_=[anyscale_rlunplugged_atari_pong_path], # `rl_unplugged`'s data schema is different from the one used # internally in `RLlib`. Define the schema here so it can be used # when transforming column data to episodes. @@ -243,37 +205,43 @@ def _env_creator(cfg): Columns.NEXT_OBS: "o_tp1", Columns.TERMINATEDS: "d_t", }, + # Do not materialize data, instead stream the data from Anyscale's + # S3 bucket (note, streaming data is an Anyscale-platform-only feature). materialize_data=False, materialize_mapped_data=False, # Increase the parallelism in transforming batches, such that while # training, new batches are transformed while others are used in updating. map_batches_kwargs={ - "concurrency": 40, - "num_cpus": 40, - }, # max(args.num_gpus * 20, 20)}, - # When iterating over batches in the dataset, prefetch at least 20 - # batches per learner. Increase this for scaling out more. + "concurrency": 12, + "num_cpus": 12, + }, + # When iterating over batches in the dataset, prefetch at least 4 + # batches per learner. iter_batches_kwargs={ "prefetch_batches": 4, + # Do not use batch shuffling b/c it hurts performance. "local_shuffle_buffer_size": None, }, - dataset_num_iters_per_learner=1, + # Iterate over 10 batches per RLlib iteration if multiple learners + # are used. + dataset_num_iters_per_learner=10 if args.num_learners > 1 else 1, ) .training( # To increase learning speed with multiple learners, # increase the learning rate correspondingly. - lr=0.0008 * (args.num_learners or 1) ** 0.5, + lr=0.0008 * max(1, args.num_gpus**0.5), train_batch_size_per_learner=1024, # Use the defined learner connector above, to decode observations. learner_connector=_make_learner_connector, ) .rl_module( - model_config=DefaultModelConfig( - vf_share_layers=True, - conv_filters=[[16, 4, 2], [32, 4, 2], [64, 4, 2], [128, 4, 2]], - conv_activation="relu", - post_fcnet_hiddens=[256], - ), + model_config_dict={ + "vf_share_layers": True, + "conv_filters": [[16, 4, 2], [32, 4, 2], [64, 4, 2], [128, 4, 2]], + "conv_activation": "relu", + "post_fcnet_hiddens": [256], + "uses_new_env_runners": True, + } ) ) @@ -284,3 +252,6 @@ def _env_creator(cfg): print(f"Iteration: {i + 1}") results = algo.train() print(results) + +for i in range(4): + print("FINISHED") From df8afbe069bb838e660d817cd831505c954e2a25 Mon Sep 17 00:00:00 2001 From: simonsays1980 Date: Fri, 8 Nov 2024 15:17:31 +0100 Subject: [PATCH 02/12] Adapted all tuned examples to the optimizations made in this branch. Signed-off-by: simonsays1980 --- rllib/tuned_examples/bc/cartpole_bc.py | 5 ---- rllib/tuned_examples/bc/pendulum_bc.py | 27 ++++++++++++++++--- rllib/tuned_examples/cql/pendulum_cql.py | 6 ----- .../tuned_examples/marwil/cartpole_marwil.py | 6 ----- 4 files changed, 23 insertions(+), 21 deletions(-) diff --git a/rllib/tuned_examples/bc/cartpole_bc.py b/rllib/tuned_examples/bc/cartpole_bc.py index fe4986b3b71c2..0be81dcd20330 100644 --- a/rllib/tuned_examples/bc/cartpole_bc.py +++ b/rllib/tuned_examples/bc/cartpole_bc.py @@ -50,11 +50,6 @@ # as remote learners. .offline_data( input_=[data_path.as_posix()], - # Define the number of reading blocks, these should be larger than 1 - # and aligned with the data size. - input_read_method_kwargs={ - "override_num_blocks": max((args.num_learners or 1) * 2, 2) - }, # Concurrency defines the number of processes that run the # `map_batches` transformations. This should be aligned with the # 'prefetch_batches' argument in 'iter_batches_kwargs'. diff --git a/rllib/tuned_examples/bc/pendulum_bc.py b/rllib/tuned_examples/bc/pendulum_bc.py index cbc06a776b4a3..d9c27ea647c2d 100644 --- a/rllib/tuned_examples/bc/pendulum_bc.py +++ b/rllib/tuned_examples/bc/pendulum_bc.py @@ -2,6 +2,7 @@ from ray.air.constants import TRAINING_ITERATION from ray.rllib.algorithms.bc import BCConfig +from ray.rllib.core.rl_module.default_model_config import DefaultModelConfig from ray.rllib.utils.metrics import ( ENV_RUNNER_RESULTS, EPISODE_RETURN_MEAN, @@ -25,7 +26,7 @@ data_path = "tests/data/pendulum/pendulum-v1_large" base_path = Path(__file__).parents[2] print(f"base_path={base_path}") -data_path = "local://" + base_path.joinpath(data_path).as_posix() +data_path = "local://" / base_path / data_path print(f"data_path={data_path}") # Define the BC config. @@ -48,15 +49,33 @@ # configured. The read method needs at least as many blocks # as remote learners. .offline_data( - input_=[data_path], - input_read_method_kwargs={"override_num_blocks": max(args.num_learners, 1)}, + input_=[data_path.as_posix()], + # Concurrency defines the number of processes that run the + # `map_batches` transformations. This should be aligned with the + # 'prefetch_batches' argument in 'iter_batches_kwargs'. + map_batches_kwargs={"concurrency": 2, "num_cpus": 2}, + # This data set is small so do not prefetch too many batches and use no + # local shuffle. + iter_batches_kwargs={ + "prefetch_batches": 1, + "local_shuffle_buffer_size": None, + }, + # The number of iterations to be run per learner when in multi-learner + # mode in a single RLlib training iteration. Leave this to `None` to + # run an entire epoch on the dataset during a single RLlib training + # iteration. For single-learner mode, 1 is the only option. dataset_num_iters_per_learner=1 if not args.num_learners else None, ) .training( # To increase learning speed with multiple learners, # increase the learning rate correspondingly. lr=0.0008 * (args.num_learners or 1) ** 0.5, - train_batch_size_per_learner=2000, + train_batch_size_per_learner=1024, + ) + .rl_module( + model_config=DefaultModelConfig( + fcnet_hiddens=[256, 256], + ), ) ) diff --git a/rllib/tuned_examples/cql/pendulum_cql.py b/rllib/tuned_examples/cql/pendulum_cql.py index 1db19b95c38ff..dff5446a0e0aa 100644 --- a/rllib/tuned_examples/cql/pendulum_cql.py +++ b/rllib/tuned_examples/cql/pendulum_cql.py @@ -39,12 +39,6 @@ ) .offline_data( input_=[data_path.as_posix()], - # The `kwargs` for the `input_read_method`. We override the - # the number of blocks to pull at once b/c our dataset is - # small. - input_read_method_kwargs={ - "override_num_blocks": max((args.num_learners or 1) * 2, 2) - }, # The `kwargs` for the `map_batches` method in which our # `OfflinePreLearner` is run. 2 data workers should be run # concurrently. diff --git a/rllib/tuned_examples/marwil/cartpole_marwil.py b/rllib/tuned_examples/marwil/cartpole_marwil.py index d1f5e8bfa15cc..9536dc4b1f897 100644 --- a/rllib/tuned_examples/marwil/cartpole_marwil.py +++ b/rllib/tuned_examples/marwil/cartpole_marwil.py @@ -49,12 +49,6 @@ # as remote learners. .offline_data( input_=[data_path.as_posix()], - # The `kwargs` for the `input_read_method`. We override the - # the number of blocks to pull at once b/c our dataset is - # small. - input_read_method_kwargs={ - "override_num_blocks": max((args.num_learners or 1) * 2, 2) - }, # The `kwargs` for the `map_batches` method in which our # `OfflinePreLearner` is run. 2 data workers should be run # concurrently. From 5a179ae6b679b75fa108698f01a7f3ce8d15db2a Mon Sep 17 00:00:00 2001 From: simonsays1980 Date: Fri, 8 Nov 2024 19:07:36 +0100 Subject: [PATCH 03/12] Removed call for schema completely to not hurt perforamance anyhow. Furthermore, removed learner and locality hints from multi-learner setups b/c we do not need them for the time being and actor handles cannot get serialized. Signed-off-by: simonsays1980 --- rllib/offline/offline_data.py | 4 +--- rllib/offline/offline_prelearner.py | 21 --------------------- 2 files changed, 1 insertion(+), 24 deletions(-) diff --git a/rllib/offline/offline_data.py b/rllib/offline/offline_data.py index 1f526ebf8bb87..9d9712d32c9ce 100644 --- a/rllib/offline/offline_data.py +++ b/rllib/offline/offline_data.py @@ -94,7 +94,6 @@ def __init__(self, config: AlgorithmConfig): f"{stop_time - start_time}s." ) logger.info("Reading data from {}".format(self.path)) - logger.debug(self.data.schema()) except Exception as e: logger.error(e) # Avoids reinstantiating the batch iterator each time we sample. @@ -148,8 +147,7 @@ def sample( # Add constructor `kwargs` when using remote learners. fn_constructor_kwargs.update( { - "learner": self.learner_handles, - "locality_hints": self.locality_hints, + "learner": None, "module_spec": self.module_spec, "module_state": module_state, } diff --git a/rllib/offline/offline_prelearner.py b/rllib/offline/offline_prelearner.py index b000f2c965fc3..544f68c5cd529 100644 --- a/rllib/offline/offline_prelearner.py +++ b/rllib/offline/offline_prelearner.py @@ -1,10 +1,8 @@ import gymnasium as gym import logging import numpy as np -import random from typing import Any, Dict, List, Optional, Union, Set, Tuple, TYPE_CHECKING -import ray from ray.actor import ActorHandle from ray.rllib.core.columns import Columns from ray.rllib.core.learner import Learner @@ -87,7 +85,6 @@ def __init__( config: "AlgorithmConfig", learner: Union[Learner, list[ActorHandle]], spaces: Optional[Tuple[gym.Space, gym.Space]] = None, - locality_hints: Optional[list] = None, module_spec: Optional[MultiRLModuleSpec] = None, module_state: Optional[Dict[ModuleID, Any]] = None, ): @@ -103,24 +100,6 @@ def __init__( self._module = self._learner._module # Otherwise we have remote `Learner`s. else: - # TODO (simon): Check with the data team how to get at - # initialization the data block location. - node_id = ray.get_runtime_context().get_node_id() - # Shuffle indices such that not each data block syncs weights - # with the same learner in case there are multiple learners - # on the same node like the `PreLearner`. - indices = list(range(len(locality_hints))) - random.shuffle(indices) - locality_hints = [locality_hints[i] for i in indices] - learner = [learner[i] for i in indices] - # Choose a learner from the same node. - for i, hint in enumerate(locality_hints): - if hint == node_id: - self._learner = learner[i] - # If no learner has been chosen, there is none on the same node. - if not self._learner: - # Then choose a learner randomly. - self._learner = learner[random.randint(0, len(learner) - 1)] self.learner_is_remote = True # Build the module from spec. Note, this will be a MultiRLModule. self._module = module_spec.build() From a4070649244b628c5912fbb5e7b9220e82eaf738 Mon Sep 17 00:00:00 2001 From: simonsays1980 Date: Fri, 8 Nov 2024 20:24:15 +0100 Subject: [PATCH 04/12] Added a scheduling strategy for multi-node and multi-learner setups. Furthermore, removed 'num_cpus' from 'map_batches' as it was blocking execution with multi-learner multi-node setups. In addition modified all #args.num_learner' usages. Signed-off-by: simonsays1980 --- .../bc/benchmark_rlunplugged_atari_pong_bc.py | 42 ++++++++++++++++--- 1 file changed, 37 insertions(+), 5 deletions(-) diff --git a/rllib/tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py b/rllib/tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py index 538dbcc9f929b..7f53cefeeba67 100644 --- a/rllib/tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py +++ b/rllib/tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py @@ -29,7 +29,6 @@ add_rllib_example_script_args, ) - # Register all ALE environments with gymnasium. gym.register_envs(ale_py) @@ -148,6 +147,31 @@ def _env_creator(cfg): ) args = parser.parse_args() +# If multiple learners are requested define a scheduling +# strategy with best data locality. +if args.num_learners and args.num_learners > 1: + import ray + + ray.init() + # Check, if we have a multi-node cluster. + nodes = ray.nodes() + ray.shutdown() + print(f"Number of nodes in cluster: {len(nodes)}") + # If we have a multi-node cluster spread learners. + if len(nodes) > 1: + os.environ["TRAIN_ENABLE_WORKER_SPREAD_ENV"] = "1" + print( + "Multi-node cluster and multi-learner setup. " + "Using a 'SPREAD' scheduling strategy for learners" + "to support data locality." + ) + # Otherwise pack the learners on the single node. + else: + print( + "Single-node cluster and multi-learner setup. " + "Using a 'PACK' scheduling strategy for learners" + "to support data locality." + ) # Anyscale RLUnplugged storage bucket. The bucket contains from the # original `RLUnplugged` bucket only the first `atari/Pong` run. anyscale_storage_bucket = os.environ["ANYSCALE_ARTIFACT_STORAGE"] @@ -184,7 +208,9 @@ def _env_creator(cfg): evaluation_parallel_to_training=True, ) .learners( - num_learners=args.num_learners if args.num_learners > 1 else 0, + num_learners=args.num_learners + if args.num_learners and args.num_learners > 1 + else 0, num_gpus_per_learner=args.num_gpus_per_learner, ) # Note, the `input_` argument is the major argument for the @@ -213,7 +239,6 @@ def _env_creator(cfg): # training, new batches are transformed while others are used in updating. map_batches_kwargs={ "concurrency": 12, - "num_cpus": 12, }, # When iterating over batches in the dataset, prefetch at least 4 # batches per learner. @@ -224,12 +249,19 @@ def _env_creator(cfg): }, # Iterate over 10 batches per RLlib iteration if multiple learners # are used. - dataset_num_iters_per_learner=10 if args.num_learners > 1 else 1, + dataset_num_iters_per_learner=10 + if args.num_learners and args.num_learners > 1 + else 1, ) .training( # To increase learning speed with multiple learners, # increase the learning rate correspondingly. - lr=0.0008 * max(1, args.num_gpus**0.5), + lr=0.0008 + * max( + 1, + (args.num_learners if args.num_learners and args.num_learners > 1 else 1) + ** 0.5, + ), train_batch_size_per_learner=1024, # Use the defined learner connector above, to decode observations. learner_connector=_make_learner_connector, From af757e8d43853e3746ce0fcc116568a27fc53d02 Mon Sep 17 00:00:00 2001 From: simonsays1980 Date: Mon, 11 Nov 2024 10:55:56 +0100 Subject: [PATCH 05/12] Changed dataset iterations per learner to a higher value to improve reduce overhead costs. Signed-off-by: simonsays1980 --- rllib/tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rllib/tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py b/rllib/tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py index 7f53cefeeba67..6f67ac7dd8e8c 100644 --- a/rllib/tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py +++ b/rllib/tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py @@ -249,7 +249,7 @@ def _env_creator(cfg): }, # Iterate over 10 batches per RLlib iteration if multiple learners # are used. - dataset_num_iters_per_learner=10 + dataset_num_iters_per_learner=100 if args.num_learners and args.num_learners > 1 else 1, ) From 716fa69a98edf64e8e4b755be5d70d696c9a8879 Mon Sep 17 00:00:00 2001 From: simonsays1980 Date: Wed, 20 Nov 2024 15:57:52 +0100 Subject: [PATCH 06/12] Removed local shuffle buffer from 'OfflineData''s defaults b/c it slows iterating over batches down as long as it is not fixed in Ray Data. Signed-off-by: simonsays1980 --- rllib/offline/offline_data.py | 2 -- rllib/offline/offline_prelearner.py | 1 + rllib/tuned_examples/bc/cartpole_bc.py | 1 - rllib/tuned_examples/bc/pendulum_bc.py | 1 - 4 files changed, 1 insertion(+), 4 deletions(-) diff --git a/rllib/offline/offline_data.py b/rllib/offline/offline_data.py index 9d9712d32c9ce..785eaf99e0aa1 100644 --- a/rllib/offline/offline_data.py +++ b/rllib/offline/offline_data.py @@ -231,6 +231,4 @@ def default_map_batches_kwargs(self): def default_iter_batches_kwargs(self): return { "prefetch_batches": 2, - "local_shuffle_buffer_size": self.config.train_batch_size_per_learner - or (self.config.train_batch_size // max(1, self.config.num_learners)) * 4, } diff --git a/rllib/offline/offline_prelearner.py b/rllib/offline/offline_prelearner.py index 544f68c5cd529..8ebb23f80ad8f 100644 --- a/rllib/offline/offline_prelearner.py +++ b/rllib/offline/offline_prelearner.py @@ -465,6 +465,7 @@ def convert(sample, space): # Note, `map_batches` expects a `Dict` as return value. return {"episodes": episodes} + @staticmethod def _map_sample_batch_to_episode( is_multi_agent: bool, batch: Dict[str, Union[list, np.ndarray]], diff --git a/rllib/tuned_examples/bc/cartpole_bc.py b/rllib/tuned_examples/bc/cartpole_bc.py index 0be81dcd20330..70e79cfd251e1 100644 --- a/rllib/tuned_examples/bc/cartpole_bc.py +++ b/rllib/tuned_examples/bc/cartpole_bc.py @@ -58,7 +58,6 @@ # local shuffle. iter_batches_kwargs={ "prefetch_batches": 1, - "local_shuffle_buffer_size": None, }, # The number of iterations to be run per learner when in multi-learner # mode in a single RLlib training iteration. Leave this to `None` to diff --git a/rllib/tuned_examples/bc/pendulum_bc.py b/rllib/tuned_examples/bc/pendulum_bc.py index d9c27ea647c2d..098a35e235b2b 100644 --- a/rllib/tuned_examples/bc/pendulum_bc.py +++ b/rllib/tuned_examples/bc/pendulum_bc.py @@ -58,7 +58,6 @@ # local shuffle. iter_batches_kwargs={ "prefetch_batches": 1, - "local_shuffle_buffer_size": None, }, # The number of iterations to be run per learner when in multi-learner # mode in a single RLlib training iteration. Leave this to `None` to From c296b56f9566e5b6aa9c9a481f4a79c9fc2d5440 Mon Sep 17 00:00:00 2001 From: simonsays1980 Date: Tue, 10 Dec 2024 19:38:43 +0100 Subject: [PATCH 07/12] Removed 'locality_hints' from 'OfflinePreLearner' b/c it is not needed anymore. Signed-off-by: simonsays1980 --- rllib/examples/offline_rl/classes/image_offline_prelearner.py | 1 - rllib/offline/offline_prelearner.py | 1 - rllib/offline/tests/test_offline_prelearner.py | 1 - rllib/tuned_examples/bc/cartpole_bc.py | 4 +--- 4 files changed, 1 insertion(+), 6 deletions(-) diff --git a/rllib/examples/offline_rl/classes/image_offline_prelearner.py b/rllib/examples/offline_rl/classes/image_offline_prelearner.py index 8786357ce3622..82dd1429a04e3 100644 --- a/rllib/examples/offline_rl/classes/image_offline_prelearner.py +++ b/rllib/examples/offline_rl/classes/image_offline_prelearner.py @@ -35,7 +35,6 @@ def __init__( self, config: "AlgorithmConfig", learner: Union[Learner, List[ActorHandle]], - locality_hints: Optional[List[str]] = None, spaces: Optional[Tuple[gym.Space, gym.Space]] = None, module_spec: Optional[MultiRLModuleSpec] = None, module_state: Optional[Dict[ModuleID, Any]] = None, diff --git a/rllib/offline/offline_prelearner.py b/rllib/offline/offline_prelearner.py index 2a7af5c20e914..6e6ad4fa34ec2 100644 --- a/rllib/offline/offline_prelearner.py +++ b/rllib/offline/offline_prelearner.py @@ -86,7 +86,6 @@ def __init__( self, config: "AlgorithmConfig", learner: Union[Learner, list[ActorHandle]], - locality_hints: Optional[List[str]] = None, spaces: Optional[Tuple[gym.Space, gym.Space]] = None, module_spec: Optional[MultiRLModuleSpec] = None, module_state: Optional[Dict[ModuleID, Any]] = None, diff --git a/rllib/offline/tests/test_offline_prelearner.py b/rllib/offline/tests/test_offline_prelearner.py index 15530f4e70f6d..3d50573b09f65 100644 --- a/rllib/offline/tests/test_offline_prelearner.py +++ b/rllib/offline/tests/test_offline_prelearner.py @@ -164,7 +164,6 @@ def test_offline_prelearner_in_map_batches(self): ).iter_batches( batch_size=10, prefetch_batches=1, - local_shuffle_buffer_size=100, ) # Now sample a single batch. diff --git a/rllib/tuned_examples/bc/cartpole_bc.py b/rllib/tuned_examples/bc/cartpole_bc.py index e2b333af9656c..3e860346ebc8e 100644 --- a/rllib/tuned_examples/bc/cartpole_bc.py +++ b/rllib/tuned_examples/bc/cartpole_bc.py @@ -58,9 +58,7 @@ map_batches_kwargs={"concurrency": 2, "num_cpus": 2}, # This data set is small so do not prefetch too many batches and use no # local shuffle. - iter_batches_kwargs={ - "prefetch_batches": 1, - }, + iter_batches_kwargs={"prefetch_batches": 1}, # The number of iterations to be run per learner when in multi-learner # mode in a single RLlib training iteration. Leave this to `None` to # run an entire epoch on the dataset during a single RLlib training From 1d9e02c0f580ef285268eaad80287102f7d467e1 Mon Sep 17 00:00:00 2001 From: simonsays1980 Date: Tue, 10 Dec 2024 19:49:02 +0100 Subject: [PATCH 08/12] Removed unused customization in 'RLUnplugged' example. Signed-off-by: simonsays1980 --- .../bc/benchmark_rlunplugged_atari_pong_bc.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rllib/tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py b/rllib/tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py index 6f67ac7dd8e8c..d3072c933da69 100644 --- a/rllib/tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py +++ b/rllib/tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py @@ -33,6 +33,7 @@ gym.register_envs(ale_py) +# Define a `ConnectorV2` to decode stacked encoded Atari frames. class DecodeObservations(ConnectorV2): def __init__( self, @@ -244,8 +245,6 @@ def _env_creator(cfg): # batches per learner. iter_batches_kwargs={ "prefetch_batches": 4, - # Do not use batch shuffling b/c it hurts performance. - "local_shuffle_buffer_size": None, }, # Iterate over 10 batches per RLlib iteration if multiple learners # are used. @@ -277,7 +276,8 @@ def _env_creator(cfg): ) ) -# TODO (simon): Change to use the `run_rllib_example` function as soon as tuned. +# TODO (simon): Change to use the `run_rllib_example` function as soon as this works +# with Ray Tune. algo = config.build() for i in range(100): From 929f6f7be73d2f3ac9eeb4440b7fb9103305995f Mon Sep 17 00:00:00 2001 From: simonsays1980 Date: Wed, 11 Dec 2024 12:08:45 +0100 Subject: [PATCH 09/12] Changed the signature of the 'OfflinePreLearner' to force keyword args and modified tests. Signed-off-by: simonsays1980 --- rllib/offline/offline_prelearner.py | 1 + rllib/offline/tests/test_offline_prelearner.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/rllib/offline/offline_prelearner.py b/rllib/offline/offline_prelearner.py index 6e6ad4fa34ec2..e885687a8a6ab 100644 --- a/rllib/offline/offline_prelearner.py +++ b/rllib/offline/offline_prelearner.py @@ -84,6 +84,7 @@ class OfflinePreLearner: @OverrideToImplementCustomLogic_CallToSuperRecommended def __init__( self, + *, config: "AlgorithmConfig", learner: Union[Learner, list[ActorHandle]], spaces: Optional[Tuple[gym.Space, gym.Space]] = None, diff --git a/rllib/offline/tests/test_offline_prelearner.py b/rllib/offline/tests/test_offline_prelearner.py index 3d50573b09f65..42897f354314c 100644 --- a/rllib/offline/tests/test_offline_prelearner.py +++ b/rllib/offline/tests/test_offline_prelearner.py @@ -74,7 +74,7 @@ def test_offline_prelearner_buffer_class(self): algo = self.config.build() # Build the `OfflinePreLearner` and add the learner. oplr = OfflinePreLearner( - self.config, + config=self.config, learner=algo.offline_data.learner_handles[0], ) @@ -192,7 +192,7 @@ def test_offline_prelearner_sample_from_old_sample_batch_data(self): algo = self.config.build() # Build the `OfflinePreLearner` and add the learner. oplr = OfflinePreLearner( - self.config, + config=self.config, learner=algo.offline_data.learner_handles[0], ) # Now, pull a batch of defined size formt he dataset. From 68dfd706b4c45d4875226375c6f4683d821e16dd Mon Sep 17 00:00:00 2001 From: simonsays1980 Date: Wed, 11 Dec 2024 12:10:34 +0100 Subject: [PATCH 10/12] Removed ALE syntax in RLUnplugged example b/c gymnasium 1.x does not need it anymore. Signed-off-by: simonsays1980 --- .../tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/rllib/tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py b/rllib/tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py index d3072c933da69..2b3468bc1e491 100644 --- a/rllib/tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py +++ b/rllib/tuned_examples/bc/benchmark_rlunplugged_atari_pong_bc.py @@ -10,7 +10,6 @@ d_t: float } """ -import ale_py import gymnasium as gym import io import numpy as np @@ -29,9 +28,6 @@ add_rllib_example_script_args, ) -# Register all ALE environments with gymnasium. -gym.register_envs(ale_py) - # Define a `ConnectorV2` to decode stacked encoded Atari frames. class DecodeObservations(ConnectorV2): From 45924b57237e2dfb4ae0ede0bec6f0de29841479 Mon Sep 17 00:00:00 2001 From: simonsays1980 Date: Thu, 12 Dec 2024 16:59:48 +0100 Subject: [PATCH 11/12] Fixed small bug in MARWIL test file that was due to forced keyword arguments in the 'OfflinePreLearner'. Signed-off-by: simonsays1980 --- rllib/algorithms/marwil/tests/test_marwil.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rllib/algorithms/marwil/tests/test_marwil.py b/rllib/algorithms/marwil/tests/test_marwil.py index 301e00b107716..094ec567a9980 100644 --- a/rllib/algorithms/marwil/tests/test_marwil.py +++ b/rllib/algorithms/marwil/tests/test_marwil.py @@ -158,7 +158,9 @@ def test_marwil_loss_function(self): batch = algo.offline_data.data.take_batch(2000) # Create the prelearner and compute advantages and values. - offline_prelearner = OfflinePreLearner(config, algo.learner_group._learner) + offline_prelearner = OfflinePreLearner( + config, learner=algo.learner_group._learner + ) # Note, for `ray.data`'s pipeline everything has to be a dictionary # therefore the batch is embedded into another dictionary. batch = offline_prelearner(batch)["batch"][0] From 58272534e57b21c1fba7690d1fe11af8cb49ce16 Mon Sep 17 00:00:00 2001 From: simonsays1980 Date: Fri, 13 Dec 2024 11:03:29 +0100 Subject: [PATCH 12/12] Added keyword to first argument in 'OfflinePreLearner' initialisation b/c test was failing. Signed-off-by: simonsays1980 --- rllib/algorithms/marwil/tests/test_marwil.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rllib/algorithms/marwil/tests/test_marwil.py b/rllib/algorithms/marwil/tests/test_marwil.py index 094ec567a9980..a43eacf46ba36 100644 --- a/rllib/algorithms/marwil/tests/test_marwil.py +++ b/rllib/algorithms/marwil/tests/test_marwil.py @@ -159,7 +159,7 @@ def test_marwil_loss_function(self): # Create the prelearner and compute advantages and values. offline_prelearner = OfflinePreLearner( - config, learner=algo.learner_group._learner + config=config, learner=algo.learner_group._learner ) # Note, for `ray.data`'s pipeline everything has to be a dictionary # therefore the batch is embedded into another dictionary.