#!/usr/bin/env python
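"""Follow apsched (Cray ALPS) scheduler log files in ROOTDIR via inotify and
record job starts and ends in a MySQL database through the MySQLObject helper.

Summary of the code below: "Bound apid" lines provide the batch jobid for a
Cray reservation id (resId), "Placed apid" lines provide start time, uid,
command and node list, and "Released apid" lines provide the end time; the
resId -> jobid mapping is kept persistently in an anydbm file ('resToJob')
because only the "Bound" line carries the jobid.
"""
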
import sys
import time
import pwd
import _inotify
import anydbm
# new time format
import dateutil.parser
import calendar
sys.path.append("MySQL")
import MySQLObject
# ROOTDIR="/var/spool/torque/server_priv/accounting"
ROOTDIR = "/home/berger/Lustre/testdata/watch"
FILEPREFIX = "apssched"

class Logfile:
    def __init__(self, prefix, path, filename):
        ''' path: directory where the file is
            filename: name of file to read
            prefix: filename has to start with this prefix
        '''
        self.path = path
        self.prefix = prefix
        self.filename = self.path + "/" + filename
        self.f = open(self.filename, "r")
        self.db = MySQLObject.MySQLObject()
        # map resid to job - make it persistent, as the "Bound" line is the only line containing the jobid
        self.resToJob = anydbm.open('resToJob', 'c')
        # print "init:", self.resToJob
        self.usermap = {}
        self.readusermap()
        self.read_from_last_pos_to_end()

    def readusermap(self):
        '''read mapping file in the format of /etc/passwd'''
        f = open(self.db.usermapping, "r")
        for l in f:
            sp = l.split(":")
            self.usermap[sp[2]] = sp[0]
        f.close()
        self.usermapage = time.time()

    def mapuser(self, uid):
        '''map uid to username; if a uid is unknown, reread the mapping file
        (at most every 5 minutes); if it is still unknown, return the uid itself'''
        if uid not in self.usermap and time.time() > self.usermapage + 300:
            # reread at most every 5 minutes in case of unknown users
            self.readusermap()
        if uid not in self.usermap:
            return uid
        else:
            return self.usermap[uid]

    def getvalue(self, l, key):
        '''get the value for the specified key from list l of the form: key value key value'''
        try:
            p = l.index(key)
        except ValueError:
            return None
        return l[p + 1]

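    # Illustrative example of the key/value parsing above (the fragment below is
    # made up for documentation purposes, not a verbatim apsched log line):
    #   sp = "2014-06-12T16:01:59.829416+02:00 ... Placed apid 1234 resId 4711 uid 1000 cmd0 'a.out' nids: 16".split()
    #   self.getvalue(sp, "uid")     -> "1000"
    #   self.getvalue(sp, "absent")  -> None
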
    def read_from_last_pos_to_end(self):
        '''read the file from the current position to the current end,
        build lists for inserts and updates and do batch execution'''
        jobs = {}
        jobstarts = {}
        jobends = {}
        # b = self.f.read()
        # ### aps #######
        # for l in b.split("\n"):
        # code to ignore incomplete lines, ignore garbage and seek back to the last end of line;
        # in case of a single incomplete line it is supposed to work as well
        lines = self.f.readlines()
        if len(lines) == 0:
            return
        if lines[-1][-1] != '\n':
            self.f.seek(0 - len(lines[-1]), 1)  # seek back to last end of line
            del lines[-1]
        ### aps #######
        for l in lines:
            if "Bound apid" in l:
                sp = l[:-1].split()
                jobid = self.getvalue(sp, "batchId")[1:-1]
                resid = self.getvalue(sp, "resId")
                # The Cray resid is used in the logfile to identify jobs, and this "Bound apid" line is the
                # only line containing the batchId. Later on we map resid to batchId, as batchId is used in
                # the database, to make it easier to map database data to existing jobs.
                self.resToJob[resid] = jobid + self.db.batchpostfix  # we add the batchserver here as Cray log files do not contain it!!!
                self.resToJob.sync()
                # jobs[jobid] = {'jobid': jobid}
            if "Placed apid" in l:
                sp = l[:-1].split()
                # OLD direct logfile format 2014-01-27 00:01:19:
                # sstart = sp[0] + " " + sp[1][:-1]
                # start = int(time.mktime(time.strptime(sstart, "%Y-%m-%d %H:%M:%S")))
                # NEW time format after syslog: 2014-06-12T16:01:59.829416+02:00
                start = calendar.timegm(dateutil.parser.parse(sp[0]).utctimetuple())
                resid = self.getvalue(sp, "resId")
                uid = self.getvalue(sp, "uid")
                cmd = self.getvalue(sp, "cmd0")[1:-1]
                nids = self.getvalue(sp, "nids:")
                try:
                    try:
                        owner = pwd.getpwuid(int(uid)).pw_name
                    except KeyError:
                        # in case /etc/passwd does not contain the user, we check the file mapping
                        owner = self.mapuser(uid)
                    jobid = self.resToJob[resid]
                    self.db.insert_job(jobid, start, -1, owner, nids, cmd)
                    print "jobstart:", jobid, "owner:", owner
                except KeyError:
                    print "job without binding", resid
            if "Released apid" in l:
                sp = l[:-1].split()
                # OLD format
                # send = sp[0] + " " + sp[1][:-1]
                # end = int(time.mktime(time.strptime(send, "%Y-%m-%d %H:%M:%S")))
                # NEW format
                end = calendar.timegm(dateutil.parser.parse(sp[0]).utctimetuple())
                resid = self.getvalue(sp, "resId")
                try:
                    jobid = self.resToJob[resid]
                    self.db.update_job(jobid, -1, end, "", "", "")
                except KeyError:
                    print "job without binding", resid
                # be nice and shrink the DB
                try:
                    del self.resToJob[resid]
                except KeyError:
                    pass  # we give a ...
        self.db.commit()

    def switch_file(self, filename):
        todayfile = time.strftime("%Y%m%d")
        if filename.startswith(self.prefix) and todayfile in filename:
            self.read_from_last_pos_to_end()
            self.f.close()
            self.filename = self.path + "/" + filename
            self.f = open(self.filename, "r")
            # print "new file", self.filename

    def action(self, e):
        if e["mask"] & _inotify.CREATE:
            self.switch_file(e["name"])
        if e["mask"] & _inotify.MODIFY:
            self.read_from_last_pos_to_end()

def mainloop():
    fd = _inotify.create()
    wddir = _inotify.add(fd, ROOTDIR, _inotify.CREATE | _inotify.MODIFY)
    todayfile = FILEPREFIX + time.strftime("%Y%m%d")
    # todayfile = "apsched20131221"
    lf = Logfile(FILEPREFIX, ROOTDIR, todayfile)
    while True:
        # blocking wait
        _inotify.read_event(fd, lf.action)
        time.sleep(0.1)


if __name__ == "__main__":
    mainloop()
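
# Typical invocation (assuming the _inotify and MySQLObject helper modules are
# importable from the working directory, as the sys.path.append above suggests):
#   python lustre_realtimejobs_aps.py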