-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathnon_contiguous_parameter_identification_wang_opty.py
348 lines (295 loc) · 11.5 KB
/
non_contiguous_parameter_identification_wang_opty.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
# %%
"""
Parameter Identification from Noncontiguous Measurements with Bias and Noise
============================================================================
In parameter estimation it is common to collect measurements of a system's
trajectories from distinct experiments. For example, if you are identifying the
parameters of a mass-spring-damper system, you may excite the system with
different initial conditions multiple times and measure the position of the
mass. The data cannot simply be stacked end-to-end in time and the
identification run because the measurement data would be discontinuous between
each measurement trial.
A workaround in opty is to create a set of differential equations with unique
state variables for each measurement trial that all share the same constant
parameters. You can then identify the parameters from all measurement trials
simultaneously by passing the uncoupled differential equations to opty.
This idea is due to Jason Moore.
In addition, for each measurement, one may create a set of differential
equations with unique state variables that again share the same constant
parameters. In addition, one adds noise to these equations. (Intuitively this
noise corresponds to a force acting on the system).
This idea is due to Huawei Wang and A. J. van den Bogert.
So, if one has *number_of_measurements* measurements, and one creates
*number_of_repeats* differential equations for each measurement, one will have
*number_of_measurements* * *number_of_repeats* uncoupled differential equations,
all sharing the same constant parameters.
Mass-spring-damper-friction Example
===================================
The position of a simple system consisting of a mass connected to a fixed point
by a spring and a damper and subject to friction is simulated and recorded as
noisy measurements. The spring constant, the damping coefficient
and the coefficient of friction will be identified.
**State Variables**
- :math:`x_{i, j}`: position of the mass of the i-th measurement trial [m]
- :math:`u_{i, j}`: speed of the mass of the i-th measurement trial [m/s]
**Parameters**
- :math:`m`: mass [kg]
- :math:`c`: linear damping coefficient [Ns/m]
- :math:`k`: linear spring constant [N/m]
- :math:`l_0`: natural length of the spring [m]
- :math:`friction`: friction coefficient [N]
- :math:`n_{i, j}`: noise added [N]
"""
import sympy as sm
import sympy.physics.mechanics as me
import numpy as np
from scipy.integrate import solve_ivp
from opty import Problem
from opty.utils import parse_free
import matplotlib.pyplot as plt
# %%
# Equations of Motion.
# --------------------
#
# Basic data may be set here
number_of_measurements = 4
number_of_repeats = 3
t0, tf = 0.0, 10.0
num_nodes = 500
noise_scale = 1.0
#
m, c, k, l0, friction = sm.symbols('m, c, k, l0, friction')
t = me.dynamicsymbols._t
x = sm.Matrix([me.dynamicsymbols([f'x{i}_{j}' for j in range(number_of_repeats)])
for i in range(number_of_measurements)])
u = sm.Matrix([me.dynamicsymbols([f'u{i}_{j}' for j in range(number_of_repeats)])
for i in range(number_of_measurements)])
n = sm.Matrix([me.dynamicsymbols([f'n{i}_{j}' for j in range(number_of_repeats)])
for i in range(number_of_measurements)])
xh, uh = me.dynamicsymbols('xh, uh')
# %%
# Form the equations of motion, I use Kane's method here.
#
def kd_eom_rh(xh, uh, m, c, k, l0, friction):
"""" sets up the eoms for the system"""
N = me.ReferenceFrame('N')
O, P = sm.symbols('O, P', cls=me.Point)
O.set_vel(N, 0)
P.set_pos(O, xh*N.x)
P.set_vel(N, uh*N.x)
bodies = [me.Particle('part', P, m)]
# :math:`\tanh(\alpha \cdot x) \approx sign(x)` for large :math:`\alpha`, and
# is differentiale everywhere.
forces = [(P, -c*uh*N.x - k*(xh-l0)*N.x - friction * sm.tanh(60*uh)*N.x)]
kd = sm.Matrix([uh - xh.diff(t)])
KM = me.KanesMethod(N,
q_ind=[xh],
u_ind=[uh],
kd_eqs=kd
)
fr, frstar = KM.kanes_equations(bodies, forces)
rhs = KM.rhs()
return (kd, fr + frstar, rhs)
kd, fr_frstar, rhs = kd_eom_rh(xh, uh, m, c, k, l0, friction)
# %%
# Stack the equations such that the first number_of_repeats equations
# belong to the first measurement, the second number_of_repeats equations belong
# to the second measurement, and so on.
#
kd_total = sm.Matrix([])
eom_total = sm.Matrix([])
rhs_total = sm.Matrix([])
for i in range(number_of_measurements):
for j in range(number_of_repeats):
eom_h = fr_frstar.subs({xh: x[i, j], uh: u[i, j],
uh.diff(t): u[i, j].diff(t), xh.diff(t): u[i, j]}) + \
sm.Matrix([n[i, j]])
kd_h = kd.subs({xh: x[i, j], uh: u[i, j]})
rhs_h = sm.Matrix([rhs[1].subs({xh: x[i, j], uh: u[i, j]})])
kd_total = kd_total.col_join(kd_h)
eom_total = eom_total.col_join(eom_h)
rhs_total = rhs_total.col_join(rhs_h)
eom = kd_total.col_join(eom_total)
uh =sm.Matrix([u[i, j] for i in range(number_of_measurements)
for j in range(number_of_repeats)])
rhs = uh.col_join(rhs_total)
for i in range(number_of_repeats):
sm.pprint(eom[i])
for i in range(3):
print('.........')
for i in range(number_of_repeats):
sm.pprint(eom[-(number_of_repeats-i)])
# %%
# Generate Noisy Measurement
# --------------------------
# Create 'number_of_measurements' sets of measurements with different initial
# conditions. To get the measurements, the equations of motion are integrated,
# and then noise is added to each point in time of the solution.
#
states = [x[i, j] for i in range(number_of_measurements)
for j in range(number_of_repeats)] + \
[u[i, j] for i in range(number_of_measurements)
for j in range(number_of_repeats)]
parameters = [m, c, k, l0, friction]
par_vals = [1.0, 0.25, 2.0, 1.0, 0.5]
eval_rhs = sm.lambdify(states + parameters, rhs)
times = np.linspace(t0, tf, num=num_nodes)
measurements = []
# %%
# Integrate the differential equations. If np.random.seed(seed) is used, the
# seed must be changed for every measurement to ensure they are independent.
#
for i in range(number_of_measurements):
seed = 1234*(i+1)
np.random.seed(seed)
x0 = 3.0*np.random.randn(2*number_of_measurements * number_of_repeats)
sol = solve_ivp(lambda t, x, p: eval_rhs(*x, *p).squeeze(),
(t0, tf), x0, t_eval=times, args=(par_vals,))
seed = 10000 + 12345*(i+1)
np.random.seed(seed)
measurements.append(sol.y[0, :] + 1.0*np.random.randn(num_nodes))
measurements = np.array(measurements)
print('shape of measurement array', measurements.shape)
print('shape of solution array ', sol.y.shape)
# %%
# Setup the Identification Problem
# --------------------------------
#
# The goal is to identify the damping coefficient :math:`c`, the spring
# constant :math:`k` and the coefficient of friction :math:`friction`.
# The objective :math:`J` is to minimize the least square
# difference in the optimal simulation as compared to the measurements. If
# some measurement is considered more reliable, its weight :math:`w` may be
# increased relative to the other measurements.
#
#
# objective = :math:`\int_{t_0}^{t_f} \sum_{i=1}^{\text{number_of_measurements}} \left[ \sum_{s=1}^{\text{number_of_repeats}} (w_i (x_{i, s} - x_{i,s}^m)^2 \right] \hspace{2pt} dt`
#
interval_value = (tf - t0) / (num_nodes - 1)
w =[1 for _ in range(number_of_measurements)]
nm = number_of_measurements
nr = number_of_repeats
def obj(free):
sum = 0.0
for i in range(nm):
for j in range(nr):
sum += np.sum((w[i]*(free[(i*nr+j)*num_nodes:(i*nr+j+1)*num_nodes] -
measurements[i])**2))
return sum
def obj_grad(free):
grad = np.zeros_like(free)
for i in range(nm):
for j in range(nr):
grad[(i*nr+j)*num_nodes:(i*nr+j+1)*num_nodes] = 2*w[i]*interval_value*(
free[(i*nr+j)*num_nodes:(i*nr+j+1)*num_nodes] - measurements[i]
)
return grad
# %%
# By not including :math:`c`, :math:`k`, :math:`friction`
# in the parameter map, they will be treated as unknown parameters.
#
par_map = {m: par_vals[0], l0: par_vals[3]}
bounds = {
c: (0.01, 2.0),
k: (0.1, 10.0),
friction: (0.1, 10.0),
}
# %%
# Set up the known trajectory map. If np.random.seed(seed) is used, the
# seed must be changed for every map to ensure they are idependent.
# noise_scale gives the 'strength' of the noise.
#
known_trajectory_map = {}
for i in range(number_of_measurements):
for j in range(number_of_repeats):
seed = 10000 + 12345*(i+1)*(j+1)
np.random.seed(seed)
known_trajectory_map = (known_trajectory_map |
{n[i, j]: noise_scale*np.random.randn(num_nodes)})
# %%
# Set up the problem.
#
problem = Problem(
obj,
obj_grad,
eom,
states,
num_nodes,
interval_value,
known_parameter_map=par_map,
time_symbol=me.dynamicsymbols._t,
integration_method='midpoint',
bounds=bounds,
known_trajectory_map=known_trajectory_map
)
# %%
# This give the sequence of the unknown parameters at the tail of solution.
print(problem.collocator.unknown_parameters)
# %%
# Create an Initial Guess
# -----------------------
#
# It is reasonable to use the measurements as initial guess for the states
# because they would be available. Here, only the measurements of the position
# are used and the speeds are set to zero. The last values are the guesses
# for :math:`c`, :math:`friction` and :math:`k`, respectively.
#
initial_guess = np.array([])
for i in range(number_of_measurements):
for j in range(number_of_repeats):
initial_guess = np.concatenate((initial_guess, measurements[i, :]))
initial_guess = np.hstack((initial_guess,
np.zeros(number_of_measurements*number_of_repeats*num_nodes),
[0.1, 3.0, 2.0]))
# %%
# Solve the Optimization Problem
# ------------------------------
#
solution, info = problem.solve(initial_guess)
print(info['status_msg'])
print(f'final value of the objective function is {info['obj_val']:.2f}' )
# %%
problem.plot_objective_value()
# %%
problem.plot_constraint_violations(solution)
# %%
# The identified parameters are:
# ------------------------------
#
print(f'Estimate of damping coefficient is {solution[-3]: 1.2f}' +
f' Percentage error is {(solution[-3]-par_vals[1])/solution[-3]*100:1.2f} %')
print(f'Estimate of the spring constant is {solution[-1]: 1.2f}' +
f' Percentage error is {(solution[-1]-par_vals[2])/solution[-1]*100:1.2f} %')
print(f'Estimate of the friction coefficient is {solution[-2]: 1.2f}'
f' Percentage error is {(solution[-2]-par_vals[-1])/solution[-2]*100:1.2f} %')
# %%
# Plot the measurements and the trajectories calculated
#------------------------------------------------------
#
sol_parsed, _, _ = parse_free(solution, len(states), 0, num_nodes )
if number_of_measurements > 1:
fig, ax = plt. subplots(number_of_measurements, 1,
figsize=(6.4, 1.25*number_of_measurements), sharex=True, constrained_layout=True)
for i in range(number_of_measurements):
ax[i].plot(times, sol_parsed[i*number_of_repeats, :], color='red')
ax[i].plot(times, measurements[i], color='blue', lw=0.5)
ax[i].set_ylabel(f'{states[number_of_repeats* i]}')
ax[-1].set_xlabel('Time [s]')
ax[0].set_title('Trajectories')
else:
fig, ax = plt. subplots(1, 1, figsize=(6.4, 1.25))
ax.plot(times, sol_parsed[0, :], color='red')
ax.plot(times, measurements[0], color='blue', lw=0.5)
ax.set_ylabel(f'{states[0]}')
ax.set_xlabel('Time [s]')
ax.set_title('Trajectories')
prevent_output = True
# %%
#Plot the Trajectories
#
problem.plot_trajectories(solution)
# %%
#
# sphinx_gallery_thumbnail_number = 3
#
plt.show()