# dynamic_prog.py
import numpy as np


class DPAgent:
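    '''
    Tabular dynamic programming agent implementing policy evaluation, policy
    improvement, policy iteration, and value iteration. Assumes the environment
    exposes get_states() and a transition table env.P[s][a] of
    (prob, next_state, reward, done) tuples.
    '''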
    def __init__(self, env, gamma=1.0, policy=None, seed=None):
        #--------------------------------------------------------
        # Store environment and gamma.
        #--------------------------------------------------------
        self.env = env
        self.gamma = gamma
        #--------------------------------------------------------
        # Get state information from environment.
        #--------------------------------------------------------
        self.states = self.env.get_states()
        self.num_states = env.observation_space.n
        #--------------------------------------------------------
        # Get action information from environment.
        #--------------------------------------------------------
        self.num_actions = env.action_space.n
        self.actions = range(self.num_actions)
        #--------------------------------------------------------
        # Store policy or generate a random policy.
        #--------------------------------------------------------
        if policy is None:
            np_state = set_np_seed(seed)
            self.policy = {s:np.random.choice(self.actions) for s in self.states}
            restore_np_state(np_state)
        else:
            self.policy = policy
        #--------------------------------------------------------
        # Initialize value functions to zero.
        #--------------------------------------------------------
        self.Q = {s:np.zeros(self.num_actions) for s in self.states}
        self.V = {s:0 for s in self.states}

    def select_action(self, state):
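        '''
        Returns the action prescribed by the current policy for the given state.
        '''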
        return self.policy[state]

    def policy_evaluation(
            self, threshold=1e-6, max_iter=None, verbose=True, return_n=False):
        '''
        Evaluates the policy currently attached to the agent.
        '''
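        # Iterative policy evaluation: repeatedly apply the Bellman expectation
        # backup
        #     V(s) <- sum over (s', r) of p(s', r | s, pi(s)) * [r + gamma * V(s')]
        # until the largest change in V across states falls below `threshold`.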
        #--------------------------------------------------------
        # Loop for max_iter iterations, or until convergence.
        #--------------------------------------------------------
        if max_iter is None:
            max_iter = float('inf')
        n = 0
        while n < max_iter:
            n += 1
            #--------------------------------------------------------
            # Copy current V.
            #--------------------------------------------------------
            V_old = self.V.copy()
            #--------------------------------------------------------
            # Loop over all states.
            # Calculate new V(s) for each s.
            #--------------------------------------------------------
            for s in self.states:
                # Select action and then get transitions P(s,a)
                a = self.policy[s]
                transitions = self.env.P[s][a]
                # Loop over all transitions
                Vs = 0
                for prob, s_, reward, done in transitions:
                    G_estimate = reward + (0 if done else self.gamma * V_old[s_])
                    Vs += prob * G_estimate
                self.V[s] = Vs
            #--------------------------------------------------------
            # Check for convergence.
            #--------------------------------------------------------
            max_diff = max([abs(V_old[s] - self.V[s]) for s in self.states])
            if max_diff < threshold:
                break
        if verbose:
            print(f'Policy evaluation required {n} iterations to converge.')
        if return_n:
            return n

    def calculate_Q(self):
        '''
        Calculates an estimate of Q from the current estimate of V.
        '''
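        # One-step backup from the current V estimate:
        #     Q(s, a) = sum over (s', r) of p(s', r | s, a) * [r + gamma * V(s')]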
        for s in self.states:
            for a in self.actions:
                trans = self.env.P[s][a]
                Qsa = 0
                for prob, s_, reward, done in trans:
                    G_est = reward + (0 if done else self.gamma * self.V[s_])
                    Qsa += prob * G_est
                self.Q[s][a] = Qsa

    def policy_improvement(self):
        '''
        Uses the current Q estimate to perform greedy policy improvement on the
        current policy. Assumes that Q and V have been estimated for the policy.
        '''
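        # Greedy improvement: pi(s) <- argmax over a of Q(s, a), using freshly
        # computed Q values.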
        self.calculate_Q()
        self.policy = {s : np.argmax(self.Q[s]) for s in self.states}

    def policy_iteration(self, threshold=1e-6, verbose=True):
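        '''
        Alternates policy evaluation and greedy policy improvement until the
        policy no longer changes. Prints a short report when verbose is True.
        '''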
        #----------------------------------------------------------
        # Loop until convergence of policy.
        #----------------------------------------------------------
        n = 0
        num_eval_steps = 0
        while True:
            n += 1
            # Store current policy
            old_policy = self.policy.copy()
            # Evaluate the current policy
            num_eval_steps += self.policy_evaluation(
                threshold=threshold, max_iter=None, return_n=True, verbose=False)
            # Policy improvement
            self.policy_improvement()
            #self.calculate_Q()
            # Check for convergence
            if old_policy == self.policy:
                break
        #----------------------------------------------------------
        # Print report
        #----------------------------------------------------------
        if verbose:
            print(f'Policy iteration required {n} steps to converge.')
            print(f'{num_eval_steps} steps of policy evaluation were performed.')

    def value_iteration(self, threshold=1e-6, verbose=True):
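        '''
        Performs value iteration: repeatedly applies the Bellman optimality
        backup V(s) <- max over a of Q(s, a) until V converges, then extracts
        the greedy policy from the final Q estimate.
        '''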
        #----------------------------------------------------------
        # Loop until convergence of value function.
        #----------------------------------------------------------
        n = 0
        while True:
            n += 1
            # Store current V estimate
            V_old = self.V.copy()
            # Get Q from current V
            self.calculate_Q()
            # Determine new V from Q
            self.V = {s : np.max(self.Q[s]) for s in self.states}
            # Check for convergence
            max_diff = max([abs(V_old[s] - self.V[s]) for s in self.states])
            if max_diff < threshold:
                break
        #----------------------------------------------------------
        # Determine optimal policy.
        #----------------------------------------------------------
        self.calculate_Q()
        self.policy = {s : np.argmax(self.Q[s]) for s in self.states}
        #----------------------------------------------------------
        # Display report
        #----------------------------------------------------------
        if verbose:
            print(f'Value iteration required {n} steps to converge.')


def set_np_seed(seed):
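    '''
    If a seed is given, saves the current NumPy RNG state, seeds the global
    generator, and returns the saved state so it can be restored later.
    '''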
    if seed is None:
        return None
    np_state = np.random.get_state()
    np.random.seed(seed)
    return np_state


def restore_np_state(np_state):
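    '''
    Restores a previously saved NumPy RNG state, if one was provided.
    '''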
    if np_state is None:
        return
    np.random.set_state(np_state)


if __name__ == '__main__':
    import gymnasium as gym
    from gymnasium.envs.toy_text.frozen_lake import generate_random_map
    from rltools.envs import *
    from rltools.utils import success_rate

    base_4x4_env = gym.make('FrozenLake-v1', render_mode='ansi')
    env = base_4x4_env
    env.reset()
    print(env.render())

    mc = DPAgent(env, gamma=0.99)
    states = range(env.observation_space.n)
    direct_actions  = [2, 2, 1, 0, 1, 0, 1, 0, 2, 2, 1, 0, 0, 2, 2, 0]
    random_actions  = [2, 0, 1, 3, 0, 0, 2, 0, 3, 1, 3, 0, 0, 2, 1, 0]
    careful_actions = [0, 3, 3, 3, 0, 0, 3, 0, 3, 1, 0, 0, 0, 2, 2, 0]
    direct_policy  = {s:a for s, a in zip(states, direct_actions)}
    random_policy  = {s:a for s, a in zip(states, random_actions)}
    careful_policy = {s:a for s, a in zip(states, careful_actions)}
    # policy_evaluation() takes no policy argument; attach the careful policy
    # to the agent first, then evaluate it.
    mc.policy = careful_policy
    mc.policy_evaluation(threshold=1e-6)
    print()
    #frozen_lake_show_value(env, mc.V, digits=3)
    #print()

    sr, info = success_rate(env, policy=careful_policy, n=1000, max_steps=1000, seed=None)
    print(sr)
    print(info)
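
    # Added illustration (an assumption beyond the original script): solve for
    # an optimal policy with value iteration and compare its success rate to
    # the hand-written careful policy above.
    vi_agent = DPAgent(env, gamma=0.99)
    vi_agent.value_iteration(threshold=1e-6)
    sr_vi, info_vi = success_rate(env, policy=vi_agent.policy, n=1000, max_steps=1000, seed=None)
    print(sr_vi)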