import numpy as np
from loco_mujoco import LocoEnv
from gymnasium import Env
from gymnasium.utils import seeding
from gymnasium.envs.registration import EnvSpec
from gymnasium.spaces import Box


class GymnasiumWrapper(Env):
    """
    This class implements a simple wrapper to use all LocoMuJoCo environments
    with the Gymnasium interface. A usage sketch can be found at the end of
    this file.
    """
    def __init__(self, env_name, render_mode=None, **kwargs):

        self.spec = EnvSpec(env_name)
        self.metadata = {"render_modes": ["human", "rgb_array"]}

        key_render_mode = "render_modes"
        assert "headless" not in kwargs.keys(), f"The headless parameter is not allowed in the Gymnasium " \
                                                f"environment. Please use the render_mode parameter instead. " \
                                                f"Supported modes are: {self.metadata[key_render_mode]}"

        if render_mode is not None:
            assert render_mode in self.metadata["render_modes"], f"Unsupported render mode: {render_mode}. " \
                                                                 f"Supported modes are: " \
                                                                 f"{self.metadata[key_render_mode]}."

        self.render_mode = render_mode

        # choose headless based on the render mode when initializing the LocoMuJoCo environment
        if render_mode == "human":
            kwargs["headless"] = False
        else:
            kwargs["headless"] = True

        self._env = LocoEnv.make(env_name, **kwargs)
        self.metadata["render_fps"] = 1.0 / self._env.dt

        self._set_observation_space()
        self._set_action_space()

    def step(self, action):
        """
        Run one timestep of the environment's dynamics.

        .. note:: Truncation is always set to False, as the time limit can be handled by the `TimeLimit`
            wrapper in Gymnasium (see the usage sketch at the end of this file).

        Args:
            action (np.ndarray):
                The action to be executed in the environment.

        Returns:
            Tuple of observation, reward, terminated, truncated and info.
        """
        obs, reward, absorbing, info = self._env.step(action)
        return obs, reward, absorbing, False, info

    def reset(self, *, seed=None, options=None):
        """
        Resets the state of the environment, returning an initial observation and info.
        """
        # initialize the RNG if a seed is manually passed
        if seed is not None:
            self._np_random, seed = seeding.np_random(seed)

        return self._env.reset(), {}

    def render(self):
        """
        Renders the environment.
        """
        if self.render_mode == "human":
            self._env.render()
        elif self.render_mode == "rgb_array":
            img = self._env.render(True)
            # swap the first two axes so the frame is returned in (height, width, channels) order
            return np.swapaxes(img, 0, 1)

    def close(self):
        """
        Closes the environment.
        """
        self._env.stop()

    def create_dataset(self, **kwargs):
        """
        Creates a dataset from the specified trajectories.

        Args:
            ignore_keys (list): List of keys to ignore in the dataset.

        Returns:
            Dictionary containing states, next_states and absorbing flags. The states have shape
            (N_traj x N_samples_per_traj, dim_state), while the absorbing flags have shape
            (N_traj x N_samples_per_traj). For perfect and preference datasets, the actions are
            also provided.
        """
        return self._env.create_dataset(**kwargs)

    def play_trajectory(self, **kwargs):
        """
        Plays a demo of the loaded trajectory by forcing the model
        positions to the ones in the trajectories at every step.

        Args:
            n_episodes (int): Number of episodes to replay.
            n_steps_per_episode (int): Number of steps to replay per episode.
            render (bool): If True, the trajectory will be rendered.
            record (bool): If True, the rendered trajectory will be recorded.
            recorder_params (dict): Dictionary containing the recorder parameters.
        """
        return self._env.play_trajectory(**kwargs)

    def play_trajectory_from_velocity(self, **kwargs):
        """
        Plays a demo of the loaded trajectory by forcing the model
        positions to the ones calculated from the joint velocities
        in the trajectories at every step. The joint positions are
        set from the trajectory only in the first step; afterwards,
        the next joint positions are calculated by numerically
        integrating the joint velocities in the trajectory.

        Args:
            n_episodes (int): Number of episodes to replay.
            n_steps_per_episode (int): Number of steps to replay per episode.
            render (bool): If True, the trajectory will be rendered.
            record (bool): If True, the replay will be recorded.
            recorder_params (dict): Dictionary containing the recorder parameters.
        """
        return self._env.play_trajectory_from_velocity(**kwargs)

    @property
    def unwrapped(self):
        """
        Returns the inner environment.
        """
        return self._env

    def _set_observation_space(self):
        """
        Setter for the observation space.
        """
        self.observation_space = self._convert_space(self._env.info.observation_space)
        return self.observation_space

    def _set_action_space(self):
        """
        Setter for the action space.
        """
        self.action_space = self._convert_space(self._env.info.action_space)
        return self.action_space

    @staticmethod
    def _convert_space(space):
        """ Converts the observation and action space from mushroom-rl to gymnasium. """
        # note: the per-dimension bounds are collapsed to global scalar bounds across all dimensions
        low = np.min(space.low)
        high = np.max(space.high)
        shape = space.shape
        return Box(low, high, shape, np.float64)
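

# ---------------------------------------------------------------------------
# Sketch of dataset creation for imitation learning (not part of the upstream
# file). It assumes the environment was created with expert trajectories
# loaded; the task id is illustrative, and the dictionary keys follow the
# `create_dataset` docstring above and may differ in other setups.
# ---------------------------------------------------------------------------
def _dataset_example():
    env = GymnasiumWrapper("HumanoidTorque.walk")  # illustrative task id
    dataset = env.create_dataset()
    # states: (N_traj x N_samples_per_traj, dim_state)
    # absorbing: (N_traj x N_samples_per_traj)
    print(dataset["states"].shape, dataset["absorbing"].shape)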
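

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the upstream file). It drives the wrapper
# through the standard Gymnasium loop and adds the `TimeLimit` wrapper that
# the `step` docstring recommends for handling time limits. The task id
# "HumanoidTorque.walk" and the 1000-step horizon are illustrative
# assumptions; any id accepted by `LocoEnv.make` should work.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from gymnasium.wrappers import TimeLimit

    env = TimeLimit(GymnasiumWrapper("HumanoidTorque.walk", render_mode="human"),
                    max_episode_steps=1000)

    obs, info = env.reset(seed=0)
    for _ in range(2000):
        action = env.action_space.sample()  # random policy, for demonstration only
        obs, reward, terminated, truncated, info = env.step(action)
        env.render()
        if terminated or truncated:
            obs, info = env.reset()
    env.close()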