-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathmonitor.py
229 lines (198 loc) · 8.35 KB
/
monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
#!/usr/bin/env python
import sys
try:
import serial
except:
print "Please install PySerial: pip install PySerial"
sys.exit(-1)
import serial.tools.list_ports
import struct
import time
from signal import signal, SIGINT
def handler(signal_received, frame):
print 'SIGINT or CTRL-C detected. Will try to exit.'
sys.exit(-1)
if sys.platform.startswith('win'):
import msvcrt
def getch():
if msvcrt.kbhit():
return msvcrt.getch()
else:
# See: https://stackoverflow.com/questions/13207678/whats-the-simplest-way-of-detecting-keyboard-input-in-python-from-the-terminal
import termios, fcntl, os, termios
def getch():
fd = sys.stdin.fileno()
oldterm = termios.tcgetattr(fd)
newattr = termios.tcgetattr(fd)
newattr[3] = newattr[3] & ~termios.ICANON & ~termios.ECHO
termios.tcsetattr(fd, termios.TCSANOW, newattr)
oldflags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, oldflags | os.O_NONBLOCK)
c = None
try:
c = sys.stdin.read(1)
except IOError: pass
termios.tcsetattr(fd, termios.TCSAFLUSH, oldterm)
fcntl.fcntl(fd, fcntl.F_SETFL, oldflags)
return c
def handlePacket(cmdID, packet):
if cmdID == 0xFF:
print str(packet),
elif cmdID == 0x01: # For when RX is SilverLite SPI
if len(packet) >= 12:
loopTime, pps, pktHit, hwCRC, bCRC, osdTime = struct.unpack("<hhhhhh", packet)
print "looptime: ", loopTime
print "osdTime: ", osdTime
print "pps: ", pps, "hit: ", pktHit
print "hcrc: ", hwCRC, "bcrc: ", bCRC
print
elif len(packet) >= 10:
loopTime, pps, pktHit, hwCRC, bCRC = struct.unpack("<hhhhh", packet)
print "looptime: ", loopTime
print "pps: ", pps, "hit: ", pktHit
print "hcrc: ", hwCRC, "bcrc: ", bCRC
print
elif len(packet) >= 2:
print struct.unpack("<h", packet)
elif cmdID == 0x02: # For when RX is IBUS
if len(packet) >= 10:
loopTime, osdTime, pps, pktHit, bCRC = struct.unpack("<hhhhh", packet)
print "looptime: ", loopTime
print "osdTime: ", osdTime
print "pps: ", pps, "hit: ", pktHit
print "bcrc: ", bCRC
print
elif cmdID == 0x03: # Debug logging of USART reception, 32bytes at a time
if len(packet) == 12:
channels = struct.unpack("<" + ('h'*6), packet)
for chan in channels:
print chan,
print "\n"
elif len(packet) == 16: # 4 floats rx[] (roll, pitch, yaw, throttle)
channels = struct.unpack("<" + ('f'*4), packet)
for chan in channels:
print chan,
print "\n"
elif len(packet) == 28:
channels = struct.unpack("<" + ('h'*14), packet)
for chan in channels:
print chan,
print "\n"
else:
for b in packet:
print hex(b),
print "\n"
elif cmdID == 0x04: # Debug logging of RPM_TELEMETRY_DEBUG data
if len(packet) == 8:
debugData = struct.unpack("<" + ('h'*4), packet)
for data in debugData:
print data,
print "\n"
elif cmdID == 0x05: # Debug logging for Test apps
if len(packet) >= 2:
numShorts = len(packet) / 2
debugData = struct.unpack("<" + ('h'*numShorts), packet)
for data in debugData:
print data,
print "\n"
elif cmdID == 0x06: # Debug logging of 32bit signed integers
if len(packet) >= 4:
numInts = len(packet) / 4
debugData = struct.unpack("<" + ('i'*numInts), packet)
for data in debugData:
print data,
print "\n"
else:
print "Unknown packet type: ", hex(cmdID), len(packet)
pass
def checkForInput(ser):
# Check for keyboard input
ch = getch()
if ch in ('R', 'r'):
if ser:
print 'Sending reset command'
ser.write(ch)
elif ch == 'b':
if ser:
print 'Sending bind command'
ser.write(ch)
def main():
signal(SIGINT, handler)
while True:
# Call getch() so that Ctrl-C can be used to kill this script
# should we be in a tight loop trying to open a port that isn't available
getch()
try:
port = None
# Search available com ports for the VCP
# Note: Look at arch/stm32f4-usb.h to see how VID and PID are defined
for p in serial.tools.list_ports.comports():
if p.vid == 0x483 and p.pid == 0x5740:
port = p.device
break
if port:
ser = serial.Serial(port, baudrate=115200, timeout=0)
print "\n\nFlight controller found on", port, "\n"
else:
# VCP not found, do nothing for a tiny bit before retrying
time.sleep(0.1)
continue
except:
time.sleep(0.1)
continue
inputStream = bytearray()
while True:
try:
data = ser.read(1024)
except:
break
checkForInput(ser)
if data:
inputStream += bytearray(data)
while len(inputStream):
# Scan until flag found, then discard everything preceeding it
for i in xrange(len(inputStream)):
if inputStream[i] == 0x7E:
if i > 0:
# Discard everything preceeding the start flag
inputStream = inputStream[i:]
break
# if flag not available
if inputStream[0] != 0x7E:
break
# if enough data to retrieve the payload length
if len(inputStream) >= 3:
payloadLen = inputStream[2]
# if enough data to retrieve checksum and end flag
if len(inputStream) >= (payloadLen + 6): # 3 for packet header, 3 for packet trailer
if inputStream[5 + payloadLen] != 0x7E:
# End flag not present, not a valid frame
# Move past the byte we thought was a start flag but actually wasn't and try again
print "End flag not found"
inputStream = inputStream[1:]
continue
else:
# test checksum
checksum = inputStream[payloadLen+3] + (inputStream[payloadLen+4] << 8)
sum = 0
for i in xrange(payloadLen+3):
sum += inputStream[i]
if checksum != sum:
# Checksum didn't match
# Move past the byte we thought was a start flag but actually wasn't and try again
inputStream = inputStream[1:]
print "Checksum invalid"
continue
# Packet was good, now we should handle it
handlePacket(inputStream[1], inputStream[3:payloadLen+3])
# Advance past the packet and continue in case there's more in the input stream
inputStream = inputStream[payloadLen+6:]
continue
else:
# Need more data, break outta here
break
else:
# Need more data, break outta here
break
#
main()