-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlustre_data_aggregator.py
executable file
·147 lines (122 loc) · 4.04 KB
/
lustre_data_aggregator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#!/usr/bin/env python
# simple data gathering client for lustre performance data
# multithreaded
# tested with lustre 1.8 and python 2.4
# Holger Berger 2014
import sys
sys.path.append("MySQL")
import xmlrpclib, time, socket
import sys, signal, os
from threading import Thread, Lock
from data_inserter import Logfile
# Length of one sampling period in seconds. The main loop sleeps so that
# rounds start SLEEP seconds apart and divides the per-round counters by
# SLEEP to print per-second rates.
SLEEP = 60 # > 10 sec
# Default socket timeout in seconds (passed to socket.setdefaulttimeout).
TIMEOUT = 30 # has to be < SLEEP
# Version tag written after the leading "#" of every header line.
FILEVERSION = "1.0"
# Every command line argument is taken as a server; each one is contacted
# at 'http://<server>:8000'.
servers = sys.argv[1:]
# Current output file. Opened in "w" mode, so an existing "data_file" is
# truncated at start-up. Rebound by signalhandler when the file is rotated.
out = open("data_file", "w")
# Not referenced anywhere else in the visible code.
# NOTE(review): presumably the target of the "switch to db here" markers in
# worker() - confirm against data_inserter.Logfile.
db = Logfile()
# server -> xmlrpclib.ServerProxy for that server
rpcs = {}
# server -> result of get_type() at start-up
types = {}
# server -> first element of the latest sample, split on ";"
nids = {}
# server -> nids value for which the last header line was written
oldnids = {}
# server -> result of get_hostname() at start-up
hostnames = {}
# server -> worker Thread of the current round
threads = {}
# Serialises access to `out` between the worker threads and signalhandler.
iolock = Lock()
# While True, every worker writes a header line before its data lines.
# Cleared by the main loop after each round, set again by signalhandler.
first = True
# server -> seconds the last get_sample() call took
timings = {}
# server -> 2-tuple of integer totals accumulated by worker() since the last
# report; the main loop prints them as bandwidth and resets them
bws = {}
# server -> integer total accumulated by worker() since the last report;
# the main loop prints it as metadata requests and resets it
reqs = {}
def signalhandler(signum, frame):
global out, first, iolock
if signum == signal.SIGUSR1:
iolock.acquire()
out.close()
print "switching file to", "data_file_" + str(int(time.time()))
os.rename("data_file", "data_file_" + str(int(time.time())))
out = open("data_file", "w")
iolock.release()
first = True
def _write_locked(text):
    """Write `text` to the current output file while holding iolock."""
    iolock.acquire()
    try:
        # `out` is looked up on every call because signalhandler rebinds it
        # when the file is rotated.
        out.write(text)
    finally:
        # Release even if the write fails so other threads are not blocked.
        iolock.release()


def worker(srv):
    """Fetch one sample from server `srv`, log it and update the counters.

    Runs in its own thread, one per server and round. Reads the module-level
    `sample` timestamp set by the main loop for the current round.

    The sample returned by get_sample() is treated as a sequence of
    ";"-separated strings: element 0 is stored in nids[srv], every further
    element is one record. A header line ("#<FILEVERSION>;<hostname>;...")
    is written when `first` is set or when nids[srv] differs from the value
    the last header was written for. Each record is written as
    "<hostname>;<sample timestamp>;<record>".

    The second field of a record decides which counter is updated: a single
    integer is added to reqs[srv]; a comma separated list contributes its
    items 1 and 3 to the two totals in bws[srv].

    srv -- server key as used in rpcs/hostnames
    """
    started = time.time()
    try:
        try:
            r = rpcs[srv].get_sample()
        except Exception:
            print >>sys.stderr, "failed to connect to server:", srv
            return
    finally:
        # Recorded for failed calls too, so the main loop can report them.
        timings[srv] = time.time() - started

    if len(r) == 0:
        return

    nids[srv] = r[0].split(";")
    # .get(): no header has been recorded for this server yet if all of its
    # earlier samples failed or were empty; indexing would raise KeyError
    # once `first` has been cleared.
    if first or nids[srv] != oldnids.get(srv):
        # --------- switch to db here ---------
        _write_locked("#" + FILEVERSION + ";" + hostnames[srv] + ";"
                      + nids[srv][0] + ";"
                      + ";".join(nids[srv][1:]) + "\n")
        oldnids[srv] = nids[srv]

    for ost in r[1:]:
        sp = ost.split(";")
        # --------- switch to db here ---------
        _write_locked(hostnames[srv] + ";" + str(int(sample)) + ";"
                      + ";".join(map(str, sp)) + "\n")

        vs = sp[1].split(',')
        if len(vs) == 1:
            reqs[srv] = reqs.setdefault(srv, 0) + int(sp[1])
        else:
            # NOTE(review): these totals used to be called (wb, rb) here,
            # but the report in the main loop prints index 0 as "read" and
            # index 1 as "write" - confirm which matches the server's field
            # order.
            (total0, total1) = bws.setdefault(srv, (0, 0))
            bws[srv] = (total0 + int(vs[1]), total1 + int(vs[3]))
#------------------------------------------------------------------------------
socket.setdefaulttimeout(TIMEOUT)
for srv in servers:
rpcs[srv] = xmlrpclib.ServerProxy('http://' + srv + ':8000')
types[srv] = rpcs[srv].get_type()
hostnames[srv] = rpcs[srv].get_hostname()
print "connected to %s running a %s" % (hostnames[srv], types[srv])
signal.signal(signal.SIGUSR1, signalhandler)
while True:
sample = time.time()
for srv in servers:
threads[srv] = Thread(target=worker, args=(srv,))
threads[srv].start()
for srv in servers:
threads[srv].join()
e = time.time()
# set this here, to have a good chance to get it right during sleep
first = False
print "%3.3fs for sample collection," % (e - sample),
minT = ("", 10000)
maxT = ("", 0)
avg = 0
for (s, v) in timings.iteritems():
if v < minT[1]:
minT = (s, v)
if v > maxT[1]:
maxT = (s, v)
avg += float(v)
print "transfer times - max: %s %3.3fs" % maxT, "- min: %s %3.3fs" % minT, "- avg: %3.3fs" % (avg/len(timings))
for mdt in reqs:
print " metadata requests %s: %6.1f/s" % (mdt, reqs[mdt] / float(SLEEP))
reqs[mdt] = 0
trbs = 0
twbs = 0
for oss in bws:
print " oss data bandwidth %s: read %7.1f MB/s - write %7.1f MB/s" % (
oss, bws[oss][0] / (1024.0 * 1024.0 * float(SLEEP)),
bws[oss][1] / (1024.0 * 1024.0 * float(SLEEP)))
trbs += bws[oss][0]
twbs += bws[oss][1]
bws[oss] = (0, 0)
print " === total bandwidth === : read %7.1f MB/s - write %7.1f MB/s" % (
trbs / (1024.0 * 1024.0 * float(SLEEP)),
twbs / (1024.0 * 1024.0 * float(SLEEP)))
time.sleep(SLEEP - ((e - sample) % SLEEP))