-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathquery_service.py
executable file
·165 lines (127 loc) · 6.52 KB
/
query_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import _thread as thread
import os
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET
from urllib.request import Request, urlopen

from flask import Flask, request

import parse_scos_message
import run_query
# Flask application object; the route handler below is registered on it.
app = Flask(__name__)
# files in local files dir should be accessible via urls
# (LOCAL_FILES_URL + '<run_id>/<name>' is handed to the file service as the
# location of a file written under LOCAL_FILES_DIR + '<run_id>/').
# XML namespace constants. APOLLO_TYPES_NAMESPACE is not referenced in the
# visible part of this file; SERVICES_COMMON_NAMESPACE qualifies the
# Response/responseMeta/responseBody elements built and parsed below.
APOLLO_TYPES_NAMESPACE = 'http://types.apollo.pitt.edu/v4_0/'
SERVICES_COMMON_NAMESPACE = 'http://services-common.apollo.pitt.edu/v4_0/'
# Directory under which one sub-directory per run id is created.
# NOTE(review): absolute, machine-specific path -- confirm before deploying.
LOCAL_FILES_DIR = '/Users/nem41/Documents/sites/query-service/'
# Base URL used to build the URL of a locally written output file.
LOCAL_FILES_URL = 'http://localhost/query-service/'
# Endpoint used to look up file URLs and to register uploaded files per run.
FILE_SERVICE_URL = 'http://localhost:8080/broker-service-rest-frontend-4.0-SNAPSHOT/ws/files'
# Base endpoint used to post run status updates ('/run/<run_id>/status').
RUN_MANAGER_SERVICE_URL = 'http://localhost:8080/broker-service-rest-frontend-4.0-SNAPSHOT/ws'
@app.route('/run/<int:run_id>', methods=['POST'])
def query(run_id):
    """Start the query for ``run_id`` on a background thread.

    The ``username`` and ``password`` query-string arguments are read from
    the current request and passed on to ``run_query_thread``; either is
    ``None`` when the argument is absent.

    Args:
        run_id: Integer run identifier taken from the URL path.

    Returns:
        The XML produced by ``create_response``: code 200 once the worker
        thread has been started, code 500 if starting it failed.
    """
    username = request.args.get('username')
    password = request.args.get('password')
    try:
        thread.start_new_thread(run_query_thread, (username, password, run_id))
    except Exception as e:
        # ``Exception`` rather than a bare ``except`` so that SystemExit and
        # KeyboardInterrupt still propagate; report why the start failed
        # instead of swallowing the error silently.
        print('Could not start query thread for run ' + str(run_id) + ': ' + str(e))
        return create_response(500)
    return create_response(200)
def _simulator_output_path(hdf5_file_url, run_dir):
    """Return the local path the simulator output should be downloaded to.

    The file name is chosen from the extension found anywhere in the URL.

    Raises:
        ValueError: if the URL contains none of the recognised extensions.
    """
    if any(ext in hdf5_file_url for ext in ('.h5', '.hdf', '.hdf5')):
        return run_dir + 'simulator_output.hdf5'
    if '.csv' in hdf5_file_url:
        return run_dir + 'simulator_output.csv'
    # Previously this case surfaced as an UnboundLocalError on ``hdf5_file``.
    raise ValueError('Unsupported simulator output file type: ' + str(hdf5_file_url))


