import torch
import numpy as np
import pandas as pd
from typing import Literal, List
from pyts.image import GramianAngularField, MarkovTransitionField
from torch.utils.data import Dataset
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

import correlation


def timeseries_to_image(
timeseries: pd.DataFrame,
window: int,
slide_step: int,
apply_norm: bool | None = True,
method: Literal['gasf', 'gadf', 'mtf'] | None = 'gasf'
):
"""
Transform multivariate timeseries into images. Within each window, an image will be formed by stacking the images for each time series transformed by `method`.
Args:
timeseries (DataFrame): DataFrame only with columns (`'DATE'`, index1, index2, ....) where indexn is the name of the index.
window (int): Length of the sliding widow, i.e., width of the tranformed images.
slide_step (int): Length of the sliding step .
apply_norm (bool): If to apply nomalization to the images. Default to `True`.
method (bool): the method to tranform time series into images. Must be `'gasf'`, `'gadf'`, or `'mtf'`. Default to `'gasf'`.
Returns:
timeseries_images (ndarray): The transformed images with shape (n, 1, number of assets * window_length, window_length).
dates (DataFrame): The corresonding dates with length (n,) and columns (`'DATE'`).
"""
if method == 'gasf':
transform = GramianAngularField()
elif method == 'gadf':
transform = GramianAngularField(method='difference')
elif method == 'mtf':
transform = MarkovTransitionField()
else:
        raise ValueError(f'unknown value for arg method: {method}')
dates = timeseries['DATE']
timeseries = timeseries.drop(columns='DATE')
    # convert the DataFrame to an array with shape (number of windows, window_length, number of assets)
timeseries_windows = [timeseries_window.to_numpy() for timeseries_window in timeseries.rolling(window=window, step=slide_step)]
timeseries_windows = timeseries_windows[int(np.ceil((window-1)/slide_step)):]
timeseries_windows = np.stack(timeseries_windows) # number of windows * window_length * number of assets
# get the corresponding dates
dates = [dates_window.iloc[-1] for dates_window in dates.rolling(window=window, step=slide_step)]
dates = dates[int(np.ceil((window-1)/slide_step)):]
dates = pd.DataFrame({'DATE': dates})
    # transform the series into images
    # get a list with `number of assets` elements, each of which has shape (number of windows, window_length, window_length)
timeseries_images = [transform.transform(timeseries_windows[:,:,i]) for i in range(timeseries_windows.shape[-1])]
# concatenate images along the row axis
    # the resulting array has shape (number of windows, number of assets * window_length, window_length)
timeseries_images = np.concatenate(timeseries_images, axis=1).astype(np.float32)
# normalize the images
if apply_norm:
timeseries_images = (timeseries_images - np.mean(timeseries_images)) / (np.std(timeseries_images) + 1e-8)
timeseries_images = np.expand_dims(timeseries_images,1)
return timeseries_images, dates
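

# A minimal usage sketch (not part of the original module). The DataFrame below is a
# hypothetical example; any DataFrame with a 'DATE' column plus one numeric column per
# asset should work the same way.
def _example_timeseries_to_image():
    rng = np.random.default_rng(0)
    prices = pd.DataFrame({
        'DATE': pd.date_range('2020-01-01', periods=60, freq='D'),
        'ASSET_A': rng.normal(size=60).cumsum(),
        'ASSET_B': rng.normal(size=60).cumsum(),
    })
    images, dates = timeseries_to_image(prices, window=20, slide_step=5, method='gasf')
    # with 2 assets and window=20, images has shape (n_windows, 1, 2 * 20, 20)
    return images, dates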
def timeseries_to_correlation(
timeseries: pd.DataFrame,
window: int,
slide_step: int,
methods: Literal['pearson', 'kendall', 'spearman'] | list | None = ['pearson', 'kendall', 'spearman']
):
"""
Transform multivariate time series into correlation matrices. Within each sliding window, a list with length of `len(methods)` containing different correlations will be gererated according to `methods`.
Args:
timeseries (DataFrame): DataFrame only with columns (`'DATE'`, index1, index2, ....) where indexn is the name of the index.
window (int): Length of the sliding widow, i.e., width of the tranformed images.
slide_step (int): Length of the sliding step.
methods (str | list): List of types of correlations.
Returns:
correlation_matrices (list): List with length of `len(df) - window_size + 1`, where each element list has length of `len(methods)` and contains different types of correlation matrices dertermined by `methods`.
dates (DataFrame): DataFrame with columns (`'DATE'`,)
"""
dates = timeseries['DATE']
timeseries = timeseries.drop(columns='DATE')
correlation_matrices = correlation.calculate_rolling_correlation(
df = timeseries,
window_size=window,
sliding_step=slide_step,
methods=methods
)
dates = [dates_window.iloc[-1] for dates_window in dates.rolling(window=window, step=slide_step)]
dates = dates[int(np.ceil((window-1)/slide_step)):]
dates = pd.DataFrame({'DATE': dates})
return correlation_matrices, dates
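

# A minimal usage sketch (not part of the original module); `prices` stands for a
# hypothetical DataFrame shaped like the one described above ('DATE' plus one column per asset).
def _example_timeseries_to_correlation(prices: pd.DataFrame):
    # each element of `matrices` is a list with one correlation matrix per requested method
    matrices, dates = timeseries_to_correlation(
        prices, window=20, slide_step=5, methods=['pearson', 'spearman']
    )
    return matrices, dates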
class TimeSeriesImagesDataset(Dataset):
"""
The dataset for the timeseries data for the pre-text tasks. The input the current image, and the target is the image after `lag`.
Attributes:
data (Tensor): The data with length n.
dates (DataFrame): The dates with length n - lag.
lag (int): Lag for selecting taget images.
"""
def __init__(
self,
data: np.ndarray,
dates: pd.DataFrame | None = None,
lag: int | None = 1
):
"""
Initialize the dataset.
Args:
data (ndarray): Array with shape (n, 1, num_indices * window_size, window_size).
dates (DataFrame): The corresponding datas to `data`.
lag (int): Lag for selecting the target images. Note: the value of `lag` is for the selected images. In other words,
`lag` = 1 means the image for the timestamp t+sliding_step will be used as the target for the image for the timestamp t.
"""
super().__init__()
self.data = torch.tensor(data)
self.lag = lag
        self.dates = dates[:-self.lag] if dates is not None else None
def __len__(self):
return len(self.data) - self.lag
def __getitem__(self, index):
return self.data[index], self.data[index + self.lag]
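

# A minimal sketch (not part of the original module) of feeding the dataset to a DataLoader
# for the pre-text task; the batch size and shuffle settings are arbitrary choices.
def _example_images_dataloader(images: np.ndarray, dates: pd.DataFrame):
    from torch.utils.data import DataLoader

    dataset = TimeSeriesImagesDataset(images, dates, lag=1)
    loader = DataLoader(dataset, batch_size=32, shuffle=False)
    for inputs, targets in loader:
        # inputs and targets both have shape (batch, 1, num_indices * window_size, window_size)
        break
    return loader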
class TimeSeriesCorrelationsDataset:
"""
The dataset for the correlation matrices.
"""
def __init__(
self,
data: List[List[pd.DataFrame]],
dates: pd.DataFrame | None = None
):
"""
Initialize the class.
Args:
data (list): A list each element of which is also a list containing different types of correlation matrices for the timestamp t.
dates (DataFrame): A DataFrame only with the column (`'DATE'`,), and has the same length of `data`. Default to `None`.
"""
        assert dates is None or len(data) == len(dates)
self.data = data
self.dates = dates
def __len__(self):
return len(self.data)
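

# A minimal sketch (not part of the original module) wiring the two correlation helpers together;
# `prices` is a hypothetical input DataFrame with a 'DATE' column plus one column per asset.
def _example_correlations_dataset(prices: pd.DataFrame):
    matrices, dates = timeseries_to_correlation(prices, window=20, slide_step=5)
    return TimeSeriesCorrelationsDataset(matrices, dates)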
def apply_pca(similarity: np.ndarray, variance_threshold: int | float | None = 0.90):
"""
Apply PCA on the similarity matrix.
Args:
similarity (ndarray): The similarity matrix.
variance_threshold (int | float): If int, `variance_threshold` components will be kept.
If float, the components will be kept such that the total varient is over 90%.
Returns:
pca_transformed (ndarray): The resulted array, i.e., the features with row as a sample.
"""
scaler = StandardScaler()
scaled_data = scaler.fit_transform(similarity)
    pca = PCA(n_components=variance_threshold, svd_solver='full')
pca_transformed = pca.fit_transform(scaled_data)
return pca_transformed
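

# A minimal usage sketch (not part of the original module): reducing a toy similarity matrix
# to a few components. With a float threshold of 0.90, enough components are kept to explain
# 90% of the variance.
def _example_apply_pca():
    rng = np.random.default_rng(0)
    x = rng.normal(size=(50, 10))
    similarity = np.corrcoef(x)  # toy (50, 50) similarity matrix
    features = apply_pca(similarity, variance_threshold=0.90)
    # features.shape == (50, k), where k is chosen by the explained-variance threshold
    return features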