-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathheli.py
executable file
·120 lines (88 loc) · 3.42 KB
/
heli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/env python
import serial, sys, time, os
import logging
from threading import Thread, Condition
logging.basicConfig(format='%(asctime)s %(levelname)-6s %(name)s %(message)s', level=logging.DEBUG)
class Heli(object):
def __init__(self,):
self.__connection_up = False
self.__available = Condition()
self.initialize()
path = next(
(
x
for x in os.listdir('/dev')
if x.startswith('ttyACM')
),
None,
)
if not path:
raise ConnectionError("Could find and connect to the helicopter controller")
path = '/dev/'+path
self.__serial_connection = serial.Serial(path, 9600)
self.__serial_connection.flushInput() # cleans up anything lingering
ser = self.__serial_connection
my_heli = self
def f():
while my_heli.__connection_up is False:
try:
ser_bytes = ser.readline()
read_str = ser_bytes.decode('utf-8').strip()
if read_str == "READY":
with my_heli.__available:
my_heli.__connection_up = True
my_heli.__available.notify()
except:
Heli.__LOGGER.warn("Error while working with the serial connection", exc_info=1)
with my_heli.__available:
my_heli.__connection_up = None
my_heli.__available.notify()
t = Thread(target=f, daemon=True,)
t.start()
def is_ready(self):
"""
Returns C{True} if this helicopter is ready for flight.
@rtype: bool
"""
with self.__available:
if self.__connection_up is False:
self.__available.wait()
if self.__connection_up is None:
Heli.__LOGGER.error("Connection failed.")
return False
assert self.__connection_up is True
Heli.__LOGGER.info("Ready to go!")
return True
def send(self):
"""
Sends the configured desired state of the helicopter to the remote controller.
Waits for the helicopter to be ready for flight L{automatically<Heli.is_ready>}.
"""
if not self.is_ready():
Heli.__LOGGER.error("Cannot send desired state: no connection available.")
return
msg = [self.yaw, self.pitch, self.throttle, self.trim]
written_bytes = self.__serial_connection.write(msg)
assert written_bytes == len(msg)
self.__serial_connection.flush()
Heli.__LOGGER.debug("Sent %d %d %d %d", *msg)
# this is to wait for the arduino to get it, we could implement an ACK
time.sleep(0.5)
def land(self):
"""
Resets the state of the helicopter to the L{initial<Heli.initial>} state (i.e. crash lands).
"""
self.initialize()
self.send()
def initialize(self,):
"""
Configures the desired state of the helicopter to neutral and no gaz.
"""
self.yaw, self.pitch, self.throttle, self.trim = 63, 63, 0, 63
__LOGGER = logging.getLogger(__name__)
if __name__ == '__main__':
heli = Heli()
heli.throttle = 30
heli.send()
time.sleep(5) # 5s in the air
heli.land()