-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrisk_models.py
560 lines (498 loc) · 20.8 KB
/
risk_models.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
"""
The ``risk_models`` module provides functions for estimating the covariance matrix given
historical returns.
The format of the data input is the same as that in :ref:`expected-returns`.
**Currently implemented:**
- fix non-positive semidefinite matrices
- general risk matrix function, allowing you to run any risk model from one function.
- sample covariance
- semicovariance
- exponentially weighted covariance
- minimum covariance determinant
- shrunk covariance matrices:
- manual shrinkage
- Ledoit Wolf shrinkage
- Oracle Approximating shrinkage
- covariance to correlation matrix
"""
import warnings
import numpy as np
import pandas as pd
from .expected_returns import returns_from_prices
def _is_positive_semidefinite(matrix):
"""
Helper function to check if a given matrix is positive semidefinite.
Any method that requires inverting the covariance matrix will struggle
with a non-positive semidefinite matrix
:param matrix: (covariance) matrix to test
:type matrix: np.ndarray, pd.DataFrame
:return: whether matrix is positive semidefinite
:rtype: bool
"""
try:
# Significantly more efficient than checking eigenvalues (stackoverflow.com/questions/16266720)
np.linalg.cholesky(matrix + 1e-16 * np.eye(len(matrix)))
return True
except np.linalg.LinAlgError:
return False
def fix_nonpositive_semidefinite(matrix, fix_method="spectral"):
"""
Check if a covariance matrix is positive semidefinite, and if not, fix it
with the chosen method.
The ``spectral`` method sets negative eigenvalues to zero then rebuilds the matrix,
while the ``diag`` method adds a small positive value to the diagonal.
:param matrix: raw covariance matrix (may not be PSD)
:type matrix: pd.DataFrame
:param fix_method: {"spectral", "diag"}, defaults to "spectral"
:type fix_method: str, optional
:raises NotImplementedError: if a method is passed that isn't implemented
:return: positive semidefinite covariance matrix
:rtype: pd.DataFrame
"""
if _is_positive_semidefinite(matrix):
return matrix
warnings.warn(
"The covariance matrix is non positive semidefinite. Amending eigenvalues."
)
# Eigendecomposition
q, V = np.linalg.eigh(matrix)
if fix_method == "spectral":
# Remove negative eigenvalues
q = np.where(q > 0, q, 0)
# Reconstruct matrix
fixed_matrix = V @ np.diag(q) @ V.T
elif fix_method == "diag":
min_eig = np.min(q)
fixed_matrix = matrix - 1.1 * min_eig * np.eye(len(matrix))
else:
raise NotImplementedError(
"Method {} not implemented".format(fix_method))
if not _is_positive_semidefinite(fixed_matrix): # pragma: no cover
warnings.warn(
"Could not fix matrix. Please try a different risk model.", UserWarning
)
# Rebuild labels if provided
if isinstance(matrix, pd.DataFrame):
tickers = matrix.index
return pd.DataFrame(fixed_matrix, index=tickers, columns=tickers)
else:
return fixed_matrix
def risk_matrix(prices, method="sample_cov", **kwargs):
"""
Compute a covariance matrix, using the risk model supplied in the ``method``
parameter.
:param prices: adjusted closing prices of the asset, each row is a date
and each column is a ticker/id.
:type prices: pd.DataFrame
:param returns_data: if true, the first argument is returns instead of prices.
:type returns_data: bool, defaults to False.
:param method: the risk model to use. Should be one of:
- ``sample_cov``
- ``semicovariance``
- ``exp_cov``
- ``ledoit_wolf``
- ``ledoit_wolf_constant_variance``
- ``ledoit_wolf_single_factor``
- ``ledoit_wolf_constant_correlation``
- ``oracle_approximating``
:type method: str, optional
:raises NotImplementedError: if the supplied method is not recognised
:return: annualised sample covariance matrix
:rtype: pd.DataFrame
"""
if method == "sample_cov":
return sample_cov(prices, **kwargs)
elif method == "semicovariance" or method == "semivariance":
return semicovariance(prices, **kwargs)
elif method == "exp_cov":
return exp_cov(prices, **kwargs)
elif method == "ledoit_wolf" or method == "ledoit_wolf_constant_variance":
return CovarianceShrinkage(prices, **kwargs).ledoit_wolf()
elif method == "ledoit_wolf_single_factor":
return CovarianceShrinkage(prices, **kwargs).ledoit_wolf(
shrinkage_target="single_factor"
)
elif method == "ledoit_wolf_constant_correlation":
return CovarianceShrinkage(prices, **kwargs).ledoit_wolf(
shrinkage_target="constant_correlation"
)
elif method == "oracle_approximating":
return CovarianceShrinkage(prices, **kwargs).oracle_approximating()
else:
raise NotImplementedError(
"Risk model {} not implemented".format(method))
def sample_cov(prices, returns_data=False, frequency=252, log_returns=False, **kwargs):
"""
Calculate the annualised sample covariance matrix of (daily) asset returns.
:param prices: adjusted closing prices of the asset, each row is a date
and each column is a ticker/id.
:type prices: pd.DataFrame
:param returns_data: if true, the first argument is returns instead of prices.
:type returns_data: bool, defaults to False.
:param frequency: number of time periods in a year, defaults to 252 (the number
of trading days in a year)
:type frequency: int, optional
:param log_returns: whether to compute using log returns
:type log_returns: bool, defaults to False
:return: annualised sample covariance matrix
:rtype: pd.DataFrame
"""
if not isinstance(prices, pd.DataFrame):
warnings.warn("data is not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
if returns_data:
returns = prices
else:
returns = returns_from_prices(prices, log_returns)
return fix_nonpositive_semidefinite(
returns.cov() * frequency, kwargs.get("fix_method", "spectral")
)
def semicovariance(
prices,
returns_data=False,
benchmark=0.000079,
frequency=252,
log_returns=False,
**kwargs
):
"""
Estimate the semicovariance matrix, i.e the covariance given that
the returns are less than the benchmark.
.. semicov = E([min(r_i - B, 0)] . [min(r_j - B, 0)])
:param prices: adjusted closing prices of the asset, each row is a date
and each column is a ticker/id.
:type prices: pd.DataFrame
:param returns_data: if true, the first argument is returns instead of prices.
:type returns_data: bool, defaults to False.
:param benchmark: the benchmark return, defaults to the daily risk-free rate, i.e
:math:`1.02^{(1/252)} -1`.
:type benchmark: float
:param frequency: number of time periods in a year, defaults to 252 (the number
of trading days in a year). Ensure that you use the appropriate
benchmark, e.g if ``frequency=12`` use the monthly risk-free rate.
:type frequency: int, optional
:param log_returns: whether to compute using log returns
:type log_returns: bool, defaults to False
:return: semicovariance matrix
:rtype: pd.DataFrame
"""
if not isinstance(prices, pd.DataFrame):
warnings.warn("data is not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
if returns_data:
returns = prices
else:
returns = returns_from_prices(prices, log_returns)
drops = np.fmin(returns - benchmark, 0)
T = drops.shape[0]
return fix_nonpositive_semidefinite(
(drops.T @ drops) / T * frequency, kwargs.get("fix_method", "spectral")
)
def _pair_exp_cov(X, Y, span=180):
"""
Calculate the exponential covariance between two timeseries of returns.
:param X: first time series of returns
:type X: pd.Series
:param Y: second time series of returns
:type Y: pd.Series
:param span: the span of the exponential weighting function, defaults to 180
:type span: int, optional
:return: the exponential covariance between X and Y
:rtype: float
"""
covariation = (X - X.mean()) * (Y - Y.mean())
# Exponentially weight the covariation and take the mean
if span < 10:
warnings.warn("it is recommended to use a higher span, e.g 30 days")
return covariation.ewm(span=span).mean().iloc[-1]
def exp_cov(
prices, returns_data=False, span=180, frequency=252, log_returns=False, **kwargs
):
"""
Estimate the exponentially-weighted covariance matrix, which gives
greater weight to more recent data.
:param prices: adjusted closing prices of the asset, each row is a date
and each column is a ticker/id.
:type prices: pd.DataFrame
:param returns_data: if true, the first argument is returns instead of prices.
:type returns_data: bool, defaults to False.
:param span: the span of the exponential weighting function, defaults to 180
:type span: int, optional
:param frequency: number of time periods in a year, defaults to 252 (the number
of trading days in a year)
:type frequency: int, optional
:param log_returns: whether to compute using log returns
:type log_returns: bool, defaults to False
:return: annualised estimate of exponential covariance matrix
:rtype: pd.DataFrame
"""
if not isinstance(prices, pd.DataFrame):
warnings.warn("data is not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
assets = prices.columns
if returns_data:
returns = prices
else:
returns = returns_from_prices(prices, log_returns)
N = len(assets)
# Loop over matrix, filling entries with the pairwise exp cov
S = np.zeros((N, N))
for i in range(N):
for j in range(i, N):
S[i, j] = S[j, i] = _pair_exp_cov(
returns.iloc[:, i], returns.iloc[:, j], span
)
cov = pd.DataFrame(S * frequency, columns=assets, index=assets)
return fix_nonpositive_semidefinite(cov, kwargs.get("fix_method", "spectral"))
def min_cov_determinant(
prices,
returns_data=False,
frequency=252,
random_state=None,
log_returns=False,
**kwargs
): # pragma: no cover
warnings.warn(
"min_cov_determinant is deprecated and will be removed in v1.5")
if not isinstance(prices, pd.DataFrame):
warnings.warn("data is not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
# Extra dependency
try:
import sklearn.covariance
except (ModuleNotFoundError, ImportError):
raise ImportError("Please install scikit-learn via pip or poetry")
assets = prices.columns
if returns_data:
X = prices
else:
X = returns_from_prices(prices, log_returns)
# X = np.nan_to_num(X.values)
X = X.dropna().values
raw_cov_array = sklearn.covariance.fast_mcd(
X, random_state=random_state)[1]
cov = pd.DataFrame(raw_cov_array, index=assets, columns=assets) * frequency
return fix_nonpositive_semidefinite(cov, kwargs.get("fix_method", "spectral"))
def cov_to_corr(cov_matrix):
"""
Convert a covariance matrix to a correlation matrix.
:param cov_matrix: covariance matrix
:type cov_matrix: pd.DataFrame
:return: correlation matrix
:rtype: pd.DataFrame
"""
if not isinstance(cov_matrix, pd.DataFrame):
warnings.warn("cov_matrix is not a dataframe", RuntimeWarning)
cov_matrix = pd.DataFrame(cov_matrix)
Dinv = np.diag(1 / np.sqrt(np.diag(cov_matrix)))
corr = np.dot(Dinv, np.dot(cov_matrix, Dinv))
return pd.DataFrame(corr, index=cov_matrix.index, columns=cov_matrix.index)
def corr_to_cov(corr_matrix, stdevs):
"""
Convert a correlation matrix to a covariance matrix
:param corr_matrix: correlation matrix
:type corr_matrix: pd.DataFrame
:param stdevs: vector of standard deviations
:type stdevs: array-like
:return: covariance matrix
:rtype: pd.DataFrame
"""
if not isinstance(corr_matrix, pd.DataFrame):
warnings.warn("corr_matrix is not a dataframe", RuntimeWarning)
corr_matrix = pd.DataFrame(corr_matrix)
return corr_matrix * np.outer(stdevs, stdevs)
class CovarianceShrinkage:
"""
Provide methods for computing shrinkage estimates of the covariance matrix, using the
sample covariance matrix and choosing the structured estimator to be an identity matrix
multiplied by the average sample variance. The shrinkage constant can be input manually,
though there exist methods (notably Ledoit Wolf) to estimate the optimal value.
Instance variables:
- ``X`` - pd.DataFrame (returns)
- ``S`` - np.ndarray (sample covariance matrix)
- ``delta`` - float (shrinkage constant)
- ``frequency`` - int
"""
def __init__(self, prices, returns_data=False, frequency=252, log_returns=False):
"""
:param prices: adjusted closing prices of the asset, each row is a date and each column is a ticker/id.
:type prices: pd.DataFrame
:param returns_data: if true, the first argument is returns instead of prices.
:type returns_data: bool, defaults to False.
:param frequency: number of time periods in a year, defaults to 252 (the number of trading days in a year)
:type frequency: int, optional
:param log_returns: whether to compute using log returns
:type log_returns: bool, defaults to False
"""
# Optional import
try:
from sklearn import covariance
self.covariance = covariance
except (ModuleNotFoundError, ImportError): # pragma: no cover
raise ImportError("Please install scikit-learn via pip or poetry")
if not isinstance(prices, pd.DataFrame):
warnings.warn("data is not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
self.frequency = frequency
if returns_data:
self.X = prices.dropna(how="all")
else:
self.X = returns_from_prices(prices, log_returns).dropna(how="all")
self.S = self.X.cov().values
self.delta = None # shrinkage constant
def _format_and_annualize(self, raw_cov_array):
"""
Helper method which annualises the output of shrinkage calculations,
and formats the result into a dataframe
:param raw_cov_array: raw covariance matrix of daily returns
:type raw_cov_array: np.ndarray
:return: annualised covariance matrix
:rtype: pd.DataFrame
"""
assets = self.X.columns
cov = pd.DataFrame(raw_cov_array, index=assets,
columns=assets) * self.frequency
return fix_nonpositive_semidefinite(cov, fix_method="spectral")
def shrunk_covariance(self, delta=0.2):
"""
Shrink a sample covariance matrix to the identity matrix (scaled by the average
sample variance). This method does not estimate an optimal shrinkage parameter,
it requires manual input.
:param delta: shrinkage parameter, defaults to 0.2.
:type delta: float, optional
:return: shrunk sample covariance matrix
:rtype: np.ndarray
"""
self.delta = delta
N = self.S.shape[1]
# Shrinkage target
mu = np.trace(self.S) / N
F = np.identity(N) * mu
# Shrinkage
shrunk_cov = delta * F + (1 - delta) * self.S
return self._format_and_annualize(shrunk_cov)
def ledoit_wolf(self, shrinkage_target="constant_variance"):
"""
Calculate the Ledoit-Wolf shrinkage estimate for a particular
shrinkage target.
:param shrinkage_target: choice of shrinkage target, either ``constant_variance``,
``single_factor`` or ``constant_correlation``. Defaults to
``constant_variance``.
:type shrinkage_target: str, optional
:raises NotImplementedError: if the shrinkage_target is unrecognised
:return: shrunk sample covariance matrix
:rtype: np.ndarray
"""
if shrinkage_target == "constant_variance":
X = np.nan_to_num(self.X.values)
shrunk_cov, self.delta = self.covariance.ledoit_wolf(X)
elif shrinkage_target == "single_factor":
shrunk_cov, self.delta = self._ledoit_wolf_single_factor()
elif shrinkage_target == "constant_correlation":
shrunk_cov, self.delta = self._ledoit_wolf_constant_correlation()
else:
raise NotImplementedError(
"Shrinkage target {} not recognised".format(shrinkage_target)
)
return self._format_and_annualize(shrunk_cov)
def _ledoit_wolf_single_factor(self):
"""
Helper method to calculate the Ledoit-Wolf shrinkage estimate
with the Sharpe single-factor matrix as the shrinkage target.
See Ledoit and Wolf (2001).
:return: shrunk sample covariance matrix, shrinkage constant
:rtype: np.ndarray, float
"""
X = np.nan_to_num(self.X.values)
# De-mean returns
t, n = np.shape(X)
Xm = X - X.mean(axis=0)
xmkt = Xm.mean(axis=1).reshape(t, 1)
# compute sample covariance matrix
sample = np.cov(np.append(Xm, xmkt, axis=1),
rowvar=False) * (t - 1) / t
betas = sample[0:n, n].reshape(n, 1)
varmkt = sample[n, n]
sample = sample[:n, :n]
F = np.dot(betas, betas.T) / varmkt
F[np.eye(n) == 1] = np.diag(sample)
# compute shrinkage parameters
c = np.linalg.norm(sample - F, "fro") ** 2
y = Xm ** 2
p = 1 / t * np.sum(np.dot(y.T, y)) - np.sum(sample ** 2)
# r is divided into diagonal
# and off-diagonal terms, and the off-diagonal term
# is itself divided into smaller terms
rdiag = 1 / t * np.sum(y ** 2) - sum(np.diag(sample) ** 2)
z = Xm * np.tile(xmkt, (n,))
v1 = 1 / t * np.dot(y.T, z) - np.tile(betas, (n,)) * sample
roff1 = (
np.sum(v1 * np.tile(betas, (n,)).T) / varmkt
- np.sum(np.diag(v1) * betas.T) / varmkt
)
v3 = 1 / t * np.dot(z.T, z) - varmkt * sample
roff3 = (
np.sum(v3 * np.dot(betas, betas.T)) / varmkt ** 2
- np.sum(np.diag(v3).reshape(-1, 1) * betas ** 2) / varmkt ** 2
)
roff = 2 * roff1 - roff3
r = rdiag + roff
# compute shrinkage constant
k = (p - r) / c
delta = max(0, min(1, k / t))
# compute the estimator
shrunk_cov = delta * F + (1 - delta) * sample
return shrunk_cov, delta
def _ledoit_wolf_constant_correlation(self):
"""
Helper method to calculate the Ledoit-Wolf shrinkage estimate
with the constant correlation matrix as the shrinkage target.
See Ledoit and Wolf (2003)
:return: shrunk sample covariance matrix, shrinkage constant
:rtype: np.ndarray, float
"""
X = np.nan_to_num(self.X.values)
t, n = np.shape(X)
S = self.S # sample cov matrix
# Constant correlation target
var = np.diag(S).reshape(-1, 1)
std = np.sqrt(var)
_var = np.tile(var, (n,))
_std = np.tile(std, (n,))
r_bar = (np.sum(S / (_std * _std.T)) - n) / (n * (n - 1))
F = r_bar * (_std * _std.T)
F[np.eye(n) == 1] = var.reshape(-1)
# Estimate pi
Xm = X - X.mean(axis=0)
y = Xm ** 2
pi_mat = np.dot(y.T, y) / t - 2 * np.dot(Xm.T, Xm) * S / t + S ** 2
pi_hat = np.sum(pi_mat)
# Theta matrix, expanded term by term
term1 = np.dot((Xm ** 3).T, Xm) / t
help_ = np.dot(Xm.T, Xm) / t
help_diag = np.diag(help_)
term2 = np.tile(help_diag, (n, 1)).T * S
term3 = help_ * _var
term4 = _var * S
theta_mat = term1 - term2 - term3 + term4
theta_mat[np.eye(n) == 1] = np.zeros(n)
rho_hat = sum(np.diag(pi_mat)) + r_bar * np.sum(
np.dot((1 / std), std.T) * theta_mat
)
# Estimate gamma
gamma_hat = np.linalg.norm(S - F, "fro") ** 2
# Compute shrinkage constant
kappa_hat = (pi_hat - rho_hat) / gamma_hat
delta = max(0.0, min(1.0, kappa_hat / t))
# Compute shrunk covariance matrix
shrunk_cov = delta * F + (1 - delta) * S
return shrunk_cov, delta
def oracle_approximating(self):
"""
Calculate the Oracle Approximating Shrinkage estimate
:return: shrunk sample covariance matrix
:rtype: np.ndarray
"""
X = np.nan_to_num(self.X.values)
shrunk_cov, self.delta = self.covariance.oas(X)
return self._format_and_annualize(shrunk_cov)