-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathcryptoinscriber
executable file
·178 lines (136 loc) · 5.65 KB
/
cryptoinscriber
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
#!/usr/bin/env python3
import argparse
import ccxt
import csv
import json
import os
import sys
import time
from typing import Type, List
# ASCII-art banner; parseArgs() prints it ahead of the help text when the
# script is started without any command-line arguments.
banner = r"""
____ ____ _ _ ___ ___ ____ _ __ _ ____ ____ ____ _ ___ ____ ____
|___ |--< Y |--' | [__] | | \| ==== |___ |--< | |==] |=== |--<
"""
def dedup(tdCurr: List[dict], tdPrev: List[dict]) -> List[dict]:
    """Return the executions in ``tdCurr`` that were not part of ``tdPrev``.

    Each execution is a dict carrying at least the ``'id'`` and
    ``'timestamp'`` keys. The slicing below treats index 0 as the most recent
    execution, i.e. both batches are assumed to be ordered newest-first.
    NOTE(review): confirm this matches the ordering the exchange returns.

    Args:
        tdCurr: Executions returned by the current poll.
        tdPrev: Executions returned by the previous poll; ``None`` or an
            empty list when there is nothing to compare against.

    Returns:
        The part of ``tdCurr`` located before the newest execution of
        ``tdPrev``. All of ``tdCurr`` is returned when there is no previous
        batch or when the two batches do not overlap. An empty list is
        returned when the current poll brought nothing new.
    """
    if not tdCurr:
        # Nothing was fetched, so there is nothing to record (and no
        # tdCurr[-1] to report on below).
        return []
    if not tdPrev:
        # First poll, or the previous poll came back empty: keep everything.
        return tdCurr
    if tdCurr == tdPrev:
        return []

    tdCurrIDs = [execution['id'] for execution in tdCurr]
    tdPrevLatestID = tdPrev[0]['id']
    if tdPrevLatestID not in tdCurrIDs:
        # The batches do not overlap, so executions may have happened between
        # the two polls without being captured. Timestamps are treated as
        # milliseconds and reduced to whole seconds for time.localtime().
        print('[-] Possibly missing executions between {} and {}...'.format(
            time.asctime(time.localtime(int(tdPrev[0]['timestamp']) // 1000)),
            time.asctime(time.localtime(int(tdCurr[-1]['timestamp']) // 1000)),
        ))
        return tdCurr
    return tdCurr[:tdCurrIDs.index(tdPrevLatestID)]
def convert(tradeData: List[dict], lite: bool) -> List[List]:
    """Turn execution dicts into CSV rows, in reverse of the input order.

    Args:
        tradeData: Iterable of execution dicts. Each one must provide the
            ``'timestamp'``, ``'price'`` and ``'amount'`` keys, plus ``'id'``
            and ``'side'`` when ``lite`` is false.
        lite: When true, emit the reduced row
            ``[timestamp, price rounded to a whole number, amount]``.
            Otherwise emit ``[id, timestamp, price, amount, side]``.

    Returns:
        One row per execution; the last input execution becomes the first row.
    """
    converted = []
    for execution in tradeData:
        if lite:
            # Timestamp, Price (rounded), Amount
            row = [
                execution['timestamp'],
                round(float(execution['price'])),
                execution['amount'],
            ]
        else:
            # ID, Timestamp, Price, Amount, Side
            row = [
                execution['id'],
                execution['timestamp'],
                execution['price'],
                execution['amount'],
                execution['side'],
            ]
        converted.append(row)
    # Append + a single reverse gives the same ordering as inserting every row
    # at index 0, without the quadratic cost of shifting the list each time.
    converted.reverse()
    return converted
def _intAtLeast(minimum: int):
    """Build an argparse ``type`` callable that accepts integers >= ``minimum``."""
    def parse(text: str) -> int:
        try:
            value = int(text)
        except ValueError:
            raise argparse.ArgumentTypeError('invalid int value: {!r}'.format(text))
        if value < minimum:
            raise argparse.ArgumentTypeError(
                'must be an integer >= {}, got {}'.format(minimum, value)
            )
        return value
    return parse


def buildParser() -> argparse.ArgumentParser:
    """Create the command-line parser for the poller.

    Returns:
        A parser exposing ``--exchange`` and ``--market`` (both required),
        ``--rate`` (default 60, must be >= 1), ``--iterations`` (default 0,
        must be >= 0), ``--output`` (default ``'out'``), ``--custom`` (a JSON
        string decoded with ``json.loads``, default ``{}``) and the ``--lite``
        flag.
    """
    # Define Parser
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        description='CryptoInscriber | Cryptocurrency historical trade data poller'
    )
    # Exchange
    parser.add_argument(
        '-e', '--exchange',
        metavar='bitstamp',
        required=True,
        help='cryptocurrency exchange name, see https://github.com/ccxt/ccxt#supported-cryptocurrency-exchange-markets for available exchanges'
    )
    # Market's Symbol/Pair
    parser.add_argument(
        '-m', '--market',
        metavar='BTC/USD',
        required=True,
        help='market symbol/pair to poll from'
    )
    # Rate Limit Per Minute; the polling delay is derived by dividing by this
    # value, so zero and negative rates are rejected at parse time.
    parser.add_argument(
        '-r', '--rate',
        metavar='60',
        type=_intAtLeast(1),
        default=60,
        help='rate limit, max number of requests per minute'
    )
    # Iterations; a negative count would make the script quit without polling.
    parser.add_argument(
        '-i', '--iterations',
        metavar='0',
        type=_intAtLeast(0),
        default=0,
        help='number of times to poll executions before quitting, 0 to run indefinitely'
    )
    # Output Directory
    parser.add_argument(
        '-o', '--output',
        metavar='out',
        default='out',
        help='output directory to store CSV files'
    )
    # Custom Params
    parser.add_argument(
        '-c', '--custom',
        metavar='\'{"limit": 20}\'',
        default='{}',
        type=json.loads,
        help='custom exchange-specific parameters to use while polling for executions'
    )
    # Minimal Output
    parser.add_argument(
        '-l', '--lite',
        action='store_true',
        help='minimizes output rows; removes ID (first column), rounds price to nearest whole number (assuming decimal precision is not important), removes side (5th column)'
    )
    return parser
def parseArgs(parser: argparse.ArgumentParser) -> argparse.Namespace:
    """Parse the process command line with ``parser``.

    When the script is started without any arguments, the banner and the
    parser's help text are printed and the process exits with status 1.

    Args:
        parser: The parser used to interpret ``sys.argv``.

    Returns:
        The namespace produced by ``parser.parse_args()``.
    """
    # Validate Args Length
    if len(sys.argv) < 2:
        print(banner)
        parser.print_help()
        # sys.exit rather than the interactive-only exit() helper, which is
        # injected by the site module and is not guaranteed to exist.
        sys.exit(1)
    # Parse
    return parser.parse_args()
def getExchange(exchangeName: str) -> ccxt.Exchange:
    """Instantiate the ccxt exchange called ``exchangeName`` and load its markets.

    The name is checked against ``ccxt.exchanges`` first, so that other
    attributes of the ccxt module cannot be mistaken for an exchange and so
    that errors raised while constructing the exchange are not misreported as
    an unknown exchange.

    Args:
        exchangeName: Exchange id as listed by ccxt.

    Returns:
        The exchange instance, after ``load_markets()`` has been called on it.
        Prints an error and exits with status 1 when the name is not a known
        exchange.
    """
    if exchangeName not in ccxt.exchanges:
        print('[-] Error: "{}" exchange is not available. Refer to https://github.com/ccxt/ccxt#supported-cryptocurrency-exchange-markets for available markets.'.format(exchangeName))
        sys.exit(1)
    exchange = getattr(ccxt, exchangeName)()
    exchange.load_markets()
    return exchange
def createOutputDir(directory: str) -> None:
    """Make sure ``directory`` exists, creating missing parent directories too.

    Args:
        directory: Path of the output directory.

    Prints an error and exits with status 1 when ``directory`` already exists
    but is not a directory.
    """
    try:
        # exist_ok=True makes an already existing directory a no-op, and
        # makedirs (unlike mkdir) also handles nested paths such as "data/out".
        os.makedirs(directory, exist_ok=True)
    except FileExistsError:
        print('[-] A non-directory item "{}" already exists. Please use another output name.'.format(directory))
        sys.exit(1)
def getTradeData(exchange: ccxt.Exchange, market: str, customParams: dict = None) -> List[dict]:
    """Fetch trades for ``market`` from ``exchange``.

    Args:
        exchange: Exchange instance to query.
        market: Market symbol/pair passed to ``fetch_trades``.
        customParams: Extra exchange-specific parameters forwarded as
            ``params``; ``None`` (the default) means no extra parameters.

    Returns:
        Whatever ``exchange.fetch_trades`` returns for the market.
    """
    # A fresh dict per call avoids sharing one mutable default between calls.
    params = {} if customParams is None else customParams
    return exchange.fetch_trades(market, params=params)
def writeCSV(data: List[List], filename: str) -> None:
    """Append ``data`` as CSV rows to ``filename``, creating the file if needed.

    Args:
        data: Rows to write; each row is a list of cell values.
        filename: Path of the CSV file to append to.
    """
    # newline='' lets the csv module control line endings itself; without it
    # platforms that translate "\n" end up with an extra blank line per row.
    with open(filename, 'a', newline='') as f:
        csv.writer(f).writerows(data)
def main():
    """Poll an exchange for trade executions and append them to a CSV file.

    Parses the command line, connects to the requested exchange, prepares the
    output directory and then repeatedly fetches trades, keeps only the ones
    not seen in the previous poll, and appends them to a timestamped CSV file.
    The loop runs ``--iterations`` times, or indefinitely when that is 0, and
    sleeps ``60 / rate`` seconds between polls. Ctrl-C ends the run quietly.
    """
    try:
        parser = buildParser()
        args = parseArgs(parser)
        if args.rate <= 0:
            # The delay below divides by the rate and is later handed to
            # time.sleep(), so zero or negative rates cannot be honoured.
            parser.error('rate must be a positive integer')

        exchange = getExchange(args.exchange)
        market = args.market.upper()
        delay = 60 / args.rate
        iterations = args.iterations

        createOutputDir(args.output)
        outputFile = os.path.join(args.output, '{}_{}_{}.csv'.format(
            exchange.name,
            market.replace('/', ''),
            time.strftime('%Y-%m-%d_%H-%M-%S')
        ))

        tradeDataPrev = None
        while args.iterations == 0 or iterations > 0:
            try:
                tradeData = getTradeData(exchange, market, customParams=args.custom)
            except ccxt.NetworkError:
                # A failed poll is retried after the usual delay and does not
                # count towards the requested number of iterations.
                print('[-] Facing network issues, retrying in {} seconds...'.format(delay))
                time.sleep(delay)
                continue

            tradeDataClean = dedup(tradeData, tradeDataPrev)
            tradeDataClean = convert(tradeDataClean, args.lite)
            writeCSV(tradeDataClean, outputFile)
            # Keep the raw batch: the next poll is compared against it.
            tradeDataPrev = tradeData

            print('[{}] Recorded {} trade executions. Sleeping for {} seconds...'.format(
                time.asctime(),
                len(tradeDataClean),
                delay
            ))
            if args.iterations:
                iterations -= 1
            time.sleep(delay)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop an indefinite run; exit quietly.
        pass
# Run the poller only when executed as a script, not when imported.
if __name__ == '__main__':
    main()