-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathgenerator.py
247 lines (213 loc) · 6.9 KB
/
generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
#%%
import numpy as np
import scipy as sp
import pandas as pd
'''======================= HELPER FUNCTIONS ======================='''
# Construct B matrix as seen in 3.1.2 of the reference paper
def constructB(d, k):
Bt = np.zeros((d, k))
if k == 1:
Bt[0,0] = 1
else:
num = np.arange(d)
Bt[num, num+1] = 1
B = Bt.T
return B
# Construct similar B matrix as above, but for second order monomials
def constructSecondOrderB(s, k):
Bt = np.zeros((s, k))
if k == 1:
Bt[0,0] = 1
else:
row = 0
for i in range(d+1, d+1+s):
Bt[row,i] = 1
row += 1
B = Bt.T
return B
'''======================= CONSTRUCT DATA MATRIX ======================='''
from brownian import brownian
# The Wiener process parameter.
sigma = 1
# Total time.
T = 10000
# Number of steps.
N = 10000
# Time step size
dt = T/N
# Number of realizations to generate.
n = 20
# Create an empty array to store the realizations.
X = np.empty((n, N+1))
# Initial values of x.
X[:, 0] = 50
brownian(X[:, 0], N, dt, sigma, out=X[:, 1:])
Z = np.roll(X,-1)[:, :-1]
X = X[:, :-1]
#%%
'''======================= SETUP/DEFINITIONS ======================='''
import observables
from sympy import symbols
from sympy.polys.monomials import itermonomials, monomial_count
from sympy.polys.orderings import monomial_key
d = X.shape[0]
m = X.shape[1]
s = int(d*(d+1)/2) # number of second order poly terms
rtoler=1e-02
atoler=1e-02
psi = observables.monomials(2)
#%%
x_str = ""
for i in range(d):
x_str += 'x_' + str(i) + ', '
x_syms = symbols(x_str)
M = itermonomials(x_syms, 2)
sortedM = sorted(M, key=monomial_key('grlex', np.flip(x_syms)))
# print(sortedM)
#%%
Psi_X = psi(X)
Psi_X_T = Psi_X.T
nablaPsi = psi.diff(X)
nabla2Psi = psi.ddiff(X)
print("nablaPsi Shape", nablaPsi.shape)
k = Psi_X.shape[0]
#%%
'''======================= COMPUTATIONS ======================='''
# This computes dpsi_k(x) exactly as in the paper
# t = 1 is a placeholder time step, not really sure what it should be
def dpsi(k, l, t=1):
difference = (X[:, l+1] - X[:, l])
term_1 = (1/t) * (difference)
term_2 = nablaPsi[k, :, l]
term_3 = (1/(2*t)) * (difference.reshape(-1, 1) @ difference.reshape(1, -1))
term_4 = nabla2Psi[k, :, :, l]
return np.dot(term_1, term_2) + np.tensordot(term_3, term_4)
vectorized_dpsi = np.vectorize(dpsi)
# Construct \text{d}\Psi_X matrix
dPsi_X = np.empty((k, m))
for column in range(m-1):
dPsi_X[:, column] = vectorized_dpsi(range(k), column)
#%%
# Calculate Koopman generator approximation
train = int(m * 0.8)
test = m - train
M = dPsi_X[:, :train] @ np.linalg.pinv(Psi_X[:, :train]) # \widehat{L}^\top
L = M.T # estimate of Koopman generator
def check_symmetric(a, rtol=1e-02, atol=1e-02):
return np.allclose(a, a.T, rtol=rtol, atol=atol)
print("Is L approximately symmetric:", check_symmetric(L))
# Eigen decomposition
eig_vals, eig_vecs = sp.sparse.linalg.eigs(L) if sp.sparse.issparse(L) else sp.linalg.eig(L)
# Compute eigenfunction matrix
eig_funcs = (eig_vecs).T @ Psi_X
#%% Construct estimates of drift vector (b) and diffusion matrix (a) using two methods:
# 1. Directly from dictionary functions without dimension reduction
# 2. Construct eigendecomposition and restrict its order
# Construct B matrix that selects first-order monomials (except 1) when multiplied by list of dictionary functions
B = constructB(d, k)
# Construct second order B matrix (selects second-order monomials)
second_orderB = constructSecondOrderB(s, k)
# Computed b function (sometimes denoted by \mu) without dimension reduction
L_times_B_transposed = (L @ B).T
def b(l):
return L_times_B_transposed @ Psi_X[:, l] # (k,)
# Calculate Koopman modes
V_v1 = B.T @ np.linalg.inv((eig_vecs).T)
print("V_v1 shape:", V_v1.shape)
# The b_v2 function allows for heavy dimension reduction
# default is reducing by 90% (taking the first k/10 eigen-parts)
# TODO: Figure out correct place to take reals
def b_v2(l, num_dims=k//10, V=V_v1):
res = 0
for ell in range(k-1, k-num_dims, -1):
res += eig_vals[ell] * eig_funcs[ell, l] * V[:, ell] #.reshape(-1, 1)
return np.real(res)
#%%
# for l in range(m):
# b_l = b(l)
# print("b(l):", b_l)
# b_v2_l = b_v2(l)
# print("b_v2(l):", b_v2_l)
#%%
# the following a functions compute the diffusion matrices as flattened vectors
# this was calculated in a weird way, so it could have issues...
L_times_second_orderB_transpose = (L @ second_orderB).T
def a(l):
return (L_times_second_orderB_transpose @ Psi_X[:, l]) - \
(second_orderB.T @ nablaPsi[:, :, l] @ b(l))
V_v2 = second_orderB.T @ np.linalg.inv((eig_vecs).T)
def a_v2(l):
return (b_v2(l, V=V_v2)) - \
(second_orderB.T @ nablaPsi[:, :, l] @ b_v2(l))
#%% Reshape a vector as matrix and perform some tests
def covarianceMatrix(a_func, l):
a_l = a_func(l)
covariance = np.zeros((d, d))
row = 0
col = 0
covariance[row, col] = a_l[0]
col += 1
n = 1
while col < d:
covariance[row, col] = a_l[n]
covariance[col, row] = a_l[n]
if row == col:
col += 1
row = 0
else:
row += 1
n +=1
return covariance
test = covarianceMatrix(a, 2)
test_v2 = covarianceMatrix(a_v2, 2)
test_df = pd.DataFrame(test)
test_v2_df = pd.DataFrame(test_v2)
print("a_v1:", test_df)
print("a_v2:", test_v2_df)
print("a_v1 diagonal:", np.diagonal(test))
print("a_v2 diagonal:", np.diagonal(test_v2))
# print(test.shape)
# print(check_symmetric(test, 0, 0))
# for j in range(m):
# result = np.count_nonzero(covarianceMatrix(j))
# if result < d*d: print(result)
# diagAMat = np.zeros((d, d))
# for j in range(m):
# evaldA = covarianceMatrix(j)
# for i in range(d):
# diagAMat[i, i] = evaldA[i, i]
# result = np.allclose(diagAMat, evaldA, rtol=rtoler, atol=atoler)
# if result: print("All close at j =", j)
# Oh no... it's not positive definite
# Some calculation must be wrong
# decomp = sp.linalg.cholesky(covarianceMatrix(0))
#%%
# A = Psi_X
# B is Joe.
B = np.zeros((d, m))
m_range = np.arange(m)
B = X[:, m_range] - Z[:, m_range]
print("B shape:", B.shape)
print("Psi_X transpose shape:", Psi_X_T.shape)
PsiMult = sp.linalg.inv(Psi_X @ Psi_X_T) @ Psi_X
C = PsiMult @ B.T
# Each col of matric C represents the coeficients in a linear combo of the dictionary functions that makes up each component of the drift vector. So each c_{}
print("C shape:", C.shape)
b_v3 = C.T @ Psi_X
for l in range(5):
print(b_v3[:, l])
def a_v3(l):
diffusionDictCoefs = np.empty((d, d, k))
diffusionMat = np.empty((d, d))
for i in range(d):
for j in range(d):
Bij = (1/dt)*B[i]*B[j]
diffusionDictCoefs[i, j] = PsiMult @ Bij
diffusionMat[i, j] = np.dot(diffusionDictCoefs[i, j], Psi_X[:,l])
return diffusionMat
print("a_v3:")
for l in range(5):
calc = a_v3(l)
print(f"Matrix {l}:", calc)
print(f"Diagonal {l}:", np.diagonal(calc))
# %%