-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserialDiffCat.py
executable file
·131 lines (114 loc) · 3.5 KB
/
serialDiffCat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/python2.7
import argparse, os, sys
import threading
import time
import serial
import math
# Command-line interface.  -i, -o and -d are mandatory; the remaining options
# are optional and get their fallback values further down in the script.
_CLI_OPTIONS = (
    (("-i", "--inputFile"),
     dict(type=str, required=True, help="input file")),
    (("-o", "--outputFile"),
     dict(type=str, required=True, help="output file")),
    (("-d", "--device"),
     dict(type=str, required=True, help="tty Device")),
    (("-B", "--baudrate"),
     dict(type=int, required=False, help="")),
    (("-P", "--parity"),
     dict(type=str, required=False, help="")),
    (("-t", "--transmitSize"),
     dict(type=int, required=False, help="Size of transmited frame")),
    (("-g", "--graph"),
     dict(action='store_true', required=False, help="Transfer graph plot")),
)

parser = argparse.ArgumentParser()
# Register the options in the order listed above (this is also --help order).
for _flags, _settings in _CLI_OPTIONS:
    parser.add_argument(*_flags, **_settings)
args = parser.parse_args()
#Assert
if (not args.inputFile):
print "No input"
sys.exit(1)
#Assert
if (not args.outputFile):
print "No output"
sys.exit(1)
#Assert
if (not args.device):
print "No device"
sys.exit(1)
#Config check
if (not args.baudrate):
defaultBaudrate=115200
else:
defaultBaudrate=args.baudrate
#Config check
if (not args.parity):
defaultParity=serial.PARITY_EVEN
else:
if args.parity == "even":
defaultParity=serial.PARITY_EVEN
elif args.parity == "odd":
defaultParity=serial.PARITY_ODD
else:
defaultParity=serial.PARITY_NONE
#Config check
if (not args.transmitSize):
defaultTransmitSize=1024
else:
defaultTransmitSize=args.transmitSize
#Config check
if (args.graph):
import matplotlib.pyplot as plt
plot_time = []
plot_RxData = []
plot_TxData = []
semaphoreStartSynchro = threading.Semaphore()
# Read input file size
inputSize = os.stat(args.inputFile).st_size
# Serial port read
portHandle = serial.Serial(
port=args.device,
baudrate=defaultBaudrate,
rtscts=True,
parity=defaultParity,
stopbits=serial.STOPBITS_ONE,
bytesize=serial.EIGHTBITS,
timeout=3,
write_timeout=10
)
if portHandle.isOpen():
portHandle.close()
portHandle.open()
print "Port ",args.device," opened."
# Variable with state of Rx Thread
TxTransmitted=0
# Writing thread
def main():
global RxThreadRunning
global TxTransmitted
isError=0
# Open write file and send lines
print "Input from ",args.inputFile,"."
print "Input size : ",inputSize,"Bytes."
print "Chunk size : ",defaultTransmitSize,"."
inFile = open(args.inputFile,'r')
startTime=time.time()
# Inifinite loop through file
while (isError == 0) :
outFile = open(args.outputFile,'w')
for chunk in iter(lambda: inFile.read(defaultTransmitSize), ''):
writeSize = portHandle.write(chunk)
readedChunk = portHandle.read(defaultTransmitSize);
# Check data was read
if len(readedChunk) > 0:
outFile.write(readedChunk)
else:
isError = 1
print "Timeout error!"
break;
# Compare data
if (chunk != readedChunk):
isError = 1
print "Verify error!"
break;
TxTransmitted += writeSize
durationTime=time.time()-startTime
# Print trace info
sys.stdout.write("\rTransmitted %dB. Transfer speed %02.02f kB/s." % (TxTransmitted, round((TxTransmitted/durationTime)/1024,2)))
sys.stdout.flush()
inFile.seek(0)
outFile.close()
inFile.close()
print "Port ",args.device," closed."
portHandle.close()
# Call main
main()