-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathutils.py
125 lines (105 loc) · 3.35 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import hmac
import hashlib
import requests
import logging
import aiohttp
import asyncio
from functools import wraps
ErrorCode = {
1: "无效请求",
2: "无效版本",
3: "无效请求",
4: "没有访问权限",
5: "Key或签名无效,请重新创建",
6: "Key或签名无效,请重新创建",
7: "币种对不支持",
8: "币种不支持",
9: "币种不支持",
10: "验证错误",
11: "地址获取失败",
12: "参数为空",
13: "系统错误,联系管理员",
14: "无效用户",
15: "撤单太频繁,一分钟后再试",
16: "无效单号,或挂单已撤销",
17: "无效单号",
18: "无效挂单量",
19: "交易已暂停",
20: "挂单量太小",
21: "资金不足",
40: "请求太频繁,稍后再试"
}
def sign(params: dict, secret):
byte_secret = bytes(secret, encoding='utf-8')
sign = ''
for k, v in params.items():
sign += k + '=' + str(v) + '&'
bsign = bytes(sign[:-1], encoding='utf-8')
rst = hmac.new(byte_secret, bsign, hashlib.sha512).hexdigest()
return rst
def post(url, resource, params, api_key, secret):
r = requests.post(
url + resource,
data=params,
headers={
"KEY": api_key,
"SIGN": sign(params, secret)
},
timeout=5
)
rst = r.json()
if rst['result'] == "true" or rst['result'] is True:
return rst
# 处理错误
code = rst['code']
logging.error(f"API调用失败: {ErrorCode.get(code, rst['message'])}")
raise Exception(ErrorCode.get(code, rst['message']))
def get(url, resource, params=None):
r = requests.get(url + resource, params, timeout=5)
rst = r.json()
if rst['result'] == "true":
return rst
# 处理错误
code = rst['code']
logging.error(f"API调用失败: {ErrorCode[code]}")
raise Exception(ErrorCode[code])
timeout = aiohttp.ClientTimeout(total=5)
async def async_get(url, resource, params=None):
async with aiohttp.ClientSession(timeout=timeout) as session:
response = await session.get(url + resource, params=params, timeout=timeout)
rst = await response.json()
return rst
async def async_post(url, resource, params, api_key, secret):
async with aiohttp.ClientSession(timeout=timeout) as session:
response = await session.post(url + resource, data=params, headers={
"KEY": api_key,
"SIGN": sign(params, secret)
})
rst = await response.json()
if rst['result'] == "true" or rst['result'] is True:
return rst
# 处理错误
code = rst['code']
logging.error(f"API调用失败: {ErrorCode.get(code, rst['message'])}")
raise Exception(ErrorCode.get(code, rst['message']))
def make_async(func):
@wraps(func)
def wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
loop.run_until_complete(func(*args, **kwargs))
loop.close()
return wrapper
def retry(n):
def _wrapper1(func):
@wraps(func)
def _wrapper2(*args, **kwargs):
e = None
for _ in range(n):
try:
return func(*args, **kwargs)
except Exception as _e:
e = _e
else:
raise Exception(e)
return _wrapper2
return _wrapper1