-
Notifications
You must be signed in to change notification settings - Fork 0
/
FPG_Reference_Utils.py
147 lines (102 loc) · 5.34 KB
/
FPG_Reference_Utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import numpy as np
import pandas as pd
from FPG_Inp import FPG_Input
from Company import Company
# Country -> region lookup used to bucket companies geographically.
Geographical_Grouping = {
    country: region
    for region, countries in (
        ('North America', ('United States',)),
        ('Europe', ('United Kingdom', 'France')),
        ('Asia', ('Taiwan', 'South Korea', 'China', 'Japan')),
    )
    for country in countries
}
def load_historical_data(Inp: "FPG_Input"):
    """
    Loads the saved price history CSV of every requested ticker into one frame.

    :param Inp: FPG_Input object whose ``tickers`` attribute lists the tickers to load.
    :return: DataFrame indexed by Date with a (ticker, field) column MultiIndex;
             empty DataFrame when no tickers are given.
    """
    frames = []
    for ticker in Inp.tickers:
        frame = pd.read_csv('Database/' + ticker + '_price_history.csv')
        frame['Date'] = pd.to_datetime(frame['Date'])
        frame.set_index('Date', inplace=True)
        # Prefix every column with its ticker so all histories share one frame.
        frame.columns = pd.MultiIndex.from_product([[ticker], frame.columns])
        frames.append(frame)
    # Concatenate once at the end instead of growing the DataFrame inside the
    # loop, which copies all previously-loaded data on every iteration.
    return pd.concat(frames, axis=1) if frames else pd.DataFrame()
def calculate_market_caps(Cmpnys, historical_data):
    """
    Calculates the market cap history of every company.

    :param Cmpnys: dict of ticker -> Company objects (``shares_outstanding`` is read).
    :param historical_data: past prices per company, (ticker, field) column MultiIndex.
    :return: DataFrame of market caps, one column per ticker, indexed like the prices.
    """
    market_caps = pd.DataFrame(index=historical_data.index, columns=list(Cmpnys))
    for ticker, company in Cmpnys.items():
        # Flatten defensively in case the selection comes back two-dimensional.
        prices = np.ravel(historical_data[ticker]['Adj Close'].to_numpy())
        market_caps[ticker] = prices * company.shares_outstanding
    return market_caps
def calculate_stock_weights(market_caps):
    """
    Calculates each stock's weight in the market on every date, vectorized.

    :param market_caps: DataFrame of market caps, indexed by date, one column per stock.
    :return: DataFrame of per-date weights with the same shape as ``market_caps``.
    """
    # Total market cap per date, computed across all stocks at once.
    totals = market_caps.sum(axis=1)
    # Substitute 1 for zero totals so the division below never hits 0/0;
    # those dates end up with all-zero weights instead of NaNs.
    safe_totals = totals.mask(totals == 0, 1)
    # Row-wise division broadcasts each date's total across its stocks.
    return market_caps.div(safe_totals, axis=0)
def calculate_index(historical_data, Weights):
    """
    Builds a weighted price index and the empirical distribution of its daily changes.

    :param historical_data: prices with a (ticker, field) column MultiIndex;
                            only the 'Adj Close' columns are used.
    :param Weights: DataFrame of per-date stock weights aligned with the tickers.
    :return: tuple of (idx, distribution) where idx is a DataFrame with
             'price index' and 'index returns' columns (float32), and
             distribution holds the sorted daily changes and their CDF levels.
    :raises IndexError: if the index level is never positive.
    """
    adj_close = historical_data.loc[:, (slice(None), ["Adj Close"])]
    # Weighted sum of prices per date; NaNs (missing prices) contribute 0.
    # BUGFIX: the fill value must be passed as nan=0.0 — the second positional
    # argument of np.nan_to_num is `copy`, not the NaN replacement.
    index_levels = np.nan_to_num(Weights.to_numpy() * adj_close.to_numpy(), nan=0.0).sum(axis=1)
    idx = pd.DataFrame(index=adj_close.index, columns=["price index", "index returns"])
    # First date on which the index has a positive value (inception).
    first_value_pos = np.where(index_levels > 0)[0][0]
    active = index_levels[first_value_pos:]
    # NOTE(review): the daily change divides the diff by the *current* value,
    # not the previous one — preserved as-is; confirm this definition is intended.
    daily_change = (np.append(0, np.diff(active)) / active) * 100
    daily_change_sorted = np.sort(daily_change)
    n = len(daily_change_sorted)
    # Empirical CDF levels 1/n .. 1 matching the sorted daily changes.
    cumulative_probs = np.arange(1, n + 1) / n
    idx["price index"] = index_levels.astype('float32')
    # Percentage return relative to the index level at inception.
    idx["index returns"] = (((index_levels / index_levels[first_value_pos]) - 1) * 100).astype('float32')
    distribution = {"sorted daily change": daily_change_sorted, "CDF": cumulative_probs}
    return idx, distribution
def calculate_sector_geographic_indices(historical_data, MarketCaps, Cmpnys):
    """
    Builds cap-weighted price indices per sector and per geographic region.

    :param historical_data: prices with a (ticker, field) column MultiIndex.
    :param MarketCaps: DataFrame of market caps per ticker, indexed by date.
    :param Cmpnys: dict of ticker -> Company (``sector`` and ``region`` are read).
    :return: tuple (Sector_idx, sectors_distributions, Geo_idx, Geo_distributions):
             indices as DataFrames with (group, type) column MultiIndexes and
             distributions as dicts keyed by group name.
    """
    # Group tickers by sector and by region in a single pass over the companies.
    SectorGroups = {}
    GeoGroups = {}
    for ticker, company in Cmpnys.items():
        SectorGroups.setdefault(company.sector, []).append(ticker)
        GeoGroups.setdefault(company.region, []).append(ticker)
    # np.unique also sorts, which fixes the column ordering of the outputs.
    sectors = np.unique(list(SectorGroups))
    regions = np.unique(list(GeoGroups))
    sector_multi_index = pd.MultiIndex.from_product([sectors, ["price index", "index returns"]], names=["Sector", "Type"])
    # NOTE(review): the first level name "Sector" here looks like a copy-paste
    # from the sector index ("Region" was probably intended); kept unchanged
    # so callers addressing levels by name keep working.
    geo_multi_index = pd.MultiIndex.from_product([regions, ["price index", "index returns"]], names=["Sector", "Type"])
    Sector_idx = pd.DataFrame(index=historical_data.index, columns=sector_multi_index)
    Geo_idx = pd.DataFrame(index=historical_data.index, columns=geo_multi_index)
    sectors_distributions = {}
    for sector in sectors:
        tickers = SectorGroups[sector]
        weights = calculate_stock_weights(MarketCaps[tickers])
        Sector_idx[sector], sectors_distributions[sector] = calculate_index(historical_data[tickers], weights)
    Geo_distributions = {}
    for region in regions:
        tickers = GeoGroups[region]
        weights = calculate_stock_weights(MarketCaps[tickers])
        Geo_idx[region], Geo_distributions[region] = calculate_index(historical_data[tickers], weights)
    return Sector_idx, sectors_distributions, Geo_idx, Geo_distributions