forked from a-yun/distributed-snapshot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnode.py
142 lines (115 loc) · 4.43 KB
/
node.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
from collections import defaultdict
import pickle
import random
from utils import pipeName, Pipes
class Node:
    """A money-holding process that participates in a distributed snapshot.

    Each node owns a balance, exchanges transfers with peer nodes over
    ``Pipes`` channels, is driven by textual commands read from the
    ``'master'`` channel, and reports recorded snapshot state on the
    ``'observer'`` channel. The snapshot logic is marker based
    (Chandy-Lamport style): the first ``snapshot`` token makes the node
    record its balance and forward the token to every peer; transfers that
    arrive afterwards on a channel whose token has not yet been seen are
    accumulated as that channel's in-flight state.
    """

    def __init__(self, node_id, balance):
        """Create the node and open its master and observer channels.

        Args:
            node_id: Identifier of this node; used as the pipe endpoint name.
            balance: Initial amount of money held by the node.
        """
        # Peers registered so far through "CreateNode" commands.
        self.node_ids = []

        # Internal state
        self.id = node_id
        self.balance = balance

        # Snapshot state
        self.nodeState = 0
        self.channelState = defaultdict(int)

        # Snapshot progress
        self.receivedToken = False
        self.stopRecording = defaultdict(bool)

        self.pipes = Pipes()
        # Only the inbound master channel is blocking: listen() waits on it.
        self.pipes.createPipe(self.id, 'master', write=True, blocking=False)
        self.pipes.createPipe('master', self.id, write=False, blocking=True)
        self.pipes.createPipe(self.id, 'observer', write=True, blocking=False)
        self.pipes.createPipe('observer', self.id, write=False, blocking=False)

    def listen(self):
        """Serve commands from the master forever.

        Recognised commands are ``Send <receiver> <val>``,
        ``Receive [<sender>]``, ``ReceiveAll`` and ``CreateNode <id>``.
        Every command is answered with ``ack`` on the master channel, except
        ``ReceiveAll`` which answers ``ack <received>`` where ``<received>``
        is the boolean returned by :meth:`receiveAll`.
        """
        while True:
            raw = self.pipes.receiveMessage('master', self.id)
            # Drop the trailing newline and split into command + arguments.
            message = raw.strip().split(' ')
            command = message[0]

            if command == "Send":
                _, receiver, val = message
                self.send(receiver, int(val))
            elif command == "Receive":
                if len(message) == 1:
                    # No sender given: take a message from any peer.
                    self.receive()
                else:
                    self.receive(message[1])
            elif command == "ReceiveAll":
                received = self.receiveAll()
                self.pipes.sendMessage(self.id, 'master', f'ack {received}')
                # ReceiveAll has its own ack format; skip the generic one.
                continue
            elif command == "CreateNode":
                peer = message[1]
                self.node_ids.append(peer)
                self.pipes.createPipe(self.id, peer, write=True, blocking=False)
                self.pipes.createPipe(peer, self.id, write=False, blocking=False)

            self.pipes.sendMessage(self.id, 'master', 'ack')

    def startSnapshot(self, sender):
        """Record local state and forward the snapshot token to every peer.

        Args:
            sender: Endpoint the first token arrived from. When it is a peer
                (anything other than ``'observer'``) its channel is recorded
                as empty and is no longer recorded.
        """
        self.receivedToken = True
        # Collect balance state.
        self.nodeState = self.balance

        # The channel the token arrived on is recorded as empty.
        if sender != 'observer':
            self.channelState[sender] = 0
            self.stopRecording[sender] = True

        # Send snapshot token to all known peers.
        for node_id in self.node_ids:
            self.pipes.sendMessage(self.id, node_id, 'snapshot')

    def collect(self):
        """Send the recorded snapshot to the observer and reset snapshot state."""
        # NOTE(review): the payload is a tuple, not a string like every other
        # message here - presumably Pipes.sendMessage serializes it; confirm.
        self.pipes.sendMessage(
            self.id, 'observer', (self.nodeState, self.channelState)
        )

        # Reset snapshot state and progress so a new snapshot can start.
        self.nodeState = 0
        self.channelState = defaultdict(int)
        self.receivedToken = False
        self.stopRecording = defaultdict(bool)

    def send(self, receiver, val):
        """Transfer ``val`` to ``receiver`` if the balance covers it.

        Prints ``ERR_SEND`` and leaves the balance untouched when ``val``
        exceeds the current balance.

        Args:
            receiver: Identifier of the destination node.
            val: Amount to transfer.
        """
        if val > self.balance:
            print("ERR_SEND")
            return

        # Send the money, then debit the local balance.
        self.pipes.sendMessage(self.id, receiver, str(val))
        self.balance = self.balance - val

    def receiveAll(self):
        """Drain pending messages from all peers without printing them.

        Returns:
            True if at least one message was processed, otherwise False.
        """
        received = False
        while self.receive(has_output=False):
            received = True
        return received

    def receive(self, sender=-1, has_output=True):
        """Process one incoming message.

        Args:
            sender: Endpoint to read from. The default ``-1`` means "any
                peer": peers are polled in random order until one yields a
                message.
            has_output: Whether to print a line describing the message.
                Forced to False when ``sender`` is ``'observer'``.

        Returns:
            True if a message was processed, False if none was available.
        """
        output = None
        if sender == 'observer':
            has_output = False

        if sender == -1:
            # Random sender: try every peer once, in shuffled order.
            message = None
            ids = random.sample(self.node_ids, len(self.node_ids))
            while message is None:
                if not ids:
                    return False
                sender = ids.pop()
                message = self.pipes.receiveMessage(sender, self.id)
        else:
            message = self.pipes.receiveMessage(sender, self.id)
            # An empty channel yields None (as the random-sender path above
            # relies on); report "nothing received" instead of crashing.
            if message is None:
                return False

        # Get rid of the trailing newline.
        message = message.strip()

        if message == "snapshot":
            output = f'{sender} SnapshotToken -1'
            if self.receivedToken:
                # Token already seen: this channel's recording is complete.
                self.stopRecording[sender] = True
            else:
                self.startSnapshot(sender)
        elif message == 'collect':
            self.collect()
        else:
            output = f'{sender} Transfer {message}'
            amount = int(message)
            self.balance = self.balance + amount
            # While a snapshot is in progress, transfers arriving on a
            # channel that is still being recorded belong to its state.
            if self.receivedToken and not self.stopRecording[sender]:
                self.channelState[sender] += amount

        if output and has_output:
            print(output)
        return True