-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy pathteslabox.py
109 lines (105 loc) · 3.91 KB
/
teslabox.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#!/usr/bin/python3
# "Autobox" dongle driver for HTML 'streaming' - test application
# Created by Colin Munro, December 2019
# See README.md for more information
"""Implementation to stream PNGs over a webpage that responds with touches that are relayed back to the dongle for Tesla experimental purposes."""
import decoder
import server
import link
import protocol
from threading import Thread
import time
class Teslabox:
class _Server(server.Server):
def __init__(self, owner):
self._owner = owner
super().__init__()
def on_touch(self, type, x, y):
if self._owner.connection is None:
return
if True:
msg = protocol.Touch()
types = {"down": protocol.Touch.Action.Down, "up": protocol.Touch.Action.Up, "move": protocol.Touch.Action.Move}
msg.action = types[type]
msg.x = int(x*10000/800)
msg.y = int(y*10000/600)
else:
types = {"down": protocol.MultiTouch.Touch.Action.Down, "up": protocol.MultiTouch.Touch.Action.Up, "move": protocol.MultiTouch.Touch.Action.Move}
msg = protocol.MultiTouch()
tch = protocol.MultiTouch.Touch()
tch.x = int(x)
tch.y = int(y)
tch.action = types[type]
msg.touches.append(tch)
self._owner.connection.send_message(msg)
def on_get_snapshot(self):
return self._owner._frame
class _Decoder(decoder.Decoder):
def __init__(self, owner):
super().__init__()
self._owner = owner
def on_frame(self, png):
self._owner._frame = png
class _Connection(link.Connection):
def __init__(self, owner):
super().__init__()
self._owner = owner
def on_message(self, message):
if isinstance(message, protocol.Open):
if not self._owner.started:
self._owner._connected()
self.send_multiple(protocol.opened_info)
elif isinstance(message, protocol.VideoData):
self._owner.decoder.send(message.data)
def on_error(self, error):
self._owner._disconnect()
def __init__(self):
self._disconnect()
self.server = self._Server(self)
self.decoder = self._Decoder(self)
self.heartbeat = Thread(target=self._heartbeat_thread)
self.heartbeat.start()
def _connected(self):
print("Connected!")
self.started = True
self.decoder.stop()
self.decoder = self._Decoder(self)
def _disconnect(self):
if hasattr(self, "connection"):
if self.connection is None:
return
print("Lost USB device")
self._frame = b''
self.connection = None
self.started = False
def _heartbeat_thread(self):
while True:
try:
self.connection.send_message(protocol.Heartbeat())
except link.Error:
self._disconnect()
except:
pass
time.sleep(protocol.Heartbeat.lifecycle)
def run(self):
while True:
# First task: look for USB device
while self.connection is None:
try:
self.connection = self._Connection(self)
except Exception as e:
pass
print("Found USB device...")
# Second task: transmit startup info
try:
while not self.started:
self.connection.send_multiple(protocol.startup_info)
time.sleep(1)
except:
self._disconnect()
print("Connection started!")
# Third task: idle while connected
while self.started:
time.sleep(1)
if __name__ == "__main__":
Teslabox().run()