def run_query_thread(username, password, run_id):
    """Run every query of a run and register the results with the file service.

    Steps, in order: look up and parse the run's SCOS message, create the
    local run directory, mark the run RUNNING, download the simulator output
    file, execute each query via ``run_query.run_query`` and post every
    produced file to the file service, then mark the run COMPLETED.

    Any exception marks the run FAILED with the error text; if even that
    status update fails, both errors are printed. Nothing is raised to the
    caller, because this function is the body of a background thread.

    Args:
        username: Credential forwarded to the status and file services.
        password: Credential forwarded to the status and file services.
        run_id: Identifier of the run being processed.
    """
    try:
        # Locate and parse the SCOS message that lists the queries to run.
        query_file_url = get_scos_file_url(run_id)
        query_container = parse_scos_message.get_queries(query_file_url)
        queries = query_container['query_objects']['queries']
        output_formats = query_container['query_objects']['output_formats']

        run_dir = LOCAL_FILES_DIR + str(run_id) + '/'
        # exist_ok avoids the check-then-create race of os.path.exists().
        os.makedirs(run_dir, exist_ok=True)

        set_status('RUNNING', 'The query is running', run_id, username, password)

        # Fetch the simulator output named in the SCOS message from the file store.
        hdf5_file_url = get_hdf5_file_url(query_container['file_identification'])
        hdf5_file = _simulator_output_path(hdf5_file_url, run_dir)
        urllib.request.urlretrieve(hdf5_file_url, hdf5_file)

        for scos in queries:
            file_id = scos['file_id']
            files = run_query.run_query(scos, hdf5_file, output_formats, run_dir, file_id)

            # Register each produced file with the file store.
            for file_container in files:
                file_name = file_container['name']
                local_file_url = LOCAL_FILES_URL + str(run_id) + '/' + file_name
                data = urllib.parse.urlencode({
                    'username': username,
                    'password': password,
                    'urlToFile': local_file_url,
                    'fileName': file_name,
                    'fileType': file_container['type'],
                    'fileFormat': file_container['format'],
                })
                # Context manager closes the HTTP response instead of leaking it.
                with urllib.request.urlopen(FILE_SERVICE_URL + '/' + str(run_id),
                                            data.encode('UTF-8')):
                    pass

        set_status('COMPLETED', 'The query has completed.', run_id, username, password)
    except Exception as e:
        try:
            set_status('FAILED', 'The query has failed: ' + str(e), run_id, username, password)
        except Exception as e2:
            print('Could not set error status for query run ' + str(run_id) + '. Message was: '
                  + str(e2) + '. Original error was ' + str(e))
def get_files_list(run_id):
    """Fetch and parse the file listing of ``run_id`` from the file service.

    Requests ``FILE_SERVICE_URL/<run_id>`` as XML and parses the text of each
    direct ``responseBody`` child of the response as an XML document.

    The original implementation parsed these entries but discarded them and
    implicitly returned ``None``; the parsed entries are now returned.

    Args:
        run_id: Identifier of the run whose files are listed.

    Returns:
        A list of ``xml.etree.ElementTree.Element`` objects, one per
        ``responseBody`` element (empty if there is none).
    """
    q = Request(FILE_SERVICE_URL + '/' + str(run_id))
    q.add_header('Accept', 'application/xml')
    # Close the HTTP response deterministically once the body has been read.
    with urlopen(q) as response:
        list_files_result = response.read()
    tree = ET.fromstring(list_files_result)
    body_tag = '{' + SERVICES_COMMON_NAMESPACE + '}responseBody'
    return [ET.fromstring(entry.text) for entry in tree.findall(body_tag)]
def get_output_file_url(run_id, file_label, file_type, file_format):
    """Ask the file service for the URL of one file belonging to a run.

    Args:
        run_id: Identifier of the run that owns the file.
        file_label: Sent as the ``fileName`` query parameter.
        file_type: Sent as the ``fileType`` query parameter.
        file_format: Sent as the ``fileFormat`` query parameter.

    Returns:
        The text of the last direct ``responseBody`` child of the XML response.

    Raises:
        ValueError: if the response contains no ``responseBody`` element
            (previously this surfaced as an ``UnboundLocalError``).
    """
    # URL-encode the parameters so labels containing spaces, '&', '#', etc.
    # cannot corrupt the query string. Parameter order is kept as before.
    params = urllib.parse.urlencode([
        ('fileName', file_label),
        ('fileFormat', file_format),
        ('fileType', file_type),
    ])
    query = FILE_SERVICE_URL + '/' + str(run_id) + '/url?' + params
    print(query)
    q = Request(query)
    q.add_header('Accept', 'application/xml')
    # Close the HTTP response deterministically once the body has been read.
    with urlopen(q) as resp:
        response = resp.read()
    tree = ET.fromstring(response)
    entries = tree.findall('{' + SERVICES_COMMON_NAMESPACE + '}responseBody')
    if not entries:
        raise ValueError('File service response for ' + query
                         + ' contains no responseBody element')
    # The original loop kept overwriting the result, i.e. the last entry wins.
    return entries[-1].text
