-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathprocess.py
402 lines (339 loc) · 16.5 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
import time, os, torch, json
import stats, cluster, visualization, utils, correlation
import numpy as np
import pandas as pd
import torch.nn as nn
import torch.optim as optim
from typing import Literal, List
from warnings import warn
from torch.utils.data import DataLoader
from torch.utils.data import DataLoader
from data import TimeSeriesCorrelationsDataset
from loss import KLDivergenceLoss
class Processor():
"""The processor for model training and inference."""
def __init__(
self,
model: nn.Module,
criterion: nn.Module,
data_loader: DataLoader,
learning_rate: float,
save_folder: str,
optimizer_type: Literal['adam', 'sgd'] | None = 'adam',
):
"""
Initialize the model.
Args:
model (Module): The model to train and inference.
criterion (Module): The loss object.
data_loader (DataLoader): the data loader containing the training data and the data for inference.
learning_rate (float): The learning rate for model training.
save_folder (str): Folder to save the results and model.
optimizer_type (str): Type of the optimizer. Default to `'adam'`.
"""
# set the model to cuda if not
if not next(model.parameters()).is_cuda:
self.model = model.to('cuda')
else:
self.model = model
self.criterion = criterion
self.data_loader = data_loader
self.save_folder = save_folder
self.learning_rate = learning_rate
self.optimizer_type = optimizer_type
if self.optimizer_type == 'adam':
self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)
elif self.optimizer_type == 'sgd':
self.optimizer = optim.SGD(self.model.parameters(), lr=self.learning_rate)
else:
raise ValueError(f'Unsuported optimizer_type : {optimizer_type}')
self.losses = []
self.epochs = 0
def train(
self,
epochs: int,
save_model: bool | None = True,
save_period : int | None = None,
save_log: bool | None = True,
siamese: bool | None = False
):
"""
Train ther model. If the model has been trained, the model will be trained based on the previous trained parameters.
Args:
epochs (int): Number of epochs for this training process.
save_model (bool): If `True`, save the model with name self.directory + '.pth' in the folder self.parent_folder / self.directory. Default to `True`.
save_period (int | None): If not `None`, save the model every save_period of epochs. Default to `None`.
save_log (bool): If `True`, save 'batch_size', 'num_epochs', 'learning_rate' and 'losses' in a json file.
named self.directory + '.json' in the folder self.parent_folder / self.directory. Default to `Trur`.
siamese (bool): If `True`, the 'targets' from `self.data_loader` will be fed to the model. Default to `False`.
Returns:
out (list): List of the losses including those from the training process before.
"""
self.model.train()
for _ in range(epochs):
t1 = time.time()
batch_losses = []
for inputs, targets in self.data_loader:
inputs = inputs.to('cuda')
targets = targets.to('cuda')
self.optimizer.zero_grad()
outputs = self.model(inputs)
if siamese:
targets = self.model(targets)
if isinstance(self.criterion, KLDivergenceLoss):
loss = self.criterion(outputs, targets, self.model.parameters())
else:
loss = self.criterion(outputs, targets)
batch_losses.append(loss.item())
loss.backward()
self.optimizer.step()
t2 = time.time()
epoch_loss = np.mean(batch_losses)
print(f"Epoch {self.epochs+1}, Loss : {epoch_loss}, Time : {t2-t1}")
self.epochs += 1
self.losses.append(epoch_loss)
if save_period is not None and self.epochs % save_period == 0:
torch.save(self.model, os.path.join(self.save_folder, f'model - {self.epochs}.pth'))
if save_model:
torch.save(self.model.state_dict(), os.path.join(self.save_folder, f'model - {self.epochs}.pth'))
if save_log:
self.save_log()
return self.losses
def save_log(self):
"""
Save training log in a json file named self.save_folder/training_log.json.
"""
loss_config = None
if isinstance(self.criterion, nn.MSELoss):
loss_type = 'mse'
elif isinstance(self.criterion, nn.KLDivLoss):
loss_type = 'kldiv'
elif isinstance(self.criterion, KLDivergenceLoss):
loss_type = 'kldiv'
loss_config = {
'lambda_weights' : self.criterion.l2_reg_weight,
'lambda_entropy' : self.criterion.entropy_reg_weight
}
else:
loss_type = None
warn('This loss type cannot be interpreted to a string')
training_info = {
'batch_size' : self.data_loader.batch_size,
'num_epochs' : self.epochs,
'learning_rate' : self.learning_rate,
'losses' : self.losses,
'optimizer_type' : self.optimizer_type,
'loss_type': loss_type,
'loss_config': loss_config
}
with open(os.path.join(self.save_folder, 'training_log.json'), 'w') as f:
json.dump(training_info, f, indent=4)
def extract_feature_representation(self, save: bool | None = True):
"""
Extract the features from the encoder. Only for the CNN AutoEncoder.
Args:
save (bool): If `True`, the array of features will be saved with path self.save_folder/features.npy, and dates will be saved
with path self.save_folder/dates.csv if the passed dataloader's `date` attribute is not `None`. Default to `True`.
Returns:
features (ndarray): The extracted features.
dates (DataFrame): DataFrame of the corresponding dates, returned if the passed dataloader's `date` attribute is not `None`.
"""
encoder = self.model.encoder
encoder.eval()
features = np.concatenate(
[encoder(inputs.to('cuda')).to('cpu').detach().numpy()
for inputs, _ in self.data_loader],
0
)
if save:
np.save(os.path.join(self.save_folder, 'features'), features)
if self.data_loader.dataset.dates is not None:
self.data_loader.dataset.dates.to_csv(os.path.join(self.save_folder, 'dates.csv'))
if self.data_loader.dataset.dates is not None:
return features, self.data_loader.dataset.dates
else:
return features
def extract_regimes(self, save: bool | None = False) -> pd.DataFrame:
"""
Extract regimes. Only for the Siamese CNN.
Args:
save (bool): If `True`, the extracted regimes will be saved to self.save_folder/regimes.csv.
Returns:
regimes_df (DataFrame). The dataframe of the extracted regimes.
"""
self.model.eval()
regimes = np.concatenate(
[self.model(inputs.to('cuda')).to('cpu').detach().numpy()
for inputs, _ in self.data_loader],
0
)
if self.data_loader.dataset.dates is None:
raise RuntimeError('dataset need to have not None attribute dates')
regimes_df = self.data_loader.dataset.dates.copy()
regimes_df['regime'] = regimes
if save:
regimes_df.to_csv(os.path.join(self.save_folder, 'regimes.csv'))
return regimes_df
class SimGenerator():
"""The class to generate similarity matrix."""
def __init__(self, dataset: TimeSeriesCorrelationsDataset, method: Literal['meta','cophenetic'], save_folder: str):
"""
Initialize the class.
Args:
dataset (TimeSeriesCorrelationsDataset): Dataset of the correlation matrices.
method (str): The method to generate the similarity matrix.
save_folder (int): Folder to save the similarity matrix.
"""
self.dataset = dataset
if method not in ['meta','cophenetic']:
raise ValueError(f"Unsupported method {method}. This should be meta or cophenetic.")
self.method = method
self.save_folder = save_folder
def generate_features(self, save: bool | None = True):
"""
Generate the the similarity matrix, i.e., the features.
Args:
save (bool): If `True`, the similarity matrix will be saved to self.save_folder/dates.csv. Default to `True`.
Returns:
sim (ndarray): The generated similarity matrix.
"""
if self.method == 'meta':
sim = correlation.generate_meta_similarity(self.dataset.data)
else:
sim = correlation.generate_cophenetic_similarity(self.dataset.data)
if save:
np.save(os.path.join(self.save_folder, 'features'), sim)
self.dataset.dates.to_csv(os.path.join(self.save_folder, 'dates.csv'))
return sim
class Analyser:
    """The class to perform the analysis on the identified regimes."""
    def __init__(self, returns_df: pd.DataFrame, returns_dfs: List[pd.DataFrame]):
        """
        Initialize the class.
        Args:
            returns_df (DataFrame): The DataFrame with columns (`'DATE'`, ...) (the other columns are daily returns with the index name as the first row).
            returns_dfs (List[DataFrame]): List of DataFrames, where each dataframe contains columns (`'DATE'`, ...) where the other columns are trailing or forward returns.
        """
        self.returns_df = returns_df
        self.returns_dfs = returns_dfs
    def analyse(self, features: np.ndarray | None, original_regimes_df: pd.DataFrame, folder: str | None, return_filled_regimes: bool | None = True):
        """
        Perform the analysis: regime duration/transition statistics, return metrics,
        and cluster assessment, writing the results to `folder` when it is not `None`.
        Args:
            features (ndarray | None): The features the regimes were derived from; if `None`,
                the feature-based cluster assessment is skipped (an empty DataFrame is used).
            original_regimes_df (DataFrame): The original regimes, i.e., those with sliding step. This has columns (`'DATE'`, `'regime'`).
            folder (str | None): Folder to save the analysis results; if `None`, nothing is written to disk.
            return_filled_regimes (bool): If `True`, the filled regimes will be returned.
        Returns:
            (DataFrame | None): When `return_filled_regimes` is `True`, a DataFrame with columns
                (`'DATE'`, `'regime'`) over the dates of `self.returns_df`; otherwise `None`.
        """
        # check if only one regime -- with a single regime there is nothing to compare,
        # so skip the whole analysis (and return an all-zero regime column if requested)
        if len(original_regimes_df['regime'].unique()) == 1:
            print('no need to analyze since only one regime')
            if return_filled_regimes:
                regimes_df = self.returns_df[['DATE']].copy()
                regimes_df['regime'] = 0
                return regimes_df
            return
        # Expand the sliding-window regime labels onto every date of the returns series.
        returns_regimes_df = utils.fill_dates_values(
            df = self.returns_df,
            regimes_df=original_regimes_df
        )
        returns_regimes_df = returns_regimes_df.sort_values(by='DATE')
        # durations: how long regimes last and how they transition into each other
        regimes_durations_stats = stats.analyse_regimes_durations(returns_regimes_df)
        transition_matix_regime_level = stats.calculate_transition_matrix(returns_regimes_df, at_regimes_level=True)
        transition_matix_date_level = stats.calculate_transition_matrix(returns_regimes_df, at_regimes_level=False)
        # entropy of each transition matrix, both raw and normalized
        transition_matix_regime_level_entropies = {
            'unnorm' : stats.calculate_transition_matrix_entropy(transition_matix_regime_level),
            'norm' : stats.calculate_transition_matrix_entropy(transition_matix_regime_level, apply_norm=True)
        }
        transition_matix_date_level_entropies = {
            'unnorm' : stats.calculate_transition_matrix_entropy(transition_matix_date_level),
            'norm' : stats.calculate_transition_matrix_entropy(transition_matix_date_level, apply_norm=True)
        }
        # returns: metrics within regimes, at period-end dates, and 10 days forward
        returns_stats_within_regimes = stats.calculate_return_metrics_within_regime(returns_regimes_df)
        returns_stats_last_dates = stats.calculate_return_metrics_last_date_of_period(returns_regimes_df)
        returns_forward_10days = stats.calculate_returns_forward(returns_regimes_df, 10)
        # cluster analysis -- feature-based assessment only when features were supplied
        if features is None:
            cluster_ana_results = pd.DataFrame()
        else:
            cluster_ana_results = cluster.assess_clustering_results(features, original_regimes_df)
        cluster_ana_on_returns = cluster.assess_clustering_on_returns(dfs=self.returns_dfs, regimes=returns_regimes_df)
        # Collect everything into one nested dict mirroring the analysis structure above.
        results = {
            'durations_ana' : {
                'stats' : regimes_durations_stats.to_dict(),
                'transition_matrix' : {
                    'regime_level' : {
                        'matrix' : transition_matix_regime_level.to_dict(),
                        'entropies' : transition_matix_regime_level_entropies
                    },
                    'data_level' : {
                        'matrix' : transition_matix_date_level.to_dict(),
                        'entropies': transition_matix_date_level_entropies
                    }
                }
            },
            'returns_ana' : {
                'within_regimes' : returns_stats_within_regimes.to_dict(),
                'last_dates' : returns_stats_last_dates.to_dict(),
                'forward_10days' : returns_forward_10days.to_dict()
            },
            'cluster_ana' : cluster_ana_results.to_dict(),
            'cluster_ana_on_returns': cluster_ana_on_returns.to_dict()
        }
        if folder is not None:
            # Persist the same results twice: machine-readable JSON plus a sectioned CSV.
            with open(os.path.join(folder, 'analysis.json'), mode='w') as f:
                json.dump(results, f, cls=utils.NumpyEncoder, indent=8)
            # All tables are appended into a single CSV file, each preceded by a title line.
            with open(os.path.join(folder, 'analysis.csv'), 'w', newline='\n') as f:
                f.write('regimes durations stats' + '\n')
                regimes_durations_stats.to_csv(f, index=False)
                f.write('transition matix at regime level' + '\n')
                transition_matix_regime_level.to_csv(f)
                pd.Series(transition_matix_regime_level_entropies, name='entropy').to_csv(f)
                f.write('transition matix at date level' + '\n')
                transition_matix_date_level.to_csv(f)
                pd.Series(transition_matix_date_level_entropies, name='entropy').to_csv(f)
                f.write('returns stats within each regime for each index' + '\n')
                returns_stats_within_regimes.to_csv(f, index=False)
                f.write('returns stats for the last date of the period for each regime for each index' + '\n')
                returns_stats_last_dates.to_csv(f, index=False)
                f.write('returns stats for 10 days returns forward' + '\n')
                returns_forward_10days.to_csv(f, index=False)
                f.write('cluster assessment' + '\n')
                cluster_ana_results.to_csv(f)
                f.write('cluster assessment on returns' + '\n')
                cluster_ana_on_returns.to_csv(f, index=False)
            # Figures are rendered headlessly (Agg backend, show=False) straight to PNG files.
            visualization.visualize_regime_durations(
                regimes_durations_stats,
                filename=os.path.join(folder, 'durations.png'),
                show=False,
                backend='Agg'
            )
            visualization.visualize_regimes(
                returns_regimes_df,
                filename=os.path.join(folder, 'regimes.png'),
                show=False,
                backend='Agg'
            )
            visualization.visualize_transition_matrix(
                transition_matix_regime_level,
                filename=os.path.join(folder, 'transition matrix ' + 'regime level.png'),
                show=False,
                backend='Agg'
            )
            visualization.visualize_transition_matrix(
                transition_matix_date_level,
                filename=os.path.join(folder, 'transition matrix ' + 'date level.png'),
                show=False,
                backend='Agg'
            )
            visualization.visualize_cluster_assess_on_returns(
                cluster_ana_on_returns[cluster_ana_on_returns['metric'] == 'silhouette'].drop(columns='metric'),
                fig_size=(12,4),
                fig_title='evaluation of cluster on returns',
                show_fig=False,
                filename=os.path.join(folder, 'cluster_on_returns.png'),
                backend='Agg'
            )
        if return_filled_regimes:
            return returns_regimes_df[['DATE','regime']]