diff --git a/examples/multiprocess_pendulum.py b/examples/multiprocess_pendulum.py
new file mode 100644
index 00000000..b20ccf6c
--- /dev/null
+++ b/examples/multiprocess_pendulum.py
@@ -0,0 +1,128 @@
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+import torch.optim as optim
+
+import numpy as np
+from tqdm import trange
+
+from mushroom_rl.core import VectorCore, Logger, MultiprocessEnvironment
+from mushroom_rl.environments import Gym
+from mushroom_rl.algorithms.actor_critic import PPO
+
+from mushroom_rl.policy import GaussianTorchPolicy
+
+
+class Network(nn.Module):
+    def __init__(self, input_shape, output_shape, n_features, **kwargs):
+        super(Network, self).__init__()
+
+        n_input = input_shape[-1]
+        n_output = output_shape[0]
+
+        self._h1 = nn.Linear(n_input, n_features)
+        self._h2 = nn.Linear(n_features, n_features)
+        self._h3 = nn.Linear(n_features, n_output)
+
+        nn.init.xavier_uniform_(self._h1.weight,
+                                gain=nn.init.calculate_gain('relu'))
+        nn.init.xavier_uniform_(self._h2.weight,
+                                gain=nn.init.calculate_gain('relu'))
+        nn.init.xavier_uniform_(self._h3.weight,
+                                gain=nn.init.calculate_gain('linear'))
+
+    def forward(self, state, **kwargs):
+        features1 = F.relu(self._h1(torch.squeeze(state, 1).float()))
+        features2 = F.relu(self._h2(features1))
+        a = self._h3(features2)
+
+        return a
+
+
+def experiment(alg, env_id, horizon, gamma, n_epochs, n_steps, n_steps_per_fit, n_episodes_test,
+               alg_params, policy_params):
+
+    logger = Logger(alg.__name__, results_dir=None)
+    logger.strong_line()
+    logger.info('Experiment Algorithm: ' + alg.__name__)
+
+    mdp = MultiprocessEnvironment(Gym, env_id, horizon, gamma, n_envs=15)
+
+    critic_params = dict(network=Network,
+                         optimizer={'class': optim.Adam,
+                                    'params': {'lr': 3e-4}},
+                         loss=F.mse_loss,
+                         n_features=32,
+                         batch_size=64,
+                         input_shape=mdp.info.observation_space.shape,
+                         output_shape=(1,))
+
+    policy = GaussianTorchPolicy(Network,
+                                 mdp.info.observation_space.shape,
+                                 mdp.info.action_space.shape,
+                                 **policy_params)
+
+    alg_params['critic_params'] = critic_params
+
+    agent = alg(mdp.info, policy, **alg_params)
+    #agent.set_logger(logger)
+
+    core = VectorCore(agent, mdp)
+
+    dataset = core.evaluate(n_episodes=n_episodes_test, render=False)
+
+    J = np.mean(dataset.discounted_return)
+    R = np.mean(dataset.undiscounted_return)
+    E = agent.policy.entropy()
+
+    logger.epoch_info(0, J=J, R=R, entropy=E)
+
+    for it in trange(n_epochs, leave=False):
+        core.learn(n_steps=n_steps, n_steps_per_fit=n_steps_per_fit)
+        dataset = core.evaluate(n_episodes=n_episodes_test, render=False)
+
+        J = np.mean(dataset.discounted_return)
+        R = np.mean(dataset.undiscounted_return)
+        E = agent.policy.entropy()
+
+        logger.epoch_info(it+1, J=J, R=R, entropy=E)
+
+    logger.info('Press a button to visualize')
+    input()
+    core.evaluate(n_episodes=5, render=True)
+
+
+if __name__ == '__main__':
+    max_kl = .015
+
+    policy_params = dict(
+        std_0=1.,
+        n_features=32,
+        use_cuda=False
+
+    )
+
+    ppo_params = dict(actor_optimizer={'class': optim.Adam,
+                                       'params': {'lr': 3e-4}},
+                      n_epochs_policy=4,
+                      batch_size=64,
+                      eps_ppo=.2,
+                      lam=.95)
+
+    trpo_params = dict(ent_coeff=0.0,
+                       max_kl=.01,
+                       lam=.95,
+                       n_epochs_line_search=10,
+                       n_epochs_cg=100,
+                       cg_damping=1e-2,
+                       cg_residual_tol=1e-10)
+
+    algs_params = [
+        (PPO, 'ppo', ppo_params)
+    ]
+
+    for alg, alg_name, alg_params in algs_params:
+        experiment(alg=alg, env_id='Pendulum-v1', horizon=200, gamma=.99,
+                   n_epochs=40, n_steps=30000, n_steps_per_fit=3000,
+                   n_episodes_test=25, alg_params=alg_params,
+                   policy_params=policy_params)
diff --git a/examples/pendulum_trust_region.py b/examples/pendulum_trust_region.py
index fbef01d3..f440d3f8 100644
--- a/examples/pendulum_trust_region.py
+++ b/examples/pendulum_trust_region.py
@@ -11,7 +11,6 @@
 from mushroom_rl.algorithms.actor_critic import TRPO, PPO
 
 from mushroom_rl.policy import GaussianTorchPolicy
