Major improvements
- added vectorized policies for black-box optimization
- fixed a bug in the dataset backends
- added a black-box optimization test
- work in progress on the vectorized dataset; many issues still need to be solved
boris-il-forte committed Dec 6, 2023
1 parent 42ab32d commit 8d0be98
Showing 15 changed files with 257 additions and 10 deletions.
Empty file.
File renamed without changes.
67 changes: 67 additions & 0 deletions examples/vectorized_core/segway_bbo.py
@@ -0,0 +1,67 @@
import numpy as np

from mushroom_rl.core import VectorCore, Logger, MultiprocessEnvironment
from mushroom_rl.environments.segway import Segway
from mushroom_rl.algorithms.policy_search import *
from mushroom_rl.policy import DeterministicPolicy
from mushroom_rl.distributions import GaussianDiagonalDistribution
from mushroom_rl.approximators import Regressor
from mushroom_rl.approximators.parametric import LinearApproximator
from mushroom_rl.utils.callbacks import CollectDataset
from mushroom_rl.rl_utils.optimizers import AdaptiveOptimizer

from tqdm import tqdm, trange
tqdm.monitor_interval = 0


def experiment(alg, params, n_epochs, n_episodes, n_ep_per_fit):
    np.random.seed()

    logger = Logger(alg.__name__, results_dir=None)
    logger.strong_line()
    logger.info('Experiment Algorithm: ' + alg.__name__)

    # MDP
    mdp = MultiprocessEnvironment(Segway, n_envs=15)

    # Policy
    approximator = Regressor(LinearApproximator,
                             input_shape=mdp.info.observation_space.shape,
                             output_shape=mdp.info.action_space.shape)

    n_weights = approximator.weights_size
    mu = np.zeros(n_weights)
    sigma = 2e-0 * np.ones(n_weights)
    policy = DeterministicPolicy(approximator)
    dist = GaussianDiagonalDistribution(mu, sigma)

    agent = alg(mdp.info, dist, policy, **params)

    # Train
    dataset_callback = CollectDataset()
    core = VectorCore(agent, mdp, callbacks_fit=[dataset_callback])

    for i in trange(n_epochs, leave=False):
        core.learn(n_episodes=n_episodes,
                   n_episodes_per_fit=n_ep_per_fit, render=False)
        dataset = dataset_callback.get()
        J = np.mean(dataset.discounted_return)
        dataset_callback.clean()

        p = dist.get_parameters()

        logger.epoch_info(i+1, J=J, mu=p[:n_weights], sigma=p[n_weights:])

    logger.info('Press a button to visualize the segway...')
    input()
    core.evaluate(n_episodes=3, render=True)


if __name__ == '__main__':
    algs_params = [
        (REPS, {'eps': 0.05}),
        (RWR, {'beta': 0.01}),
        (PGPE, {'optimizer': AdaptiveOptimizer(eps=0.3)}),
        ]
    for alg, params in algs_params:
        experiment(alg, params, n_epochs=20, n_episodes=100, n_ep_per_fit=25)
@@ -1,6 +1,7 @@
import numpy as np

from mushroom_rl.core import Agent
from mushroom_rl.policy import VectorPolicy


class BlackBoxOptimization(Agent):
@@ -26,13 +27,30 @@ def __init__(self, mdp_info, distribution, policy):
        super().__init__(mdp_info, policy, is_episodic=True)

    def episode_start(self, episode_info):
        if isinstance(self.policy, VectorPolicy):
            self.policy = self.policy.get_flat_policy()

        theta = self.distribution.sample()
        self.policy.set_weights(theta)

        policy_state, _ = super().episode_start(episode_info)

        return policy_state, theta

    def episode_start_vectorized(self, episode_info, n_envs):
        if not isinstance(self.policy, VectorPolicy):
            self.policy = VectorPolicy(self.policy, n_envs)
        elif len(self.policy) != n_envs:
            self.policy.set_n(n_envs)

        theta = [self.distribution.sample() for _ in range(n_envs)]

        self.policy.set_weights(theta)

        policy_state, _ = super().episode_start(episode_info)

        return policy_state, theta

    def fit(self, dataset):
        Jep = np.array(dataset.discounted_return)
        theta = np.array(dataset.theta_list)
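For context, a minimal sketch (not part of this commit) of what the new episode_start_vectorized is meant to produce: one parameter vector per parallel environment, each drawn from the search distribution. The 4-dimensional distribution and the 3 environments below are illustrative assumptions.

import numpy as np

from mushroom_rl.distributions import GaussianDiagonalDistribution

n_envs = 3  # illustrative number of parallel environments
distribution = GaussianDiagonalDistribution(np.zeros(4), np.ones(4))

# one theta per environment, as sampled in episode_start_vectorized above
theta = [distribution.sample() for _ in range(n_envs)]

assert len(theta) == n_envs
assert theta[0].shape == (4,)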
3 changes: 3 additions & 0 deletions mushroom_rl/core/_impl/numpy_dataset.py
@@ -70,6 +70,9 @@ def from_array(cls, states, actions, rewards, next_states, absorbings, lasts,

            dataset._policy_states = policy_states
            dataset._policy_next_states = policy_next_states
        else:
            dataset._policy_states = None
            dataset._policy_next_states = None

        dataset._add_save_attr(
            _state_type='primitive',
3 changes: 3 additions & 0 deletions mushroom_rl/core/_impl/torch_dataset.py
@@ -70,6 +70,9 @@ def from_array(cls, states, actions, rewards, next_states, absorbings, lasts,

            dataset._policy_states = policy_states
            dataset._policy_next_states = policy_next_states
        else:
            dataset._policy_states = None
            dataset._policy_next_states = None

        dataset._add_save_attr(
            _state_type='primitive',
9 changes: 7 additions & 2 deletions mushroom_rl/core/_impl/vectorized_core_logic.py
@@ -15,7 +15,9 @@ def get_mask(self, last):
        terminated_episodes = (last & self._running_envs).sum()
        running_episodes = (~last & self._running_envs).sum()

        if running_episodes == 0 and terminated_episodes == 0:
        first_batch = running_episodes == 0 and terminated_episodes == 0

        if first_batch:
            terminated_episodes = self._n_envs

        max_runs = terminated_episodes
@@ -31,7 +33,10 @@

        new_mask = self._array_backend.ones(terminated_episodes, dtype=bool)
        new_mask[max_runs:] = False
        mask[last] = new_mask
        if first_batch:
            mask = new_mask
        else:
            mask[last] = new_mask

        self._running_envs = self._array_backend.copy(mask)

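A simplified, self-contained numpy illustration (not the library code) of the first-batch case handled above: before any episode has started there are neither running nor terminated episodes, so the mask is built from scratch instead of being written into an existing mask via mask[last]. The 4 environments and the absence of an episode limit on max_runs are assumptions.

import numpy as np

n_envs = 4
running_envs = np.zeros(n_envs, dtype=bool)   # no episode has started yet
last = np.ones(n_envs, dtype=bool)            # assumption: every environment needs a reset

terminated_episodes = int((last & running_envs).sum())   # 0
running_episodes = int((~last & running_envs).sum())     # 0
first_batch = running_episodes == 0 and terminated_episodes == 0

if first_batch:
    terminated_episodes = n_envs              # start one episode in every environment

max_runs = terminated_episodes                # assumption: no remaining-episode limit
new_mask = np.ones(terminated_episodes, dtype=bool)
new_mask[max_runs:] = False

mask = new_mask                               # the first_batch branch of the fix
print(mask)                                   # [ True  True  True  True]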
19 changes: 18 additions & 1 deletion mushroom_rl/core/agent.py
@@ -100,14 +100,31 @@ def draw_action(self, state, policy_state=None):

    def episode_start(self, episode_info):
        """
        Called by the agent when a new episode starts.
        Called by the Core when a new episode starts.

        Args:
            episode_info (dict): a dictionary containing the information at reset, such as context.

        Returns:
            A tuple containing the policy initial state and, optionally, the policy parameters.

        """
        return self.policy.reset(), None

    def episode_start_vectorized(self, episode_info, n_envs):
        """
        Called by the VectorCore when a new episode starts.

        Args:
            episode_info (dict): a dictionary containing the information at reset, such as context;
            n_envs (int): number of environments in parallel to run.

        Returns:
            A tuple containing the policy initial state and, optionally, the policy parameters.

        """
        return self.episode_start(episode_info)

    def stop(self):
        """
        Method used to stop an agent. Useful when dealing with real world environments, simulators, or to cleanup
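A hedged sketch (not in the commit) of how an agent might override the new hook; StatefulVectorAgent and its behaviour are purely illustrative. The default implementation above simply falls back to episode_start, which is enough for agents whose policy state does not depend on the number of environments.

import numpy as np

from mushroom_rl.core import Agent


class StatefulVectorAgent(Agent):  # illustrative subclass, not part of the library
    def episode_start_vectorized(self, episode_info, n_envs):
        # replicate the (single) initial policy state once per parallel environment
        policy_state = self.policy.reset()
        if policy_state is None:
            return None, None
        return np.stack([policy_state] * n_envs), None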
33 changes: 28 additions & 5 deletions mushroom_rl/core/dataset.py
@@ -441,15 +441,24 @@ def __init__(self, mdp_info, agent_info, n_envs, n_steps=None, n_episodes=None):
            _length=mdp_info.backend
        )

        self._initialize_theta_list(n_envs)

    def append_vectorized(self, step, info, mask):
        self.append(step, {})  # FIXME!!!
        self.append(step, {})  # FIXME: handle properly info

        self._length[mask] += 1

    def append_theta_vectorized(self, theta, mask):
        for i in range(len(theta)):
            if mask[i]:
                self._theta_list[i].append(theta[i])

    def clear(self):
        n_envs = len(self._theta_list)
        super().clear()

        self._length = self._array_backend.zeros(len(self._length), dtype=int)
        self._initialize_theta_list(n_envs)

    def flatten(self):
        if len(self) == 0:
@@ -472,10 +481,24 @@ def flatten(self):
        policy_state = self._array_backend.pack_padded_sequence(self._data.policy_state, self._length)
        policy_next_state = self._array_backend.pack_padded_sequence(self._data.policy_next_state, self._length)

        return self.from_array(states, actions, rewards, next_states, absorbings, lasts,
                               policy_state=policy_state, policy_next_state=policy_next_state,
                               info=None, episode_info=None, theta_list=None,  # FIXME!!!
                               gamma=self._gamma, backend=self._array_backend.get_backend_name())
        flat_theta_list = self._flatten_theta_list()

        return Dataset.from_array(states, actions, rewards, next_states, absorbings, lasts,
                                  policy_state=policy_state, policy_next_state=policy_next_state,
                                  info=None, episode_info=None, theta_list=flat_theta_list,  # FIXME: handle properly info
                                  gamma=self._gamma, backend=self._array_backend.get_backend_name())

    def _flatten_theta_list(self):
        flat_theta_list = list()

        for env_theta_list in self._theta_list:
            flat_theta_list += env_theta_list

        return flat_theta_list

    def _initialize_theta_list(self, n_envs):
        for i in range(n_envs):
            self._theta_list.append(list())



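A plain-Python sketch (assuming 3 environments; the strings stand in for parameter vectors) of the per-environment theta bookkeeping above: _initialize_theta_list creates one list per environment, append_theta_vectorized records parameters only for masked environments, and _flatten_theta_list concatenates them for the flat dataset.

theta_list = [list() for _ in range(3)]             # one list per environment

theta = ['theta_env0', 'theta_env1', 'theta_env2']
mask = [True, False, True]                          # only masked environments start an episode
for i in range(len(theta)):
    if mask[i]:
        theta_list[i].append(theta[i])

flat_theta_list = []
for env_theta_list in theta_list:
    flat_theta_list += env_theta_list

print(flat_theta_list)                              # ['theta_env0', 'theta_env2']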
5 changes: 4 additions & 1 deletion mushroom_rl/core/vectorized_core.py
@@ -108,6 +108,9 @@ def _run(self, dataset, n_steps, n_episodes, render, quiet, record, initial_stat
            mask = self._core_logic.get_mask(last)
            self._reset(initial_states, last, mask)

            if self.agent.info.is_episodic:
                dataset.append_theta_vectorized(self._current_theta, mask)

            samples, step_infos = self._step(render, record, mask)

            self.callback_step(samples)
@@ -180,7 +183,7 @@ def _reset(self, initial_states, last, mask):
        initial_state = self._core_logic.get_initial_state(initial_states)

        state, episode_info = self._preprocess(self.env.reset_all(reset_mask, initial_state))
        self._policy_state, self._current_theta = self.agent.episode_start(episode_info)  # FIXME add mask support
        self._policy_state, self._current_theta = self.agent.episode_start_vectorized(episode_info, self.env.number)
        self._state = self._preprocess(state)
        self.agent.next_action = None

1 change: 1 addition & 0 deletions mushroom_rl/policy/__init__.py
@@ -1,4 +1,5 @@
from .policy import Policy, ParametricPolicy
from .vector_policy import VectorPolicy
from .noise_policy import OrnsteinUhlenbeckPolicy, ClippedGaussianPolicy
from .td_policy import TDPolicy, Boltzmann, EpsGreedy, Mellowmax
from .gaussian_policy import GaussianPolicy, DiagonalGaussianPolicy, \
4 changes: 4 additions & 0 deletions mushroom_rl/policy/policy.py
@@ -17,8 +17,12 @@ def __init__(self, policy_state_shape=None):
            policy_state_shape (tuple, None): the shape of the internal state of the policy.

        """
        super().__init__()

        self.policy_state_shape = policy_state_shape

        self._add_save_attr(policy_state_shape='primitive')

    def __call__(self, state, action, policy_state):
        """
        Compute the probability of taking action in a certain state following
99 changes: 99 additions & 0 deletions mushroom_rl/policy/vector_policy.py
@@ -0,0 +1,99 @@
import numpy as np
from copy import deepcopy

from .policy import ParametricPolicy


class VectorPolicy(ParametricPolicy):
    def __init__(self, policy, n_envs):
        """
        Constructor.

        Args:
            policy (ParametricPolicy): base policy to be copied;
            n_envs (int): number of parallel environments, i.e. the number of copies of the base policy.

        """
        super().__init__(policy_state_shape=policy.policy_state_shape)
        self._policy_vector = [deepcopy(policy) for _ in range(n_envs)]

        self._add_save_attr(_policy_vector='mushroom')

    def draw_action(self, state, policy_state):
        actions = list()
        policy_next_states = list()
        for i, policy in enumerate(self._policy_vector):
            s = state[i]
            ps = policy_state[i] if policy_state is not None else None
            action, policy_next_state = policy.draw_action(s, policy_state=ps)

            actions.append(action)

            if policy_next_state is not None:
                policy_next_states.append(policy_next_state)

        return np.array(actions), None if len(policy_next_states) == 0 else np.array(policy_next_states)

    def set_n(self, n_envs):
        if len(self) > n_envs:
            # drop the extra policies
            self._policy_vector = self._policy_vector[:n_envs]
        if len(self) < n_envs:
            # replicate the first policy until there is one copy per environment
            n_missing = n_envs - len(self)
            self._policy_vector += [deepcopy(self._policy_vector[0]) for _ in range(n_missing)]

    def get_flat_policy(self):
        return self._policy_vector[0]

    def set_weights(self, weights):
        """
        Setter.

        Args:
            weights (list): list of weight vectors, one per policy in the vector.

        """
        for i, policy in enumerate(self._policy_vector):
            policy.set_weights(weights[i])

    def get_weights(self):
        """
        Getter.

        Returns:
            The list of current policy weights, one entry per policy in the vector.

        """
        weight_list = list()
        for i, policy in enumerate(self._policy_vector):
            weights_i = policy.get_weights()
            weight_list.append(weights_i)

        return weight_list

    @property
    def weights_size(self):
        """
        Property.

        Returns:
            The size of the policy weights.

        """
        return len(self), self._policy_vector[0].weights_size

    def reset(self):
        policy_states = list()
        for i, policy in enumerate(self._policy_vector):
            policy_state = policy.reset()

            if policy_state is not None:
                policy_states.append(policy_state)

        return None if len(policy_states) == 0 else np.array(policy_states)

    def __len__(self):
        return len(self._policy_vector)
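A usage sketch of the new class (not part of the commit); the input/output shapes and the 4 environments are illustrative assumptions.

import numpy as np

from mushroom_rl.approximators import Regressor
from mushroom_rl.approximators.parametric import LinearApproximator
from mushroom_rl.policy import DeterministicPolicy, VectorPolicy

approximator = Regressor(LinearApproximator, input_shape=(3,), output_shape=(1,))
base_policy = DeterministicPolicy(approximator)

policy = VectorPolicy(base_policy, n_envs=4)
print(len(policy))           # 4
print(policy.weights_size)   # (4, 3)

# one weight vector per environment, as set by BlackBoxOptimization.episode_start_vectorized
policy.set_weights([np.random.randn(3) for _ in range(4)])

states = np.random.randn(4, 3)   # one state per environment
actions, _ = policy.draw_action(states, policy_state=None)
print(actions.shape)             # (4, 1)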


4 changes: 4 additions & 0 deletions mushroom_rl/utils/callbacks/collect_dataset.py
@@ -1,4 +1,5 @@
from mushroom_rl.utils.callbacks.callback import Callback
from mushroom_rl.core.dataset import VectorizedDataset


class CollectDataset(Callback):
@@ -14,6 +15,9 @@ def __init__(self):
        self._dataset = None

    def __call__(self, dataset):
        if isinstance(dataset, VectorizedDataset):
            dataset = dataset.flatten()

        if self._dataset is None:
            self._dataset = dataset
        else: