# -*- coding: utf-8 -*-
import warnings
from copy import deepcopy
import numpy as np
from keras.callbacks import History
from rl.callbacks import (
CallbackList,
TestLogger,
TrainEpisodeLogger,
TrainIntervalLogger,
Visualizer
)
class Agent(object):
"""Abstract base class for all implemented agents.
Each agent interacts with the environment (as defined by the `Env` class) by first observing the
state of the environment. Based on this observation the agent changes the environment by performing
an action.
Do not use this abstract base class directly but instead use one of the concrete agents implemented.
Each agent realizes a reinforcement learning algorithm. Since all agents conform to the same
interface, you can use them interchangeably.
To implement your own agent, you have to implement the following methods:
- `forward`
- `backward`
- `compile`
- `load_weights`
- `save_weights`
- `layers`
# Arguments
processor (`Processor` instance): See [Processor](#processor) for details.
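    # Example
        A minimal sketch of a concrete agent (illustrative only; the class name and the
        random policy below are made up and not part of this module):
        ```python
        class RandomAgent(Agent):
            def __init__(self, action_space, **kwargs):
                super(RandomAgent, self).__init__(**kwargs)
                self.action_space = action_space
                self.compiled = False

            def compile(self, optimizer, metrics=[]):
                self.compiled = True  # nothing to build for a random policy

            def forward(self, observation):
                return self.action_space.sample()

            def backward(self, reward, terminal):
                return []  # no learning takes place

            def load_weights(self, filepath):
                pass

            def save_weights(self, filepath, overwrite=False):
                pass

            @property
            def layers(self):
                return []
        ```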
"""
def __init__(self, processor=None):
self.processor = processor
self.training = False
self.step = 0
def get_config(self):
"""Configuration of the agent for serialization.
# Returns
            Dictionary with agent configuration.
"""
return {}
def fit(self, env, nb_steps, action_repetition=1, callbacks=None, verbose=1,
visualize=False, nb_max_start_steps=0, start_step_policy=None, log_interval=10000,
nb_max_episode_steps=None):
"""Trains the agent on the given environment.
# Arguments
            env (`Env` instance): Environment that the agent interacts with. See [Env](#env) for details.
nb_steps (integer): Number of training steps to be performed.
action_repetition (integer): Number of times the agent repeats the same action without
observing the environment again. Setting this to a value > 1 can be useful
if a single action only has a very small effect on the environment.
callbacks (list of `keras.callbacks.Callback` or `rl.callbacks.Callback` instances):
List of callbacks to apply during training. See [callbacks](/callbacks) for details.
verbose (integer): 0 for no logging, 1 for interval logging (compare `log_interval`), 2 for episode logging
visualize (boolean): If `True`, the environment is visualized during training. However,
this is likely going to slow down training significantly and is thus intended to be
a debugging instrument.
            nb_max_start_steps (integer): Maximum number of steps that the agent performs at the beginning
                of each episode using `start_step_policy`. Notice that this is an upper limit since
                the exact number of steps to be performed is sampled uniformly from [0, nb_max_start_steps)
                at the beginning of each episode.
start_step_policy (`lambda observation: action`): The policy
to follow if `nb_max_start_steps` > 0. If set to `None`, a random action is performed.
log_interval (integer): If `verbose` = 1, the number of steps that are considered to be an interval.
nb_max_episode_steps (integer): Number of steps per episode that the agent performs before
automatically resetting the environment. Set to `None` if each episode should run
(potentially indefinitely) until the environment signals a terminal state.
# Returns
A `keras.callbacks.History` instance that recorded the entire training process.
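        # Example
            Typical usage (illustrative; assumes `agent` is a compiled concrete agent and
            `env` is a Gym-style environment):
            ```python
            history = agent.fit(env, nb_steps=50000, visualize=False, verbose=1)
            # Episode logs such as `episode_reward` are collected by the
            # `History` callback returned here.
            print(history.history.get('episode_reward'))
            ```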
"""
if not self.compiled:
            raise RuntimeError('You tried to fit your agent but it hasn\'t been compiled yet. Please call `compile()` before `fit()`.')
if action_repetition < 1:
raise ValueError('action_repetition must be >= 1, is {}'.format(action_repetition))
self.training = True
callbacks = [] if not callbacks else callbacks[:]
if verbose == 1:
callbacks += [TrainIntervalLogger(interval=log_interval)]
elif verbose > 1:
callbacks += [TrainEpisodeLogger()]
if visualize:
callbacks += [Visualizer()]
history = History()
callbacks += [history]
callbacks = CallbackList(callbacks)
if hasattr(callbacks, 'set_model'):
callbacks.set_model(self)
else:
callbacks._set_model(self)
callbacks._set_env(env)
params = {
'nb_steps': nb_steps,
}
if hasattr(callbacks, 'set_params'):
callbacks.set_params(params)
else:
callbacks._set_params(params)
self._on_train_begin()
callbacks.on_train_begin()
        # Use 32-bit counters so that runs longer than 32767 steps do not overflow.
        episode = np.int32(0)
        self.step = np.int32(0)
observation = None
episode_reward = None
episode_step = None
did_abort = False
try:
while self.step < nb_steps:
if observation is None: # start of a new episode
callbacks.on_episode_begin(episode)
episode_step = np.int16(0)
episode_reward = np.float32(0)
# Obtain the initial observation by resetting the environment.
self.reset_states()
observation = deepcopy(env.reset())
if self.processor is not None:
observation = self.processor.process_observation(observation)
assert observation is not None
# Perform random starts at beginning of episode and do not record them into the experience.
# This slightly changes the start position between games.
nb_random_start_steps = 0 if nb_max_start_steps == 0 else np.random.randint(nb_max_start_steps)
for _ in range(nb_random_start_steps):
if start_step_policy is None:
action = env.action_space.sample()
else:
action = start_step_policy(observation)
if self.processor is not None:
action = self.processor.process_action(action)
callbacks.on_action_begin(action)
observation, reward, done, info = env.step(action)
observation = deepcopy(observation)
if self.processor is not None:
observation, reward, done, info = self.processor.process_step(observation, reward, done, info)
callbacks.on_action_end(action)
if done:
warnings.warn('Env ended before {} random steps could be performed at the start. You should probably lower the `nb_max_start_steps` parameter.'.format(nb_random_start_steps))
observation = deepcopy(env.reset())
if self.processor is not None:
observation = self.processor.process_observation(observation)
break
# At this point, we expect to be fully initialized.
assert episode_reward is not None
assert episode_step is not None
assert observation is not None
# Run a single step.
callbacks.on_step_begin(episode_step)
                # This is where all of the work happens. We first perceive and compute the action
                # (forward step) and then use the reward to improve (backward step).
action = self.forward(observation)
if self.processor is not None:
action = self.processor.process_action(action)
reward = np.float32(0)
accumulated_info = {}
done = False
for _ in range(action_repetition):
callbacks.on_action_begin(action)
observation, r, done, info = env.step(action)
observation = deepcopy(observation)
if self.processor is not None:
observation, r, done, info = self.processor.process_step(observation, r, done, info)
for key, value in info.items():
if not np.isreal(value):
continue
if key not in accumulated_info:
accumulated_info[key] = np.zeros_like(value)
accumulated_info[key] += value
callbacks.on_action_end(action)
reward += r
if done:
break
if nb_max_episode_steps and episode_step >= nb_max_episode_steps - 1:
# Force a terminal state.
done = True
metrics = self.backward(reward, terminal=done)
episode_reward += reward
step_logs = {
'action': action,
'observation': observation,
'reward': reward,
'metrics': metrics,
'episode': episode,
'info': accumulated_info,
}
callbacks.on_step_end(episode_step, step_logs)
episode_step += 1
self.step += 1
if done:
# We are in a terminal state but the agent hasn't yet seen it. We therefore
# perform one more forward-backward call and simply ignore the action before
# resetting the environment. We need to pass in `terminal=False` here since
# the *next* state, that is the state of the newly reset environment, is
# always non-terminal by convention.
self.forward(observation)
self.backward(0., terminal=False)
# This episode is finished, report and reset.
episode_logs = {
'episode_reward': episode_reward,
'nb_episode_steps': episode_step,
'nb_steps': self.step,
}
callbacks.on_episode_end(episode, episode_logs)
episode += 1
observation = None
episode_step = None
episode_reward = None
except KeyboardInterrupt:
            # We catch keyboard interrupts here so that training can be safely aborted.
# This is so common that we've built this right into this function, which ensures that
# the `on_train_end` method is properly called.
did_abort = True
callbacks.on_train_end(logs={'did_abort': did_abort})
self._on_train_end()
return history
def test(self, env, nb_episodes=1, action_repetition=1, callbacks=None, visualize=True,
nb_max_episode_steps=None, nb_max_start_steps=0, start_step_policy=None, verbose=1):
"""Callback that is called before training begins.
# Arguments
env: (`Env` instance): Environment that the agent interacts with. See [Env](#env) for details.
nb_episodes (integer): Number of episodes to perform.
action_repetition (integer): Number of times the agent repeats the same action without
observing the environment again. Setting this to a value > 1 can be useful
if a single action only has a very small effect on the environment.
callbacks (list of `keras.callbacks.Callback` or `rl.callbacks.Callback` instances):
                List of callbacks to apply during testing. See [callbacks](/callbacks) for details.
            verbose (integer): 0 for no logging, 1 for episode logging.
            visualize (boolean): If `True`, the environment is visualized during testing. However,
                this is likely going to slow down testing significantly and is thus intended to be
                a debugging instrument.
            nb_max_start_steps (integer): Maximum number of steps that the agent performs at the beginning
                of each episode using `start_step_policy`. Notice that this is an upper limit since
                the exact number of steps to be performed is sampled uniformly from [0, nb_max_start_steps)
                at the beginning of each episode.
start_step_policy (`lambda observation: action`): The policy
to follow if `nb_max_start_steps` > 0. If set to `None`, a random action is performed.
nb_max_episode_steps (integer): Number of steps per episode that the agent performs before
automatically resetting the environment. Set to `None` if each episode should run
(potentially indefinitely) until the environment signals a terminal state.
# Returns
            A `keras.callbacks.History` instance that recorded the entire testing process.
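        # Example
            Illustrative usage (assumes a compiled agent and a Gym-style env). Note that in
            this version `on_episode_end` also receives the recorded `action` (list of actions)
            and `states` (list of observations) logs:
            ```python
            history = agent.test(env, nb_episodes=5, visualize=False)
            print(history.history.get('episode_reward'))
            ```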
"""
if not self.compiled:
            raise RuntimeError('You tried to test your agent but it hasn\'t been compiled yet. Please call `compile()` before `test()`.')
if action_repetition < 1:
raise ValueError('action_repetition must be >= 1, is {}'.format(action_repetition))
self.training = False
self.step = 0
callbacks = [] if not callbacks else callbacks[:]
if verbose >= 1:
callbacks += [TestLogger()]
if visualize:
callbacks += [Visualizer()]
history = History()
callbacks += [history]
callbacks = CallbackList(callbacks)
if hasattr(callbacks, 'set_model'):
callbacks.set_model(self)
else:
callbacks._set_model(self)
callbacks._set_env(env)
params = {
'nb_episodes': nb_episodes,
}
if hasattr(callbacks, 'set_params'):
callbacks.set_params(params)
else:
callbacks._set_params(params)
self._on_test_begin()
callbacks.on_train_begin()
for episode in range(nb_episodes):
callbacks.on_episode_begin(episode)
episode_reward = 0.
episode_step = 0
# Obtain the initial observation by resetting the environment.
self.reset_states()
observation = deepcopy(env.reset())
if self.processor is not None:
observation = self.processor.process_observation(observation)
assert observation is not None
            states = [observation]  # Record the initial observation of this episode.
# Perform random starts at beginning of episode and do not record them into the experience.
# This slightly changes the start position between games.
nb_random_start_steps = 0 if nb_max_start_steps == 0 else np.random.randint(nb_max_start_steps)
for _ in range(nb_random_start_steps):
if start_step_policy is None:
action = env.action_space.sample()
else:
action = start_step_policy(observation)
if self.processor is not None:
action = self.processor.process_action(action)
callbacks.on_action_begin(action)
observation, r, done, info = env.step(action)
observation = deepcopy(observation)
if self.processor is not None:
observation, r, done, info = self.processor.process_step(observation, r, done, info)
callbacks.on_action_end(action)
if done:
warnings.warn('Env ended before {} random steps could be performed at the start. You should probably lower the `nb_max_start_steps` parameter.'.format(nb_random_start_steps))
observation = deepcopy(env.reset())
if self.processor is not None:
observation = self.processor.process_observation(observation)
break
# Run the episode until we're done.
done = False
            actions = []  # Record the actions taken during this episode.
while not done:
callbacks.on_step_begin(episode_step)
action = self.forward(observation)
if self.processor is not None:
action = self.processor.process_action(action)
                actions.append(action)  # Log the (processed) action before it is executed.
reward = 0.
accumulated_info = {}
for _ in range(action_repetition):
callbacks.on_action_begin(action)
observation, r, d, info = env.step(action)
observation = deepcopy(observation)
if self.processor is not None:
observation, r, d, info = self.processor.process_step(observation, r, d, info)
callbacks.on_action_end(action)
reward += r
                    states.append(observation)  # Log the observation returned by the environment.
for key, value in info.items():
if not np.isreal(value):
continue
if key not in accumulated_info:
accumulated_info[key] = np.zeros_like(value)
accumulated_info[key] += value
if d:
done = True
break
if nb_max_episode_steps and episode_step >= nb_max_episode_steps - 1:
done = True
self.backward(reward, terminal=done)
episode_reward += reward
step_logs = {
'action': action,
'observation': observation,
'reward': reward,
'episode': episode,
'info': accumulated_info,
}
callbacks.on_step_end(episode_step, step_logs)
episode_step += 1
self.step += 1
# We are in a terminal state but the agent hasn't yet seen it. We therefore
# perform one more forward-backward call and simply ignore the action before
# resetting the environment. We need to pass in `terminal=False` here since
# the *next* state, that is the state of the newly reset environment, is
# always non-terminal by convention.
self.forward(observation)
self.backward(0., terminal=False)
# Report end of episode.
episode_logs = {
'episode_reward': episode_reward,
'nb_steps': episode_step,
                'action': actions,  # all actions taken during this episode
                'states': states,   # all observations seen during this episode
}
callbacks.on_episode_end(episode, episode_logs)
callbacks.on_train_end()
self._on_test_end()
return history
def reset_states(self):
"""Resets all internally kept states after an episode is completed.
"""
pass
def forward(self, observation):
"""Takes the an observation from the environment and returns the action to be taken next.
If the policy is implemented by a neural network, this corresponds to a forward (inference) pass.
# Argument
observation (object): The current observation from the environment.
# Returns
The next action to be executed in the environment.
"""
raise NotImplementedError()
def backward(self, reward, terminal):
"""Updates the agent after having executed the action returned by `forward`.
If the policy is implemented by a neural network, this corresponds to a weight update using back-prop.
# Argument
reward (float): The observed reward after executing the action returned by `forward`.
terminal (boolean): `True` if the new state of the environment is terminal.
# Returns
List of metrics values
"""
raise NotImplementedError()
def compile(self, optimizer, metrics=[]):
"""Compiles an agent and the underlaying models to be used for training and testing.
# Arguments
optimizer (`keras.optimizers.Optimizer` instance): The optimizer to be used during training.
metrics (list of functions `lambda y_true, y_pred: metric`): The metrics to run during training.
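        # Example
            Illustrative call on a concrete agent (assumes the Keras `Adam` optimizer):
            ```python
            from keras.optimizers import Adam
            agent.compile(Adam(lr=1e-3), metrics=['mae'])
            ```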
"""
raise NotImplementedError()
def load_weights(self, filepath):
"""Loads the weights of an agent from an HDF5 file.
# Arguments
filepath (str): The path to the HDF5 file.
"""
raise NotImplementedError()
def save_weights(self, filepath, overwrite=False):
"""Saves the weights of an agent as an HDF5 file.
# Arguments
filepath (str): The path to where the weights should be saved.
overwrite (boolean): If `False` and `filepath` already exists, raises an error.
"""
raise NotImplementedError()
@property
def layers(self):
"""Returns all layers of the underlying model(s).
If the concrete implementation uses multiple internal models,
this method returns them in a concatenated list.
# Returns
A list of the model's layers
"""
raise NotImplementedError()
@property
def metrics_names(self):
"""The human-readable names of the agent's metrics. Must return as many names as there
are metrics (see also `compile`).
# Returns
            A list of metric names (strings).
"""
return []
def _on_train_begin(self):
"""Callback that is called before training begins."
"""
pass
def _on_train_end(self):
"""Callback that is called after training ends."
"""
pass
def _on_test_begin(self):
"""Callback that is called before testing begins."
"""
pass
def _on_test_end(self):
"""Callback that is called after testing ends."
"""
pass
class Processor(object):
"""Abstract base class for implementing processors.
A processor acts as a coupling mechanism between an `Agent` and its `Env`. This can
be necessary if your agent has different requirements with respect to the form of the
observations, actions, and rewards of the environment. By implementing a custom processor,
    you can effectively translate between the two without having to change the underlying
implementation of the agent or environment.
Do not use this abstract base class directly but instead use one of the concrete implementations
or write your own.
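    # Example
        A small illustrative processor (the class name below is made up) that casts
        observations to `float32` and clips rewards to [-1, 1]:
        ```python
        import numpy as np

        class ClippingProcessor(Processor):
            def process_observation(self, observation):
                return np.asarray(observation, dtype='float32')

            def process_reward(self, reward):
                return np.clip(reward, -1., 1.)
        ```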
"""
def process_step(self, observation, reward, done, info):
"""Processes an entire step by applying the processor to the observation, reward, and info arguments.
# Arguments
observation (object): An observation as obtained by the environment.
reward (float): A reward as obtained by the environment.
done (boolean): `True` if the environment is in a terminal state, `False` otherwise.
info (dict): The debug info dictionary as obtained by the environment.
# Returns
            The tuple (observation, reward, done, info) with all elements processed.
"""
observation = self.process_observation(observation)
reward = self.process_reward(reward)
info = self.process_info(info)
return observation, reward, done, info
def process_observation(self, observation):
"""Processes the observation as obtained from the environment for use in an agent and
returns it.
# Arguments
observation (object): An observation as obtained by the environment
# Returns
            The processed observation.
"""
return observation
def process_reward(self, reward):
"""Processes the reward as obtained from the environment for use in an agent and
returns it.
# Arguments
reward (float): A reward as obtained by the environment
# Returns
            The processed reward.
"""
return reward
def process_info(self, info):
"""Processes the info as obtained from the environment for use in an agent and
returns it.
# Arguments
info (dict): An info as obtained by the environment
# Returns
            The processed info dictionary.
"""
return info
def process_action(self, action):
"""Processes an action predicted by an agent but before execution in an environment.
# Arguments
action (int): Action given to the environment
# Returns
Processed action given to the environment
"""
return action
def process_state_batch(self, batch):
"""Processes an entire batch of states and returns it.
# Arguments
batch (list): List of states
# Returns
Processed list of states
"""
return batch
@property
def metrics(self):
"""The metrics of the processor, which will be reported during training.
# Returns
List of `lambda y_true, y_pred: metric` functions.
"""
return []
@property
def metrics_names(self):
"""The human-readable names of the agent's metrics. Must return as many names as there
are metrics (see also `compile`).
"""
return []
# Note: the API of the `Env` and `Space` classes are taken from the OpenAI Gym implementation.
# https://github.com/openai/gym/blob/master/gym/core.py
class Env(object):
"""The abstract environment class that is used by all agents. This class has the exact
same API that OpenAI Gym uses so that integrating with it is trivial. In contrast to the
OpenAI Gym implementation, this class only defines the abstract methods without any actual
implementation.
To implement your own environment, you need to define the following methods:
- `step`
- `reset`
- `render`
- `close`
Refer to the [Gym documentation](https://gym.openai.com/docs/#environments).
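    # Example
        A minimal illustrative environment (the dynamics below are made up); the agent is
        rewarded for echoing back a randomly drawn bit:
        ```python
        import numpy as np

        class GuessBitEnv(Env):
            def reset(self):
                self.target = np.random.randint(2)
                return self.target

            def step(self, action):
                reward = 1. if action == self.target else 0.
                self.target = np.random.randint(2)
                observation, done, info = self.target, False, {}
                return observation, reward, done, info

            def render(self, mode='human', close=False):
                pass

            def close(self):
                pass
        ```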
"""
reward_range = (-np.inf, np.inf)
action_space = None
observation_space = None
def step(self, action):
"""Run one timestep of the environment's dynamics.
Accepts an action and returns a tuple (observation, reward, done, info).
# Arguments
            action (object): An action provided by the agent.
# Returns
observation (object): Agent's observation of the current environment.
reward (float) : Amount of reward returned after previous action.
done (boolean): Whether the episode has ended, in which case further step() calls will return undefined results.
info (dict): Contains auxiliary diagnostic information (helpful for debugging, and sometimes learning).
"""
raise NotImplementedError()
def reset(self):
"""
Resets the state of the environment and returns an initial observation.
# Returns
observation (object): The initial observation of the space. Initial reward is assumed to be 0.
"""
raise NotImplementedError()
def render(self, mode='human', close=False):
"""Renders the environment.
The set of supported modes varies per environment. (And some
environments do not support rendering at all.)
# Arguments
mode (str): The mode to render with.
close (bool): Close all open renderings.
"""
raise NotImplementedError()
def close(self):
"""Override in your subclass to perform any necessary cleanup.
Environments will automatically close() themselves when
garbage collected or when the program exits.
"""
raise NotImplementedError()
def seed(self, seed=None):
"""Sets the seed for this env's random number generator(s).
# Returns
Returns the list of seeds used in this env's random number generators
"""
raise NotImplementedError()
def configure(self, *args, **kwargs):
"""Provides runtime configuration to the environment.
This configuration should consist of data that tells your
environment how to run (such as an address of a remote server,
or path to your ImageNet data). It should not affect the
semantics of the environment.
"""
raise NotImplementedError()
def __del__(self):
self.close()
def __str__(self):
return '<{} instance>'.format(type(self).__name__)
class Space(object):
"""Abstract model for a space that is used for the state and action spaces. This class has the
exact same API that OpenAI Gym uses so that integrating with it is trivial.
Please refer to [Gym Documentation](https://gym.openai.com/docs/#spaces)
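    # Example
        An illustrative discrete space with `n` possible actions (not part of this module):
        ```python
        import numpy as np

        class DiscreteSpace(Space):
            def __init__(self, n):
                self.n = n

            def sample(self, seed=None):
                if seed is not None:
                    np.random.seed(seed)
                return np.random.randint(self.n)

            def contains(self, x):
                return isinstance(x, int) and 0 <= x < self.n
        ```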
"""
def sample(self, seed=None):
"""Uniformly randomly sample a random element of this space.
"""
raise NotImplementedError()
def contains(self, x):
"""Return boolean specifying if x is a valid member of this space
"""
raise NotImplementedError()