This repository has been archived by the owner on May 4, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathApiSDKJsonClient.py
163 lines (133 loc) · 4.72 KB
/
ApiSDKJsonClient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
# coding=utf-8
import configparser
import json
import os
import traceback as tb
import requests
from .AuthHeader import AuthHeader
from .JsonEnvelop import JsonEnvelop
'''
json client
'''
class ApiSDKJsonClient():
# client params
__productline = None
__version = None
__service = None
__url = None
__username = None
__password = None
__token = None
__target = None
__accessToken = None
__action = None
# conf params
urlconf = None
usernameconf = None
passwordconf = None
tokenconf = None
targetconf = None
actionconf = None
def __init__(self, productline, version, service, token=None):
try:
self.initConfig(token)
self.__productline = productline
self.__version = version
self.__service = service
self.__url = self.urlconf
self.__username = ApiSDKJsonClient.usernameconf
self.__password = ApiSDKJsonClient.passwordconf
self.__token = ApiSDKJsonClient.tokenconf
self.__target = ApiSDKJsonClient.targetconf
self.__action = ApiSDKJsonClient.actionconf
except Exception as e:
print(e)
tb.print_exc()
def initConfig(self, token):
# if ApiSDKJsonClient.actionconf is None:
cf = configparser.ConfigParser(allow_no_value=True)
if token is None:
cf.read(
os.path.abspath(os.path.dirname(__file__)) +
"./baidu-api.properties",
encoding="utf-8")
else:
cf.read_string(token)
ApiSDKJsonClient.urlconf = cf.get("api", "serverUrl")
ApiSDKJsonClient.usernameconf = cf.get("api", "username")
ApiSDKJsonClient.passwordconf = cf.get("api", "password")
ApiSDKJsonClient.tokenconf = cf.get("api", "token")
ApiSDKJsonClient.targetconf = cf.get("api", "target")
ApiSDKJsonClient.actionconf = cf.get("api", "action")
def execute(self, method, request):
try:
url = self.__url + '/json/' + \
self.__productline + '/' + \
self.__version + '/' + \
self.__service + '/' + method
header = AuthHeader(
username=self.__username,
password=self.__password,
token=self.__token,
target=self.__target,
accessToken=self.__accessToken)
if (request is None):
request = {}
jsonEnv = JsonEnvelop(header, request)
jsonStr = json.dumps(
jsonEnv, default=convert_to_builtin_type, skipkeys=True, ensure_ascii=False).encode('utf8')
if type(jsonStr) is bytes:
jsonStr_print = jsonStr.decode("utf-8")
print(jsonStr_print)
headers = {'content-type': 'application/json;charset=utf-8'}
r = requests.post(url, data=jsonStr, headers=headers)
return r.json()
except Exception as e:
print(e)
tb.print_exc()
def setUrl(self, url):
self.__url = url
def setUsername(self, username):
self.__username = username
def setPassword(self, password):
self.__password = password
def setToken(self, token):
self.__token = token
def setTarget(self, target):
self.__target = target
def setAccessToken(self, accessToken):
self.__accessToken = accessToken
'''
print response result
'''
def printJsonResponse(res):
resheader = res["header"]
resbody = res["body"]
failures = resheader["failures"]
print("Response Header=>")
print("Execution result: \t", resheader["desc"])
print(" operations: \t", resheader["oprs"])
print(" operation time: \t", resheader["oprtime"])
if (resheader["quota"] is not None):
print(" consume quota: \t", resheader["quota"])
if (resheader["rquota"] is not None):
print(" remain quota: \t", resheader["rquota"])
print(" status: \t", resheader["status"])
if failures is not None:
for failure in failures:
print(" code: \t", failure["code"])
print(" message: \t", failure["message"])
print(" position: \t", failure["position"])
print("Response Body=>")
if resbody is not None:
jsonStr = json.dumps(
resbody, default=convert_to_builtin_type, skipkeys=True, ensure_ascii=False).encode('utf8')
if type(jsonStr) is bytes:
jsonStr_print = jsonStr.decode("utf-8")
print(jsonStr_print)
# 转换函数
def convert_to_builtin_type(obj):
# 把MyObj对象转换成dict类型的对象
d = {}
d.update(obj.__dict__)
return d