-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathserver.py
43 lines (33 loc) · 1.23 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
with open("/var/run/argo/token") as f:
token = f.read()
class Plugin(BaseHTTPRequestHandler):
def args(self):
return json.loads(self.rfile.read(int(self.headers.get('Content-Length'))))
def reply(self, reply):
self.send_response(200)
self.end_headers()
self.wfile.write(json.dumps(reply).encode("UTF-8"))
def forbidden(self):
self.send_response(403)
self.end_headers()
def unsupported(self):
self.send_response(404)
self.end_headers()
def do_POST(self):
if self.headers.get("Authorization") != "Bearer " + token:
self.forbidden()
elif self.path == '/api/v1/template.execute':
args = self.args()
if 'hello' in args['template'].get('plugin', {}):
self.reply(
{'node': {'phase': 'Succeeded', 'message': 'Hello template!',
'outputs': {'parameters': [{'name': 'foo', 'value': 'bar'}]}}})
else:
self.reply({})
else:
self.unsupported()
if __name__ == '__main__':
httpd = HTTPServer(('', 4355), Plugin)
httpd.serve_forever()