-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathreader_async.py
155 lines (128 loc) · 4.4 KB
/
reader_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""Util for reading from HAN port."""
from __future__ import annotations
import argparse
import datetime
import json
import logging
import signal
import sys
from asyncio import Queue, create_task, get_event_loop, run
from typing import Any
from han import autodecoder
from han.meter_connection import (
AsyncConnectionFactory,
ConnectionManager,
MeterTransportProtocol,
)
from han.serial_connection_factory import create_serial_message_payload_connection
from han.tcp_connection_factory import create_tcp_message_payload_connection
logging.basicConfig(
level=logging.DEBUG,
format="%(levelname)7s: %(message)s",
stream=sys.stderr,
)
LOG = logging.getLogger("")
def _get_arg_parser() -> argparse.ArgumentParser:
def valid_host_port(host_port: str) -> tuple[str, str]:
host_and_port = host_port.split(":")
if len(host_and_port) == 2:
return host_and_port[0], host_and_port[1]
else:
msg = f"Not a valid host and port: '{host_port}'."
raise argparse.ArgumentTypeError(msg)
parser = argparse.ArgumentParser("read HAN port")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
"-host",
dest="hostandport",
type=valid_host_port,
help="input host and port separated by :",
)
group.add_argument("-serial", dest="serialdevice", help="input serial port")
parser.add_argument(
"-sp",
dest="ser_parity",
default="N",
required=False,
choices=["N", "O", "E"],
help="input serial port parity",
)
parser.add_argument(
"-sb",
dest="ser_baudrate",
default=2400,
type=int,
required=False,
help="input serial port baud rate",
)
parser.add_argument("-mh", dest="mqtthost", default="localhost", help="mqtt host")
parser.add_argument(
"-mp", dest="mqttport", type=int, default=1883, help="mqtt port port"
)
parser.add_argument(
"-t", dest="mqtttopic", default="han", help="mqtt publish topic"
)
parser.add_argument(
"-dumpfile", dest="dumpfile", help="dump received bytes to file"
)
parser.add_argument(
"-r",
dest="reconnect",
type=bool,
default=True,
help="automatic retry/reconnect meter connection",
)
parser.add_argument("-v", dest="verbose", default=False)
return parser
def _json_converter(source: Any) -> str | None:
if isinstance(source, datetime.datetime):
return source.isoformat()
return None
_decoder = autodecoder.AutoDecoder()
def _measure_received(frame: bytes) -> None:
decoded_frame = _decoder.decode_message_payload(frame)
if decoded_frame:
json_frame = json.dumps(decoded_frame, default=_json_converter)
LOG.debug("Decoded frame: %s", json_frame)
else:
LOG.error("Could not decode frame content: %s", frame.hex())
async def _process_frames(queue: "Queue[bytes]") -> None:
while True:
frame = await queue.get()
_measure_received(frame)
async def main() -> None:
"""Start reading."""
args = _get_arg_parser().parse_args()
loop = get_event_loop()
queue: Queue[bytes] = Queue()
create_task(_process_frames(queue))
async def tcp_connection_factory() -> MeterTransportProtocol:
host, port = args.hostandport
return await create_tcp_message_payload_connection(
queue, loop, None, host, port
)
async def serial_connection_factory() -> MeterTransportProtocol:
return await create_serial_message_payload_connection(
queue,
None,
None,
url=args.serialdevice,
baudrate=args.ser_baudrate,
parity=args.ser_parity,
)
connection_factory: AsyncConnectionFactory = (
serial_connection_factory if args.serialdevice else tcp_connection_factory
)
if args.reconnect:
# use high-level ConnectionManager
connection_manager = ConnectionManager(connection_factory)
loop.add_signal_handler(signal.SIGINT, connection_manager.close)
await connection_manager.connect_loop()
else:
# use low-level transport and protocol
transport, protocol = await connection_factory()
loop.add_signal_handler(signal.SIGINT, transport.close)
await protocol.done
LOG.info("Done...")
if __name__ == "__main__":
run(main())