-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathvalidation_rules_service.py
executable file
·143 lines (127 loc) · 4.28 KB
/
validation_rules_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import cgi
import os
import platform
from modules.gitdox_sql import *
from modules.logintools import login
import modules.redis_cache as cache
import traceback
# Request parameters are parsed once, at module load, and stored in the
# module-level names below; the handler functions in this file read these
# globals directly instead of taking arguments.
parameter = cgi.FieldStorage()
# Name of the operation to perform; compared against "list", "listschemas",
# "create", "update" and "delete" in open_main_server().
action = parameter.getvalue("action")
# Fields describing a single validation rule. getvalue() yields None for any
# field that is absent from the request.
doc = parameter.getvalue("doc")
corpus = parameter.getvalue("corpus")
domain = parameter.getvalue("domain")
name = parameter.getvalue("name")
operator = parameter.getvalue("operator")
argument = parameter.getvalue("argument")
# NOTE: this module-level name shadows the builtin id(); the handlers below
# rely on it, so it cannot be renamed without touching them.
id = parameter.getvalue("id")
# Schema listing: the 'schemas' directory that sits next to this script, and
# the filename suffix that list_schemas() filters on.
schema_dir = os.path.dirname(os.path.realpath(__file__)) + os.sep + 'schemas'
extension = parameter.getvalue("extension")
# Sort specification taken from the "jtSorting" request field; list_rules()
# forwards it unchanged to get_validate_rules().
sort = parameter.getvalue("jtSorting")
def row_to_dict(row):
    """Convert one rule row into a dict keyed by column name.

    ``row`` is indexed positionally: positions 0-6 map to 'corpus', 'doc',
    'domain', 'name', 'operator', 'argument' and 'id' respectively. Any
    positions beyond the seventh are ignored; a row with fewer than seven
    items raises IndexError.
    """
    columns = ('corpus', 'doc', 'domain', 'name', 'operator', 'argument', 'id')
    # Index explicitly (rather than zip) so a short row still raises instead
    # of silently producing a partial record.
    return dict((column, row[position])
                for position, column in enumerate(columns))
def list_rules():
    """Print a JSON listing of validation rules to stdout.

    Uses the module-level ``sort`` and ``domain`` request values to call
    ``get_validate_rules`` and converts every returned row with
    ``row_to_dict``. On success prints
    ``{"Result": "OK", "Records": [...]}``; if anything in that process
    raises, prints ``{"Result": "Error", "Message": ...}`` instead.
    """
    resp = {}
    try:
        rules = get_validate_rules(sort=sort, domain=domain)
        resp['Result'] = 'OK'
        resp['Records'] = [row_to_dict(row) for row in rules]
        print(json.dumps(resp))
    except Exception:
        # Exception (not a bare except) so SystemExit/KeyboardInterrupt are
        # not swallowed. Start from a fresh dict so a 'Records' value that
        # failed to serialize cannot break the error response as well.
        resp = {'Result': 'Error',
                'Message': 'Something went wrong while attempting to retrieve the list of rules.'}
        print(json.dumps(resp))
def create_rule():
    """Create a validation rule from the request fields and print the result.

    Passes the module-level ``doc``, ``corpus``, ``domain``, ``name``,
    ``operator`` and ``argument`` values to ``create_validate_rule`` and
    echoes them back, together with the value that call returned (reported
    as ``id``), as ``{"Result": "OK", "Record": {...}}``. The cache is then
    invalidated for ``domain``. If any step raises, prints
    ``{"Result": "Error", "Message": ...}`` instead.
    """
    resp = {}
    try:
        # Local name avoids shadowing both the builtin id() and the
        # module-level request parameter of the same name.
        rule_id = create_validate_rule(doc, corpus, domain, name, operator, argument)
        resp['Result'] = 'OK'
        resp['Record'] = {'doc': doc,
                          'corpus': corpus,
                          'domain': domain,
                          'name': name,
                          'operator': operator,
                          'argument': argument,
                          'id': rule_id}
        cache.invalidate_by_type(domain)
        print(json.dumps(resp))
    except Exception:
        # Exception (not a bare except) so SystemExit/KeyboardInterrupt are
        # not swallowed; a fresh dict keeps the error reply serializable.
        resp = {'Result': 'Error',
                'Message': 'Something went wrong while attempting to create a new rule.'}
        print(json.dumps(resp))