def get_scos_file_url(run_id):
    """Return the file-service URL of the SCOS run message for ``run_id``.

    The message is looked up through ``get_output_file_url`` using the fixed
    label, type and format under which the run message XML file is stored.
    """
    return get_output_file_url(
        run_id,
        'run_message.xml',  # file label
        'RUN_MESSAGE',      # file type
        'TEXT',             # file format
    )
    # return 'http://localhost/num_infected_by_location.xml'
def get_hdf5_file_url(file_identification):
    """Return the file-service URL of the simulator output file.

    ``file_identification`` is a mapping that must provide the keys
    ``'label'``, ``'type'``, ``'format'`` and ``'run_id'``; their values are
    forwarded to ``get_output_file_url``.
    """
    # Keys are read in the same order as before: label, type, format, run_id.
    label, kind, fmt, owner_run_id = (
        file_identification[key] for key in ('label', 'type', 'format', 'run_id')
    )
    return get_output_file_url(owner_run_id, label, kind, fmt)
    # return 'http://localhost/R0.1.4.apollo.h5.04.01.16'
def set_status(status, message, run_id, username, password):
    """Post a status update for ``run_id`` to the run manager service.

    Sends a form-encoded POST to
    ``RUN_MANAGER_SERVICE_URL/run/<run_id>/status`` and then prints the
    status and message.

    Args:
        status: Sent as ``methodCallStatusEnum`` (this file uses 'RUNNING',
            'COMPLETED' and 'FAILED').
        message: Sent as ``statusMessage``.
        run_id: Identifier of the run whose status is updated.
        username: Credential sent with the request.
        password: Credential sent with the request.

    Raises:
        Whatever ``urllib.request.urlopen`` raises when the request fails.
    """
    data = urllib.parse.urlencode({
        'username': username,
        'password': password,
        'statusMessage': message,
        'methodCallStatusEnum': status,
    })
    status_url = RUN_MANAGER_SERVICE_URL + '/run/' + str(run_id) + '/status'
    # Context manager closes the HTTP response instead of leaking it.
    with urllib.request.urlopen(status_url, data.encode('UTF-8')):
        pass
    print(status + ' ' + message)
def create_response(code):
    """Serialize a services-common ``Response`` document carrying ``code``.

    The document has the shape
    ``<common:Response><common:responseMeta><status>code</status>...``, where
    ``status`` is left unqualified, and is returned as produced by
    ``ET.tostring``.
    """
    qualified = '{' + SERVICES_COMMON_NAMESPACE + '}'
    # Make the serializer emit the 'common' prefix for this namespace.
    ET.register_namespace('common', SERVICES_COMMON_NAMESPACE)

    root = ET.Element(qualified + 'Response')
    meta = ET.SubElement(root, qualified + 'responseMeta')
    ET.SubElement(meta, 'status').text = str(code)
    return ET.tostring(root)
# Script entry point: when this file is executed directly (not imported),
# start serving the Flask application via app.run() with its default settings.
if __name__ == "__main__":
    app.run()
# data = urllib.parse.urlencode({'username': 'apollo_demo', 'password': 'apollo_demo', 'urlToFile': 'http://localhost/query-service/154/query_output_5.csv', \
# 'fileName': 'test.csv', 'fileType': 'QUERY_RESULT', 'fileFormat': 'TEXT'})
# binary_data = data.encode('UTF-8')
# urllib.request.urlopen(FILE_SERVICE_URL + '/' + str(154), binary_data)
# ET.register_namespace('common', SERVICES_COMMON_NAMESPACE)
#
# response = ET.Element('{' + SERVICES_COMMON_NAMESPACE + '}Response')
#
# response_meta = ET.SubElement(response, '{' + SERVICES_COMMON_NAMESPACE + '}responseMeta')
#
# status = ET.SubElement(response_meta, 'status')
# status.text = '200'