-from mushroom_rl.utils.dataset import compute_J
 
 
 class Network(nn.Module):
@@ -66,14 +65,14 @@ def experiment(alg, env_id, horizon, gamma, n_epochs, n_steps, n_steps_per_fit,
     alg_params['critic_params'] = critic_params
 
     agent = alg(mdp.info, policy, **alg_params)
-    agent.set_logger(logger)
+    #agent.set_logger(logger)
 
     core = Core(agent, mdp)
 
     dataset = core.evaluate(n_episodes=n_episodes_test, render=False)
 
-    J = np.mean(compute_J(dataset, mdp.info.gamma))
-    R = np.mean(compute_J(dataset))
+    J = np.mean(dataset.discounted_return)
+    R = np.mean(dataset.undiscounted_return)
     E = agent.policy.entropy()
 
     logger.epoch_info(0, J=J, R=R, entropy=E)
@@ -82,8 +81,8 @@ def experiment(alg, env_id, horizon, gamma, n_epochs, n_steps, n_steps_per_fit,
         core.learn(n_steps=n_steps, n_steps_per_fit=n_steps_per_fit)
         dataset = core.evaluate(n_episodes=n_episodes_test, render=False)
 
-        J = np.mean(compute_J(dataset, mdp.info.gamma))
-        R = np.mean(compute_J(dataset))
+        J = np.mean(dataset.discounted_return)
+        R = np.mean(dataset.undiscounted_return)
         E = agent.policy.entropy()
 
         logger.epoch_info(it+1, J=J, R=R, entropy=E)
@@ -99,7 +98,7 @@ def experiment(alg, env_id, horizon, gamma, n_epochs, n_steps, n_steps_per_fit,
     policy_params = dict(
         std_0=1.,
         n_features=32,
-        use_cuda=torch.cuda.is_available()
+        use_cuda=False
 
     )
 
@@ -119,7 +118,7 @@ def experiment(alg, env_id, horizon, gamma, n_epochs, n_steps, n_steps_per_fit,
                        cg_residual_tol=1e-10)
 
     algs_params = [
-        (TRPO, 'trpo', trpo_params),
+        #(TRPO, 'trpo', trpo_params),
        (PPO, 'ppo', ppo_params)
     ]
 
diff --git a/mushroom_rl/__init__.py b/mushroom_rl/__init__.py
index 52af183e..4e2fb170 100644
--- a/mushroom_rl/__init__.py
+++ b/mushroom_rl/__init__.py
@@ -1 +1 @@
-__version__ = '1.10.0'
+__version__ = '2.0.0-rc1'
diff --git a/mushroom_rl/algorithms/actor_critic/deep_actor_critic/ppo.py b/mushroom_rl/algorithms/actor_critic/deep_actor_critic/ppo.py
index 748ed39f..a69f05b2 100644
--- a/mushroom_rl/algorithms/actor_critic/deep_actor_critic/ppo.py
+++ b/mushroom_rl/algorithms/actor_critic/deep_actor_critic/ppo.py
@@ -109,23 +109,22 @@ def _update_policy(self, obs, act, adv, old_log_p):
 
     def _log_info(self, dataset, x, v_target, old_pol_dist):
         if self._logger:
-            logging_verr = []
-            torch_v_targets = torch.tensor(v_target, dtype=torch.float)
-            for idx in range(len(self._V)):
-                v_pred = torch.tensor(self._V(x, idx=idx), dtype=torch.float)
-                v_err = F.mse_loss(v_pred, torch_v_targets)
-                logging_verr.append(v_err.item())
-
-            logging_ent = self.policy.entropy(x)
-            new_pol_dist = self.policy.distribution(x)
-            logging_kl = torch.mean(torch.distributions.kl.kl_divergence(
-                new_pol_dist, old_pol_dist))
-            avg_rwd = np.mean(dataset.undiscounted_return)
-            msg = "Iteration {}:\n\t\t\t\trewards {} vf_loss {}\n\t\t\t\tentropy {} kl {}".format(
-                self._iter, avg_rwd, logging_verr, logging_ent, logging_kl)
-
-            self._logger.info(msg)
-            self._logger.weak_line()
+            with torch.no_grad():
+                logging_verr = []
+                for idx in range(len(self._V)):
+                    v_pred = self._V(x, idx=idx, output_tensor=True)
+                    v_err = F.mse_loss(v_pred, v_target)
+                    logging_verr.append(v_err.item())
+
+                logging_ent = self.policy.entropy(x)
+                new_pol_dist = self.policy.distribution(x)
+                logging_kl = torch.mean(torch.distributions.kl.kl_divergence(new_pol_dist, old_pol_dist))
+                avg_rwd = np.mean(dataset.undiscounted_return)
+                msg = "Iteration {}:\n\t\t\t\trewards {} vf_loss {}\n\t\t\t\tentropy {} kl {}".format(
+                    self._iter, avg_rwd, logging_verr, logging_ent, logging_kl)
+
+                self._logger.info(msg)
+                self._logger.weak_line()
 
     def _post_load(self):
         if self._optimizer is not None:
diff --git a/mushroom_rl/core/__init__.py b/mushroom_rl/core/__init__.py
index 16d34d51..0dfbe45a 100644
--- a/mushroom_rl/core/__init__.py
+++ b/mushroom_rl/core/__init__.py
@@ -5,6 +5,11 @@
 from .serialization import Serializable
 from .logger import Logger
 
+from .vectorized_core import VectorCore
+from .vectorized_env import VectorizedEnvironment
+from .multiprocess_environment import MultiprocessEnvironment
+
 import mushroom_rl.environments
 
-__all__ = ['Core', 'Dataset', 'Environment', 'MDPInfo', 'Agent', 'AgentInfo', 'Serializable', 'Logger']
+__all__ = ['Core', 'Dataset', 'Environment', 'MDPInfo', 'Agent', 'AgentInfo', 'Serializable', 'Logger',
+           'VectorCore', 'VectorizedEnvironment', 'MultiprocessEnvironment']
diff --git a/mushroom_rl/core/dataset.py b/mushroom_rl/core/dataset.py
index 18deb219..1a31841e 100644
--- a/mushroom_rl/core/dataset.py
+++ b/mushroom_rl/core/dataset.py
@@ -180,7 +180,7 @@ def __add__(self, other):
         result._info = new_info
         result._episode_info = new_episode_info
-        result.theta_list = result._theta_list + other._theta_list
+        result._theta_list = result._theta_list + other._theta_list
         result._data = self._data + other._data
 
         return result
diff --git a/mushroom_rl/core/multiprocess_environment.py b/mushroom_rl/core/multiprocess_environment.py
index ca190e5e..ce935326 100644
--- a/mushroom_rl/core/multiprocess_environment.py
+++ b/mushroom_rl/core/multiprocess_environment.py
@@ -1,5 +1,4 @@
-from multiprocessing import Pipe
-from multiprocessing import Process
+from multiprocessing import Pipe, Process, cpu_count
 
 import numpy as np
 
@@ -16,14 +15,25 @@ def _env_worker(remote, env_class, use_generator, args, kwargs):
     try:
         while True:
             cmd, data = remote.recv()
+
+            # if data is None:
+            #     print(f'Executed command {cmd} with None data')
+
             if cmd == 'step':
-                action = data[0]
+                action = data
                 res = env.step(action)
                 remote.send(res)
             elif cmd == 'reset':
-                init_states = data[0]
+                if data is not None:
+                    init_states = data[0]
+                else:
+                    init_states = None
                 res = env.reset(init_states)
                 remote.send(res)
+            elif cmd == 'render':
+                record = data
+                res = env.render(record=record)
+                remote.send(res)
             elif cmd in 'stop':
                 env.stop()
                 remote.send(None)
@@ -32,7 +42,10 @@ def _env_worker(remote, env_class, use_generator, args, kwargs):
             elif cmd == 'seed':
                 env.seed(int(data))
                 remote.send(None)
+            elif cmd == 'close':
+                break
             else:
+                print(f'cmd {cmd}')
                 raise NotImplementedError()
     finally:
         remote.close()
@@ -56,7 +69,10 @@ def __init__(self, env_class, *args, n_envs=-1, use_generator=False, **kwargs):
            **kwargs: keyword arguments to set to the constructor or to the generator;
 
        """
-        assert n_envs > 1
+        assert n_envs > 1 or n_envs == -1
+
+        if n_envs == -1:
+            n_envs = cpu_count()
 
        self._remotes, self._work_remotes = zip(*[Pipe() for _ in range(n_envs)])
        self._processes = list()
@@ -73,20 +89,9 @@ def __init__(self, env_class, *args, n_envs=-1, use_generator=False, **kwargs):
 
        super().__init__(mdp_info, n_envs)
 
-    def step_all(self, env_mask, action):
-        for i, remote in enumerate(self._remotes):
-            if env_mask[i]:
-                remote.send(('step', action[i, :]))
-
-        states = list()
-        step_infos = list()
-        for i, remote in enumerate(self._remotes):
-            if env_mask[i]:
-                state, step_info = remote.recv()
-                states.append(remote.recv())
-                step_infos.append(step_info)
-
-        return np.array(states), step_infos
+        self._state_shape = (n_envs,) + self.info.observation_space.shape
+        self._reward_shape = (n_envs,)
+        self._absorbing_shape = (n_envs,)
 
     def reset_all(self, env_mask, state=None):
         for i, remote in enumerate(self._remotes):
@@ -94,15 +99,41 @@ def reset_all(self, env_mask, state=None):
                state_i = state[i, :] if state is not None else None
                remote.send(('reset', state_i))
 
-        states = list()
+        states = np.empty(self._state_shape)
         episode_infos = list()
         for i, remote in enumerate(self._remotes):
             if env_mask[i]:
                 state, episode_info = remote.recv()
-                states.append(state)
+
+                states[i] = state
                 episode_infos.append(episode_info)
+            else:
+                episode_infos.append({})
 
-        return np.array(states), episode_infos
+        return states, episode_infos
+
+    def step_all(self, env_mask, action):
+        for i, remote in enumerate(self._remotes):
+            if env_mask[i]:
+                remote.send(('step', action[i, :]))
+
+        states = np.empty(self._state_shape)
+        rewards = np.empty(self._reward_shape)
+        absorbings = np.empty(self._absorbing_shape, dtype=bool)
+        step_infos = list()
+
+        for i, remote in enumerate(self._remotes):
+            if env_mask[i]:
+                state, reward, absorbing, step_info = remote.recv()
+
+                states[i] = state
+                rewards[i] = reward
+                absorbings[i] = absorbing
+                step_infos.append(step_info)
+            else:
+                step_infos.append({})
+
+        return states, rewards, absorbings, step_infos
 
     def render_all(self, env_mask, record=False):
         for i, remote in enumerate(self._remotes):
@@ -128,12 +159,15 @@ def seed(self, seed):
     def stop(self):
         for remote in self._remotes:
             remote.send(('stop', None))
+            remote.recv()
 
     def __del__(self):
-        for remote in self._remotes:
-            remote.send(('close', None))
-        for p in self._processes:
-            p.join()
+        if hasattr(self, '_remotes'):
+            for remote in self._remotes:
+                remote.send(('close', None))
+        if hasattr(self, '_processes'):
+            for p in self._processes:
+                p.join()
 
     @staticmethod
     def generate(env, *args, n_envs=-1, **kwargs):
diff --git a/mushroom_rl/core/vectorized_core.py b/mushroom_rl/core/vectorized_core.py
index 87e9020c..d506712b 100644
--- a/mushroom_rl/core/vectorized_core.py
+++ b/mushroom_rl/core/vectorized_core.py
@@ -40,7 +40,7 @@ def __init__(self, agent, env, callbacks_fit=None, callback_step=None, record_di
         if record_dictionary is None:
             record_dictionary = dict()
 
-        self._record = [self._build_recorder_class(**record_dictionary) for _ in self.env.number]
+        self._record = [self._build_recorder_class(**record_dictionary) for _ in range(self.env.number)]
 
     def learn(self, n_steps=None, n_episodes=None, n_steps_per_fit=None, n_episodes_per_fit=None,
               render=False, record=False, quiet=False):
@@ -67,7 +67,7 @@ def learn(self, n_steps=None, n_episodes=None, n_steps_per_fit=None, n_episodes_
         self._core_logic.initialize_learn(n_steps_per_fit, n_episodes_per_fit)
 
         datasets = [Dataset(self.env.info, self.agent.info, n_steps_per_fit, n_episodes_per_fit)
-                    for _ in self.env.number]
+                    for _ in range(self.env.number)]
 
         self._run(datasets, n_steps, n_episodes, render, quiet, record)
 
@@ -95,30 +95,28 @@ def evaluate(self, initial_states=None, n_steps=None, n_episodes=None, render=Fa
         self._core_logic.initialize_evaluate()
 
         n_episodes_dataset = len(initial_states) if initial_states is not None else n_episodes
-        datasets = [Dataset(self.env.info, self.agent.info, n_steps, n_episodes_dataset) for _ in self.env.number]
+        datasets = [Dataset(self.env.info, self.agent.info, n_steps, n_episodes_dataset)
+                    for _ in range(self.env.number)]
 
         return self._run(datasets, n_steps, n_episodes, render, quiet, record, initial_states)
 
     def _run(self, datasets, n_steps, n_episodes, render, quiet, record, initial_states=None):
         self._core_logic.initialize_run(n_steps, n_episodes, initial_states, quiet)
 
-        last = None
+        last = np.ones(self.env.number, dtype=bool)
         while self._core_logic.move_required():
             action_mask = self._core_logic.get_action_mask()
             last = np.logical_and(last, action_mask)
 
             if np.any(last):
                 self._reset(initial_states, last)
 
-            sample, step_info = self._step(render, record, action_mask)
+            samples, step_infos = self._step(render, record, action_mask)
 
-            self.callback_step(sample)
+            self.callback_step(samples)
 
-            self._core_logic.after_step(np.logical_and(sample[5], action_mask))
+            self._core_logic.after_step(np.logical_and(samples[5], action_mask))
 
-            samples = list(zip(*sample))
-            for i in range(self.env.number):
-                if action_mask[i]:
-                    datasets[i].append(samples[i])
+            self._add_to_dataset(action_mask, datasets, samples, step_infos)
 
             if self._core_logic.fit_required():
                 fit_dataset = self._aggregate(datasets)
@@ -131,7 +129,7 @@ def _run(self, datasets, n_steps, n_episodes, render, quiet, record, initial_sta
                 for dataset in datasets:
                     dataset.clear()
 
-            last = sample[5]
+            last = samples[5]
 
         self.agent.stop()
         self.env.stop()
@@ -140,6 +138,13 @@ def _run(self, datasets, n_steps, n_episodes, render, quiet, record, initial_sta
 
         return self._aggregate(datasets)
 
+    def _add_to_dataset(self, action_mask, datasets, samples, step_infos):
+        for i in range(self.env.number):
+            if action_mask[i]:
+                sample = (samples[0][i], samples[1][i], samples[2][i], samples[3][i], samples[4][i], samples[5][i])
+                step_info = step_infos[i]
+                datasets[i].append(sample, step_info)
+
     def _step(self, render, record, action_mask):
         """
         Single step.
@@ -153,10 +158,9 @@ def _step(self, render, record, action_mask):
            of the reached states and the last step flags.
 
""" + action, policy_next_state = self.agent.draw_action(self._state, self._policy_state) - action, policy_next_state = self.agent.draw_action(self._states[action_mask], self._policy_state[action_mask]) - - next_state, rewards, absorbing, step_info = self.env.step_all(action, action_mask) + next_state, rewards, absorbing, step_info = self.env.step_all(action_mask, action) self._episode_steps += 1 @@ -179,12 +183,16 @@ def _reset(self, initial_states, mask): """ initial_state = self._core_logic.get_initial_state(initial_states) - # self.agent.episode_start(mask) FIXME - self.agent.episode_start() - self._states = self._preprocess(self.env.reset_all(initial_state, mask)) + state, episode_info = self._preprocess(self.env.reset_all(mask, initial_state)) + self._policy_state, self._current_theta = self.agent.episode_start(episode_info) # FIXME add mask support + self._state = self._preprocess(state) self.agent.next_action = None - self._episode_steps = np.multiply(self._episode_steps, np.logical_not(mask)) + + if self._episode_steps is None: + self._episode_steps = np.zeros(self.env.number, dtype=int) + else: + self._episode_steps[np.logical_not(mask)] = 0 def _end(self, record): self._state = None @@ -218,10 +226,13 @@ def _preprocess(self, states): def _aggregate(datasets): aggregated_dataset = datasets[0] - for dataset in datasets[1:]: - aggregated_dataset += dataset + if len(aggregated_dataset) > 0: + for dataset in datasets[1:]: + aggregated_dataset += dataset - return aggregated_dataset + return aggregated_dataset + else: + return None def _build_recorder_class(self, recorder_class=None, fps=None, **kwargs): """ diff --git a/mushroom_rl/utils/torch.py b/mushroom_rl/utils/torch.py index 41b6caee..29149539 100644 --- a/mushroom_rl/utils/torch.py +++ b/mushroom_rl/utils/torch.py @@ -105,7 +105,7 @@ def to_float_tensor(x, use_cuda=False): A float tensor build from the values contained in the input array. """ - x = torch.tensor(x, dtype=torch.float) + x = torch.as_tensor(x, dtype=torch.float) return x.cuda() if use_cuda else x