This repository has been archived by the owner on Nov 23, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcluster.py
96 lines (92 loc) · 2.29 KB
/
cluster.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import json
import threading
import time
from clusterBase import ClusterBase
import copy
host = "172.25.2.1"
port = 8087
#host = ""
#port = 1122
countInterval = 2
buffsize = 2048
diff0 = 0.05
class Queue:
def __init__(self):
self.data = []
def put(self, x):
self.data.append(x)
def get(self):
item = self.data[0]
self.data = self.data[1:]
return item
def empty(self):
return len(self.data) == 0
class Cluster(ClusterBase):
def __init__(self, host = host, port = port, buffsize = buffsize):
super(Cluster, self).__init__(host, port, buffsize)
self.time = ""
self.interval = countInterval
self.nextInfo = {}
self.data = {"nodes":{}}
self.mutex = threading.Lock()
self.mutex.acquire(0)
self.deleteLock = threading.Lock()
self.infoLock = threading.Lock()
self.listLock = threading.Lock()
self.msgList = Queue()
def updateData(self, time):
self.data["time"] = time
if self.listLock.acquire():
self.data["info"] = {}
if self.infoLock.acquire():
for x in self.nextInfo:
self.data["info"][x] = self.nextInfo[x]["msg"]
self.infoLock.release()
self.msgList.put(copy.copy(self.data))
self.listLock.release()
self.time = time
self.data = {"nodes":{}}
#self.data["time"] = time
if not self.mutex.acquire(0):
self.mutex.release()
self.interval = countInterval
def onMessage(self, msg, node):
try:
msg = json.loads(msg)
except: # node died
return
#print(msg)
if "sys" in msg:
if self.infoLock.acquire():
self.nextInfo[msg["node"]] = {
"msg": msg,
"socket": node,
}
self.infoLock.release()
else:
time = msg["time"]
#print("received status at", time)
if self.time and time != self.time: # new second
self.interval -= 1
if self.interval == 0:
self.updateData(time)
self.time = time
self.data["nodes"][msg["node"]] = msg
#self.data["nodes"][msg["node"]] = [
# 100 - float(msg["CPU"]["%idle"]),
# float(msg["memory"]["%memused"])
#]
def onDisconnect(self, node):
if self.infoLock.acquire():
for x in self.nextInfo:
if self.nextInfo[x]["socket"] == node:
print("node", x, "closed connection")
del self.nextInfo[x]
self.deleteLock.acquire(0)
break
self.infoLock.release()
if __name__ == "__main__":
c = Cluster()
while True:
s = input(">")
c.broadcast(s)