-
Notifications
You must be signed in to change notification settings - Fork 541
/
Copy pathhistoric_async.py
112 lines (89 loc) · 3.6 KB
/
historic_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
from enum import Enum
import time
import alpaca_trade_api as tradeapi
import asyncio
import os
import pandas as pd
import sys
from alpaca_trade_api.rest import TimeFrame, URL
from alpaca_trade_api.rest_async import gather_with_concurrency, AsyncRest
NY = 'America/New_York'
class DataType(str, Enum):
Bars = "Bars"
Trades = "Trades"
Quotes = "Quotes"
def get_data_method(data_type: DataType):
if data_type == DataType.Bars:
return rest.get_bars_async
elif data_type == DataType.Trades:
return rest.get_trades_async
elif data_type == DataType.Quotes:
return rest.get_quotes_async
else:
raise Exception(f"Unsupoported data type: {data_type}")
async def get_historic_data_base(symbols, data_type: DataType, start, end,
timeframe: TimeFrame = None):
"""
base function to use with all
:param symbols:
:param start:
:param end:
:param timeframe:
:return:
"""
major = sys.version_info.major
minor = sys.version_info.minor
if major < 3 or minor < 6:
raise Exception('asyncio is not support in your python version')
msg = f"Getting {data_type} data for {len(symbols)} symbols"
msg += f", timeframe: {timeframe}" if timeframe else ""
msg += f" between dates: start={start}, end={end}"
print(msg)
step_size = 1000
results = []
for i in range(0, len(symbols), step_size):
tasks = []
for symbol in symbols[i:i+step_size]:
args = [symbol, start, end, timeframe.value] if timeframe else \
[symbol, start, end]
tasks.append(get_data_method(data_type)(*args))
if minor >= 8:
results.extend(await asyncio.gather(*tasks, return_exceptions=True))
else:
results.extend(await gather_with_concurrency(500, *tasks))
bad_requests = 0
for response in results:
if isinstance(response, Exception):
print(f"Got an error: {response}")
elif not len(response[1]):
bad_requests += 1
print(f"Total of {len(results)} {data_type}, and {bad_requests} "
f"empty responses.")
async def get_historic_bars(symbols, start, end, timeframe: TimeFrame):
await get_historic_data_base(symbols, DataType.Bars, start, end, timeframe)
async def get_historic_trades(symbols, start, end, timeframe: TimeFrame):
await get_historic_data_base(symbols, DataType.Trades, start, end)
async def get_historic_quotes(symbols, start, end, timeframe: TimeFrame):
await get_historic_data_base(symbols, DataType.Quotes, start, end)
async def main(symbols):
start = pd.Timestamp('2021-05-01', tz=NY).date().isoformat()
end = pd.Timestamp('2021-08-30', tz=NY).date().isoformat()
timeframe: TimeFrame = TimeFrame.Day
await get_historic_bars(symbols, start, end, timeframe)
await get_historic_trades(symbols, start, end, timeframe)
await get_historic_quotes(symbols, start, end, timeframe)
if __name__ == '__main__':
api_key_id = os.environ.get('APCA_API_KEY_ID')
api_secret = os.environ.get('APCA_API_SECRET_KEY')
base_url = "https://paper-api.alpaca.markets"
feed = "iex" # change to "sip" if you have a paid account
rest = AsyncRest(key_id=api_key_id,
secret_key=api_secret)
api = tradeapi.REST(key_id=api_key_id,
secret_key=api_secret,
base_url=URL(base_url))
start_time = time.time()
symbols = [el.symbol for el in api.list_assets(status='active')]
symbols = symbols[:200]
asyncio.run(main(symbols))
print(f"took {time.time() - start_time} sec")