-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmess_card.py
111 lines (84 loc) · 3.85 KB
/
mess_card.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import random
import datetime
import uuid
# OBJECTIVES TODO:
# 1) Read the code and understand it.
# 2) Read the code again and understand it better.
# 3) Feel free to do 1 and 2 however many times you feel like.
# 4) Complete the SyncService implementation. Note that the signatures of SyncService.onMessage and SyncService.__init__ must not be altered.
# Keys of the `data` payload in every device record: Device.obtainData fills one value per key,
# and assertEquivalent compares records key by key over this list.
_DATA_KEYS = ["a","b","c"]
class Device:
    """Simulated, deliberately unreliable device that produces records and syncs with a SyncService.

    Attributes:
        records: the records this device has received from the server so far.
        sent: every record this device has produced via `obtainData`, in creation order.
    """

    def __init__(self, id):
        # `id` shadows the builtin, but the parameter name is part of the public
        # signature and is kept so existing callers keep working.
        self._id = id
        self.records = []
        self.sent = []

    def obtainData(self) -> dict:
        """Returns a single new datapoint from the device.

        Identified by type `record`. `timestamp` records when the record was sent and `dev_id` is the device id.
        `data` is the data collected by the device.
        Returns an empty dict when there is no new data (about 40% of calls).
        """
        if random.random() < 0.4:
            # Sometimes there's no new data
            return {}
        rec = {
            'type': 'record',
            'timestamp': datetime.datetime.now().isoformat(),
            'dev_id': self._id,
            'data': {kee: str(uuid.uuid4()) for kee in _DATA_KEYS},
        }
        # Keep a local log of everything produced so a test can verify what the server relays.
        self.sent.append(rec)
        return rec

    def probe(self) -> dict:
        """Returns a probe request to be sent to the SyncService.

        Identified by type `probe`. `from` is the index number from which the device is asking for the data.
        Returns an empty dict when the device skips probing (about 50% of calls).
        """
        if random.random() < 0.5:
            # Sometimes the device forgets to probe the SyncService
            return {}
        return {'type': 'probe', 'dev_id': self._id, 'from': len(self.records)}

    def onMessage(self, data: dict):
        """Receives updates from the server.

        An `update` message replaces everything from index `data['from']` onwards with
        `data['data']`. Messages that are None, empty, or of any other type are ignored.
        """
        # The random draw stays first so the random stream is consumed exactly as before.
        if random.random() < 0.6:
            # Sometimes devices make mistakes. Let's hope the SyncService handles such failures.
            return
        # A missing or non-update reply (e.g. the server had nothing to say) must not crash the device.
        if not data or data.get('type') != 'update':
            return
        _from = data['from']
        if _from > len(self.records):
            # The update starts past what we hold; applying it would leave a gap.
            return
        self.records = self.records[:_from] + data['data']
class SyncService:
    """Central service that aggregates device records and serves them back on request.

    Every record received is appended to a single ordered log (`aggregated_updates`).
    A device catches up by sending a `probe` with the log index it wants to read from.
    """

    def __init__(self):
        # Ordered log of complete record messages, in arrival order.
        self.aggregated_updates = []

    def onMessage(self, data: dict):
        """Handle messages received from devices.

        `record`: the whole record (including `dev_id` and `timestamp`, not just its
        `data` payload) is appended to the log. Returns None.

        `probe`: returns `{'type': 'update', 'from': <index>, 'data': [<records>]}` holding
        every logged record from index `data['from']` onwards (an empty list when the
        device is already up to date).

        Anything else (None, an empty dict from a device that had nothing to send, or an
        unknown type): returns `{'type': 'noop'}`. The reply carries a `type` so that a
        receiver which reads `reply['type']` can skip it instead of failing on None.
        """
        msg_type = data.get('type') if data else None

        if msg_type == 'record':
            # Store the full message: consumers of the log need dev_id and timestamp
            # to identify a record, not only its payload. The dict is stored as
            # received (no copy), so callers should treat sent records as immutable.
            self.aggregated_updates.append(data)
            return None

        if msg_type == 'probe':
            from_index = data['from']
            # Slicing past the end yields [], which is the correct "nothing new" reply.
            return {
                'type': 'update',
                'from': from_index,
                'data': self.aggregated_updates[from_index:],
            }

        # Devices routinely send empty messages; answer with a harmless no-op.
        return {'type': 'noop'}
def testSyncing(num_devices: int = 10, num_rounds: int = int(1e6)):
    """End-to-end check that every device ends up with the same, correctly ordered records.

    Args:
        num_devices: number of simulated devices; they are named `dev_0` .. `dev_{n-1}`.
        num_rounds: number of rounds in which each device may emit a record and probe.
            The default is large and slow; pass a smaller value for a quick run.

    Raises:
        AssertionError: if a synced record differs between devices, or differs from
            what its originating device actually sent (in sending order).
    """
    if num_devices < 1:
        # Nothing to compare against; the checks below index devices[0].
        return

    devices = [Device(f"dev_{i}") for i in range(num_devices)]
    syn = SyncService()

    # Phase 1: devices produce data and (unreliably) probe for updates.
    for _ in range(num_rounds):
        for _dev in devices:
            syn.onMessage(_dev.obtainData())
            _dev.onMessage(syn.onMessage(_dev.probe()))

    # Phase 2: no new data; keep probing until all devices hold equally many records.
    # NOTE(review): equal lengths are taken as "converged"; this could in principle be
    # reached before every device is fully caught up - confirm that is acceptable.
    done = False
    while not done:
        for _dev in devices:
            _dev.onMessage(syn.onMessage(_dev.probe()))
        num_recs = len(devices[0].records)
        done = all(len(_dev.records) == num_recs for _dev in devices)

    # Phase 3: verify. ver_start[k] is the index of the next not-yet-matched record
    # in device k's `sent` log, so records must appear in the order they were sent.
    ver_start = [0] * len(devices)
    for i, rec in enumerate(devices[0].records):
        # Device ids are "dev_<index>", so the suffix is the position in `devices`.
        _dev_idx = int(rec['dev_id'].split("_")[-1])
        assertEquivalent(rec, devices[_dev_idx].sent[ver_start[_dev_idx]])
        for _dev in devices[1:]:
            assertEquivalent(rec, _dev.records[i])
        ver_start[_dev_idx] += 1
def assertEquivalent(d1: dict, d2: dict):
    """Assert that two records describe the same datapoint.

    Compares the originating device id, the timestamp, and then every payload
    value listed in `_DATA_KEYS`. Raises AssertionError at the first mismatch.
    """
    # Identity fields first, in the same order as they are checked per record.
    for field in ('dev_id', 'timestamp'):
        assert d1[field] == d2[field]
    # Then the payload, one key at a time.
    for payload_key in _DATA_KEYS:
        left = d1['data'][payload_key]
        right = d2['data'][payload_key]
        assert left == right