forked from Lujano/Proyecto-ICIVA
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathConsolaCMU.py
221 lines (195 loc) · 9.44 KB
/
ConsolaCMU.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
"""#################################INFORMACION###############################
* Filename : ConsolaCMU.py
* Project : Autonomus
* Board : Demoqe
* Autor : Luis Lujano (13-10775)
* GitHub : https://github.com/Lujano
* Sensors : CMUcam1 (camera)
* ###########################################################################"""
"""
Descripcion:
Este programa permite la comunicacion via serial entre la camara CMUcam1 y la pc mediante el empleo de python (3.6).
Commandos:
-Los commandos se ingresan por consola.
Data:
- Implementada la decodificacion de todos los tipos de paquete enviados por la camara
- Se permite la visualizacion de los paquetes recibidos en formato string y como arreglo de enteros
- Se puede visualizar la imagen tomada por la camara mediante el comando DF, y matplotlib (tiempo aprox 6 seg)
"""
import serial
import time
import numpy as np
import matplotlib.pyplot as plt
import cv2
#import winsound
def open_port():
ser = serial.Serial('/dev/ttyUSB0', 115200) # o "COM12" en windows
return ser
def close_port(port):
port.close()
def comfirm_ACK(port):
ACK = port.read(3)#ACk
ACK = chr(ACK[0])+chr(ACK[1])+chr(ACK[2])
r = chr(ord(port.read(1)))
if ACK == "ACK" and r == "\r": # Si se recibio el ACK del comando
return 1
else:
return 0
def read_buffer(port):
buff = np.array([], dtype="uint8") # Matriz temporal donde se guardara lo almacenado en el buffer
packet_type = ord(port.read(1)) # Convertir a entero el Tipos : C, F(1), M, N y S
if (packet_type == 1): # Si el paquete es tipo F, se hace una lectura con mas tiempo de espera por paquete
wait_time = 0.2 # si es mas grande, el buffer se llena y aparecen rayas de colores en la imagen recibida
time.sleep(wait_time)
else:
wait_time = 0.01
time.sleep(wait_time)
buff = np.append(buff, [packet_type], 0) # Agregar el tipo de paquete al comienzo del buffer
while (port.in_waiting != 0): # Mientras hallan bytes por leer
size_toread = port.in_waiting
data = port.read(size_toread)
for bytes in data:
buff = np.append(buff, [bytes], 0) # Se guardan como entero los datos recibidos
time.sleep(wait_time)
return buff # retornar lecutra del buffer serial
def idle_state(buff): # Esta funcion verifica si la camara se encuentra en el estado idle examinando la data devuelta
# por esta, y retorna el paquete recibido
last_byte = buff[buff.size-1]
packet = buff
if (chr(last_byte) == ":"):
packet = np.delete(buff, buff.size-1, 0) # Eliminar el caracter de estado idle del buffer
return 1, packet
else:
return 0, packet
def packet2string(packet):
s = ""
for data in packet: # convertir buffer(enteros) a string
s = s + chr(data)
return s
def decode(packet): # Decodificador de paquetes segun su tipo
packet_type = chr(packet[0]) # Tipos : C, F, M, N y S
last_byte = packet[packet.size-1]
if packet_type == chr(1) and last_byte == 3: # Paquete tipo F
cols_index, = np.where(packet == 2)
n_cols = cols_index.size
n_rows = int((cols_index[1] - cols_index[0] - 1) / 3) # restar indices de columnas -1, es el numero de filas*3
print("Filas = {}, Columnas = {}".format(n_rows, n_cols))
# Crear imagen
image = np.zeros((n_rows, n_cols, 3), dtype="uint8")
for col in range(n_cols):
i = cols_index[col] - n_rows * 3 # indice del primer elemento (color r) en la trama de la columna
for row in range(n_rows):
r = i + row * 3 # indice del color rojo en cada fila de la trama columna enviada
image[row, col] = [packet[r + 2], packet[r + 1], packet[r]] # bgr (formato opencv)
return image
else:
s =""
for data in packet: # convertir buffer(enteros) a string
s = s+chr(data)
list_data = [int(s) for s in s.split() if s.isdigit()]
data_array = np.array([0], dtype="uint8")
for element in list_data:
data_array = np.append(data_array, [element], 0)
if packet_type == "C" and data_array[data_array.size - 1] == ord("\r"): # Paquete tipo C
x1, y1 = data_array[1], data_array[2]
x2, y2, pixels, confidence = data_array[3], data_array[4], data_array[5], data_array[6]
return x1, y1, x2, y2, pixels, confidence
if packet_type == "M" and last_byte == ord("\r"): # Paquete tipo M
mx, my, x1, y1 = data_array[1], data_array[2], data_array[3], data_array[4]
x2, y2, pixels, confidence = data_array[5], data_array[6], data_array[7], data_array[8]
return mx, my, x1, y1, x2, y2, pixels, confidence
if packet_type == "N" and last_byte == ord("\r"): # Paquete tipo N
spos, mx, my, x1, y1 = data_array[1], data_array[2], data_array[3], data_array[4], data_array[5]
x2, y2, pixels, confidence = data_array[6], data_array[7], data_array[8], data_array[9]
return spos, mx, my, x1, y1, x2, y2, pixels, confidence
if packet_type == "S" and last_byte == ord("\r"): # Paquete tipo M
Rmean, Gmean, Bmean = data_array[1], data_array[2], data_array[3]
Rdev, Gdev, Bdev = data_array[4], data_array[5], data_array[6]
return Rmean, Gmean, Bmean, Rdev, Gdev, Bdev
def force_idle(port): # Visualizar porq no recibe el ack
idle = 0
while idle == 0 :
port.reset_input_buffer()
port.write("\r".encode("utf-8")) # finalizar stream
buff = read_buffer(port) # leer buffer serial de entrada
idle, packet = idle_state(buff)
print("Force idle = {}".format(idle))
return
def main():
port = open_port()
port.reset_input_buffer()
print("Bienvenido\nEscriba el comando a enviar a la camara: (q para salir)")
command = input()
port.write((command + "\r").encode("utf-8"))
while command != "q":
ack = comfirm_ACK(port)
print("ACK = {}".format(ack))
if ack == 1:
tini = time.time()
buff = read_buffer(port) # leer buffer serial de entrada
print("Buffer = {}".format(buff))
print("Tiempo de lectura = {} s".format(time.time() - tini))
idle, packet = idle_state(buff)
print("Idle = {}".format(idle))
print("Paquete = {}".format(packet))
if command == "DF": # DF\r, Dump File
if idle == 1:
image = decode(packet)
image_raw = cv2.flip(image, -1) # Reajuste a la imagen original vista por la camara
plt.figure("CMUcam1")
image = cv2.flip(image, 0)
plt.subplot(1, 2, 1)
plt.title("Imagen cruda")
plt.imshow(image_raw[..., ::-1])
plt.subplot(1, 2, 2)
plt.title("Imagen Flip")
plt.imshow(image[..., ::-1])
plt.show()
elif command == "": # Comando \r
print("HO!")
elif command == "GV":
print("packet = "+packet2string(packet))
elif command == "GM":
port.write("\r".encode("utf-8")) # finalizar stream
print("packet = " + packet2string(packet))
if idle == 0:
Rmean, Gmean, Bmean, Rdev, Gdev, Bdev = decode(packet)
print("Rmean={}, Gmean={}, Bmean={}, Rdev={}, Gdev={}, Bdev={}".format(Rmean, Gmean, Bmean, Rdev, Gdev, Bdev))
force_idle(port)
elif command[:2] == "TC":
print("packet = " + packet2string(packet))
if idle == 0:
mx, my, x1, y1, x2, y2, pixels, confidence = decode(packet)
print("mx={}, my={}, x1={}, y1={}, x2={}, y2={}".format(mx, my, x1, y1, x2, y2))
print("pixels = {}, confidende = {}".format(pixels, confidence))
force_idle(port)
port.write(("DF" + "\r").encode("utf-8"))
ack = comfirm_ACK(port)
buff = read_buffer(port) # leer buffer serial de entrada
idle, packet = idle_state(buff)
if idle == 1:
image = decode(packet)
image_raw = cv2.flip(image, -1) # Reajuste a la imagen original vista por la camara
cv2.rectangle(image_raw, (x1, y1), (x2, y2), (255, 0, 0), 1)
cv2.circle ( image_raw, (mx, my), 3, (255, 0, 0), -1)
plt.figure("CMUcam1")
image = cv2.flip(image, 0)
plt.subplot(1, 2, 1)
plt.title("Imagen cruda")
plt.imshow(image_raw[..., ::-1])
plt.subplot(1, 2, 2)
plt.title("Imagen Flip")
plt.imshow(image[..., ::-1])
plt.show()
else:
print("packet = " + packet2string(packet))
print("Other command")
else:
print("Error in ack")
port.reset_input_buffer()
print("\nEscriba otro comando a enviar a la camara: (q para salir)")
command = input()
port.write((command + "\r").encode("utf-8"))
print("Finished")
close_port(port)
if __name__ == "__main__": main()