general_model.py
#%%
import observables
import numpy as np
import scipy as sp
import numba as nb
import estimate_L
from scipy import integrate
from algorithms import learningAlgorithm, rgEDMD #, onlineKoopmanLearning
# @nb.njit(fastmath=True)
# def ln(x):
#     return np.log(x)

@nb.njit(fastmath=True) #, parallel=True)
def nb_einsum(A, B):
    """Frobenius inner product of two equal-shape matrices: sum_{i,j} A[i,j] * B[i,j]."""
    assert A.shape == B.shape
    res = 0
    for i in range(A.shape[0]):
        for j in range(A.shape[1]):
            res += A[i,j]*B[i,j]
    return res
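
# A minimal sanity-check sketch, not part of the original module: nb_einsum should
# agree with NumPy's einsum contraction over both indices. The helper name
# _check_nb_einsum and the test matrices below are illustrative stand-ins.
def _check_nb_einsum():
    A = np.arange(6, dtype=np.float64).reshape(2, 3)
    B = np.ones((2, 3))
    assert np.isclose(nb_einsum(A, B), np.einsum('ij,ij->', A, B))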

@nb.njit(fastmath=True)
def dpsi(X, nablaPsi, nabla2Psi, k, l, t=1):
    """
    Finite-difference estimate of the generator applied to the k-th dictionary
    function at snapshot l: (x_{l+1} - x_l)/t dotted with the gradient of psi_k,
    plus a second-order Hessian term that is currently disabled.
    """
    difference = X[:, l+1] - X[:, l]
    term_1 = (1/t) * (difference)
    term_2 = nablaPsi[k, :, l]
    term_3 = (1/(2*t)) * np.outer(difference, difference)
    term_4 = nabla2Psi[k, :, :, l]
    return np.dot(term_1, term_2) #+ nb_einsum(term_3, term_4)
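
# A minimal shape sketch, illustrative and not from the original code: dpsi expects
# X of shape (d, m), nablaPsi of shape (k, d, m), and nabla2Psi of shape (k, d, d, m),
# matching how psi.diff / psi.ddiff output is used in GeneratorModel.fit below.
# The sizes and random arrays here are arbitrary stand-ins for real dictionary derivatives.
def _check_dpsi_shapes(d=3, m=5, k=4):
    X = np.random.rand(d, m)
    nablaPsi = np.random.rand(k, d, m)
    nabla2Psi = np.random.rand(k, d, d, m)
    # Scalar estimate for dictionary function 0 at snapshot 0
    return dpsi(X, nablaPsi, nabla2Psi, 0, 0)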

def rejection_sampler(p, xbounds, pmax):
    """Draw one sample from density p on [xbounds[0], xbounds[1]] by rejection sampling, given an upper bound pmax on p."""
    while True:
        x = np.random.rand(1)*(xbounds[1]-xbounds[0])+xbounds[0]
        y = np.random.rand(1)*pmax
        if y <= p(x[0]):
            return x
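
# A small illustrative use of rejection_sampler, not from the original code:
# draw from the density p(x) = 2x on [0, 1], whose maximum value is 2.
def _rejection_sampler_demo():
    sample = rejection_sampler(lambda x: 2 * x, [0.0, 1.0], 2.0)
    return sample[0]  # scalar in [0, 1]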

def sample_cartpole_action(pi, x):
    """
    Sample an action in [0, 1] from the policy density pi(., x) by inverse-CDF
    sampling, treating the density as piecewise constant: pi(0.2, x) on [0, 0.5)
    and pi(0.7, x) on [0.5, 1].
    """
    f1 = pi(0.2, x)
    f2 = pi(0.7, x)
    y = np.random.rand()
    if y <= 0.5*f1 and y >= 0:
        return y / f1
    return (y - 0.5 * f1 + 0.5 * f2) / f2
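
# Illustrative only: with a constant dummy policy density, sample_cartpole_action
# reduces to inverse-CDF sampling of a uniform density on [0, 1], so the draw
# should land in [0, 1]. dummy_pi below is a stand-in, not the learned policy.
def _sample_cartpole_action_demo():
    dummy_pi = lambda u, x: 1.0  # constant density over the unit action interval
    action = sample_cartpole_action(dummy_pi, x=None)
    assert 0.0 <= action <= 1.0
    return action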

class GeneratorModel:
    def __init__(self, psi, reward, action_bounds, L=None):
        """
        Create an instance of the model.

        Parameters:
            psi: Set of dictionary functions from the observables class
            reward: The reward function for whatever problem you are trying to model
            action_bounds: Bounds of the action space, passed to the learning algorithm
            L: Optional precomputed generator matrix (estimated in fit() if None)
        """
        self.psi = psi
        self.reward = reward
        self.action_bounds = action_bounds
        self.L = L

    def fit(self, X, U, timesteps=2, lamb=10):
        """
        Fit a policy pi to the dataset using Koopman RL.

        Parameters:
            X: State data
            U: Action data
            timesteps, lamb: Hyperparameters forwarded to learningAlgorithm
        """
        self.X = X
        self.U = U
        # self.min_action = np.min(U)
        # self.max_action = np.max(U)
        self.X_tilde = np.append(X, [U], axis=0) # extended states (state rows stacked with the action row)
        self.d = self.X_tilde.shape[0]
        self.m = self.X_tilde.shape[1]
        # self.s = int(self.d*(self.d+1)/2) # number of second order poly terms
        self.Psi_X_tilde = self.psi(self.X_tilde)
        # self.Psi_X_tilde_T = Psi_X_tilde.T
        self.k = self.Psi_X_tilde.shape[0]
        self.nablaPsi = self.psi.diff(self.X_tilde)
        self.nabla2Psi = self.psi.ddiff(self.X_tilde)

        # Finite-difference estimate of the generator applied to each dictionary
        # function at each snapshot (the last column is left at zero).
        self.dPsi_X_tilde = np.zeros((self.k, self.m))
        for row in range(self.k):
            for column in range(self.m-1):
                self.dPsi_X_tilde[row, column] = dpsi(
                    self.X_tilde, self.nablaPsi,
                    self.nabla2Psi, row, column
                )
        # self.dPsi_X_tilde_T = dPsi_X_tilde.T

        # Estimate the generator matrix L by regressing dPsi on Psi.
        # L = rrr(Psi_X_tilde_T, dPsi_X_tilde_T)
        self.L = estimate_L.rrr(self.Psi_X_tilde.T, self.dPsi_X_tilde.T)
        # self.L = estimate_L.rrr(self.Psi_X_tilde, self.dPsi_X_tilde)

        # Running quantities for the online (rgEDMD) updates in update_model.
        self.z_m = np.zeros((self.k, self.k))
        self.phi_m_inverse = np.linalg.inv(np.identity(self.k))

        self.V, self.pi = learningAlgorithm(
            self.L, self.X, self.psi, self.Psi_X_tilde,
            self.action_bounds, self.reward,
            timesteps=timesteps, lamb=lamb
        )

    def update_policy(self, timesteps=2, lamb=10):
        """Re-run the learning algorithm with the current generator estimate L to refresh V and pi."""
        self.V, self.pi = learningAlgorithm(
            self.L, self.X, self.psi, self.Psi_X_tilde,
            self.action_bounds, self.reward,
            timesteps=timesteps, lamb=lamb
        )

    def update_model(self, x, u, update_policy=False):
        """
        Update the model with a single new data point (this assumes only two
        states/actions were given during the fitting process).

        Parameters:
            x: A single state vector
            u: A single action vector
            update_policy: If True, re-run the learning algorithm after the update
        """
        x = x.reshape(-1,1)
        self.X = np.append(self.X, x, axis=1)
        x_tilde = np.append(x, u).reshape(-1,1)
        self.X_tilde = np.append(self.X_tilde, x_tilde, axis=1)
        self.Psi_X_tilde = np.append(self.Psi_X_tilde, self.psi(x_tilde), axis=1)

        # Fill in the previously-zero last column of dPsi_X_tilde now that the
        # next snapshot is available, then append a fresh zero column.
        k = self.dPsi_X_tilde.shape[0]
        for l in range(k):
            self.dPsi_X_tilde[l, -1] = dpsi(
                self.X_tilde, self.nablaPsi, self.nabla2Psi, l, -2
            )
        self.dPsi_X_tilde = np.append(self.dPsi_X_tilde, np.zeros((k,1)), axis=1)

        # Online update of the generator estimate via rgEDMD.
        Psi_X_tilde_m = self.Psi_X_tilde[:,-1].reshape(-1,1)
        self.z_m, self.phi_m_inverse, self.L = rgEDMD(
            self.dPsi_X_tilde,
            Psi_X_tilde_m,
            self.z_m,
            self.phi_m_inverse
        )
        if update_policy: self.update_policy()

    def sample_action(self, x):
        """
        Sample an action from the policy pi.

        Parameters:
            x: Current state vector

        Returns:
            action: Action sampled from the estimated optimal policy pi
        """
        return sample_cartpole_action(self.pi, x)

        # Attempt at Taylor-approximation sampling:
        # pi_hat_star_hat = interpolate.approximate_taylor_polynomial(self.pi, np.mean(U), 2, 1)
        # rand = np.random.normal(loc=, scale=)
        # sampled_action = pi_hat_star_hat(rand)
        # try:
        #     return rejection_sampler(lambda u: self.pi(u, 100), [self.min_action,self.max_action], 1.1)[0]
        # except:
        #     self.V, self.pi = learningAlgorithm(
        #         self.L, self.X, self.Psi_X_tilde,
        #         self.U, self.reward, timesteps=2, lamb=0.5
        #     )
        #     return rejection_sampler(lambda u: self.pi(u, 100), [self.min_action,self.max_action], 1.1)[0]
# %%
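
# A hedged end-to-end usage sketch, not part of the original file. It assumes the
# project-local observables module provides a monomial dictionary (observables.monomials)
# with diff/ddiff methods as used in GeneratorModel.fit, that learningAlgorithm evaluates
# reward(x, u), and that action_bounds is a [low, high] pair. The data and the toy
# quadratic reward below are purely illustrative.
def _example_usage():
    d, m = 2, 100                          # state dimension, number of snapshots
    X = np.random.randn(d, m)              # toy state trajectory
    U = np.random.uniform(0, 1, m)         # toy action sequence

    psi = observables.monomials(2)         # assumed dictionary of observables
    reward = lambda x, u: -np.sum(x**2)    # toy reward: penalize distance from the origin
    action_bounds = [0.0, 1.0]

    model = GeneratorModel(psi, reward, action_bounds)
    model.fit(X, U, timesteps=2, lamb=10)  # estimate generator L and policy pi
    action = model.sample_action(X[:, -1]) # sample an action for the latest state
    model.update_model(X[:, -1], action)   # stream in a new (state, action) pair
    return action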