def update_rule():
    """Update an existing validation rule and print the JSON outcome.

    Passes the module-level ``doc``, ``corpus``, ``domain``, ``name``,
    ``operator``, ``argument`` and ``id`` request values to
    ``update_validate_rule``, invalidates the cache for ``domain`` and
    prints ``{"Result": "OK"}``. If any step raises, prints
    ``{"Result": "Error", "Message": ...}`` where the message ends with the
    formatted traceback.
    """
    resp = {}
    try:
        update_validate_rule(doc, corpus, domain, name, operator, argument, id)
        resp['Result'] = 'OK'
        cache.invalidate_by_type(domain)
        print(json.dumps(resp))
    except Exception:
        # Exception (not a bare except) so SystemExit/KeyboardInterrupt are
        # not swallowed.
        resp['Result'] = 'Error'
        resp['Message'] = 'Something went wrong while attempting to update a rule.'
        # NOTE(review): unlike the other handlers in this file, this one
        # sends the server traceback to the client. Kept because callers may
        # rely on it, but it exposes internals - confirm whether it should
        # go to a server-side log instead.
        resp['Message'] += '\n' + traceback.format_exc()
        print(json.dumps(resp))
def delete_rule():
    """Delete the rule named by the request ``id`` and print the JSON outcome.

    Looks up the rule's domain with ``get_rule_domain`` *before* calling
    ``delete_validate_rule``, then invalidates the cache for that domain and
    prints ``{"Result": "OK"}``. If any step raises, prints
    ``{"Result": "Error", "Message": ...}`` instead.
    """
    resp = {}
    try:
        # Fetched before the delete and used for cache invalidation
        # afterwards. A distinct local name avoids shadowing the
        # module-level ``domain`` request value.
        rule_domain = get_rule_domain(id)
        delete_validate_rule(id)
        resp['Result'] = 'OK'
        cache.invalidate_by_type(rule_domain)
        print(json.dumps(resp))
    except Exception:
        # Exception (not a bare except) so SystemExit/KeyboardInterrupt are
        # not swallowed.
        resp['Result'] = 'Error'
        resp['Message'] = 'Something went wrong while trying to delete a rule.'
        print(json.dumps(resp))
def list_schemas():
    """Print the schema files available in ``schema_dir`` as JSON options.

    Every entry of ``schema_dir`` whose name ends with the module-level
    ``extension`` request value becomes one
    ``{"DisplayText": name, "Value": name}`` option, in sorted name order.
    On success prints ``{"Result": "OK", "Options": [...]}``; if listing or
    filtering raises (for example when ``extension`` is None because the
    request did not supply it), prints
    ``{"Result": "Error", "Message": ...}`` instead.
    """
    resp = {}
    try:
        # os.listdir() returns entries in arbitrary order; sort so the
        # options come back in a stable order.
        schema_names = sorted(x for x in os.listdir(schema_dir)
                              if x.endswith(extension))
        resp['Result'] = 'OK'
        resp['Options'] = [{"DisplayText": x, "Value": x}
                           for x in schema_names]
        print(json.dumps(resp))
    except Exception:
        # Exception (not a bare except) so SystemExit/KeyboardInterrupt are
        # not swallowed; a fresh dict keeps the error reply serializable.
        resp = {'Result': 'Error',
                'Message': 'Something went wrong while trying to list schemas.'}
        print(json.dumps(resp))
def open_main_server():
    """Authenticate the request, emit the JSON header and run the action.

    Calls ``login`` with a freshly parsed form, the ``users`` directory next
    to this script and the ``SCRIPT_NAME`` environment value, then prints
    the JSON content-type header. The module-level ``action`` decides what
    happens next: "list" and "listschemas" are served to every user; the
    user named "demo" is refused anything else; "create", "update" and
    "delete" run their handlers; any other value yields an
    "Unknown action" error object.
    """
    script_name = os.environ.get('SCRIPT_NAME', '')
    form = cgi.FieldStorage()
    script_dir = os.path.dirname(os.path.realpath(__file__)) + os.sep
    users_dir = script_dir + "users" + os.sep
    loginaction, userconfig = login(form, users_dir, script_name, None)
    user = userconfig["username"]
    # Not used below; the lookup is retained so a userconfig lacking the
    # key fails exactly as it did before.
    admin = userconfig["admin"]

    print("Content-type:application/json\r\n\r\n")

    # Read-only actions are available to every user.
    # Plain equality tests (rather than dict lookups) keep working when
    # ``action`` is None or an unhashable list of repeated values.
    for read_action, handler in (("list", list_rules),
                                 ("listschemas", list_schemas)):
        if action == read_action:
            handler()
            return

    # Everything past this point would modify data.
    if user == "demo":
        print(json.dumps({'Result': 'Error', 'Message': 'Demo user may not make changes.'}))
        return

    for write_action, handler in (("create", create_rule),
                                  ("update", update_rule),
                                  ("delete", delete_rule)):
        if action == write_action:
            handler()
            return

    print(json.dumps({'Result': 'Error',
                      'Message': 'Unknown action: "' + str(action) + '"'}))


if __name__ == '__main__':
    open_main_server()