-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmonitor_revoke.py
52 lines (43 loc) · 1.48 KB
/
monitor_revoke.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from celery import Celery
import yaml
import MySQLdb as mydb
import requests
import json
def notify():
    """Post a fixed "time exceeded" error payload to the easyweb pusher URL.

    Prints a row of asterisks, then POSTs the payload (passed as ``data``)
    to ``Settings.ROOT_URL + '/easyweb/pusher/'`` with certificate
    verification turned off. The HTTP response is discarded.

    NOTE(review): ``Settings`` is not imported in the visible part of this
    file -- confirm where it is expected to come from before calling this.
    """
    print('*****')
    payload = {
        'status': 'error',
        'data': 'Time Exceeded (30 sec)',
        'kind': 'query',
    }
    endpoint = Settings.ROOT_URL + '/easyweb/pusher/'
    requests.post(endpoint, data=payload, verify=False)
def _mark_job_revoked(jobid):
    """Set ``status='REVOKE'`` on the ``Jobs`` row whose ``job`` is *jobid*.

    The MySQL connection arguments are read from the ``mysql`` section of
    ``config/mysqlconfig.yaml`` on every call, and a fresh connection is
    opened, committed and closed for the single UPDATE.
    """
    with open('config/mysqlconfig.yaml', 'r') as cfile:
        # safe_load instead of load: calling yaml.load() without an explicit
        # Loader is deprecated and rejected by recent PyYAML releases, and a
        # connection-settings file does not need arbitrary object tags.
        conf = yaml.safe_load(cfile)['mysql']

    con = mydb.connect(**conf)
    try:
        cur = con.cursor()
        # Let the driver quote the values rather than formatting them into
        # the SQL text.
        cur.execute(
            "UPDATE Jobs SET status=%s WHERE job=%s",
            ('REVOKE', jobid),
        )
        con.commit()
    finally:
        # Close explicitly: what ``with con:`` does differs between MySQLdb
        # releases, so it cannot be relied on to release the connection.
        con.close()


def my_monitor(app):
    """Watch Celery events and flag revoked/failed tasks in the Jobs table.

    Every event is fed into an in-memory ``app.events.State()``. For
    ``task-revoked`` and ``task-failed`` events the matching ``Jobs`` row is
    additionally updated to status ``'REVOKE'`` and the task's name, uuid,
    info and exception are printed.

    Args:
        app: the Celery application whose event stream is captured.

    This call blocks: ``capture`` is started with ``limit=None`` and
    ``timeout=None``.
    """
    state = app.events.State()

    def update_revoked_tasks(event):
        """Record *event* in ``state`` and mark its task as revoked."""
        state.event(event)
        # task name is sent only with -received event, and state
        # will keep track of this for us.
        task = state.tasks.get(event['uuid'])
        _mark_job_revoked(task.uuid)
        print('TASK REVOKED: %s[%s] %s' % (
            task.name, task.uuid, task.info(),))
        print(task.exception)

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            # Both event types are routed to the same handler, so failed
            # tasks are stored with status 'REVOKE' as well.
            'task-revoked': update_revoked_tasks,
            'task-failed': update_revoked_tasks,
            '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
    # Script entry point: create the 'ea_tasks' Celery application, load its
    # settings from the config.celeryconfig module, and start the monitor.
    celery_app = Celery('ea_tasks')
    celery_app.config_from_object('config.celeryconfig')
    my_monitor(celery_app)