From 1ee6b9f1ea7e1a810be9beaf9fbef0de1429f693 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Klemens=20B=C3=B6swirth?= Date: Mon, 6 Jan 2025 17:00:55 +0000 Subject: [PATCH] chore: updates for 2025.1 Fixes #176 --- .devcontainer/devcontainer.json | 2 +- .../idm_heatpump/idm_heatpump.py | 25 ++-- .../idm_heatpump/sensor_addresses.py | 107 +++++++++++------- requirements.txt | 2 +- 4 files changed, 77 insertions(+), 59 deletions(-) diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index d94ab5f..a4f9134 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -1,6 +1,6 @@ { "name": "IDM heat pump integration development", - "image": "mcr.microsoft.com/devcontainers/python:3.12-bullseye", + "image": "mcr.microsoft.com/devcontainers/python:3.13-bullseye", "postCreateCommand": "scripts/setup", "forwardPorts": [ 8123 diff --git a/custom_components/idm_heatpump/idm_heatpump.py b/custom_components/idm_heatpump/idm_heatpump.py index 889814e..34f9f50 100644 --- a/custom_components/idm_heatpump/idm_heatpump.py +++ b/custom_components/idm_heatpump/idm_heatpump.py @@ -8,8 +8,8 @@ from pymodbus.client import AsyncModbusTcpClient from pymodbus.constants import Endian from pymodbus.exceptions import ConnectionException, ModbusException -from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder -from pymodbus.register_read_message import ReadInputRegistersResponse +from pymodbus.payload import BinaryPayloadBuilder +from pymodbus.pdu.register_message import ReadInputRegistersResponse from .const import NAME_POWER_USAGE from .logger import LOGGER @@ -171,13 +171,7 @@ def decode_single( result: ReadInputRegistersResponse, ): try: - available, value = sensor.decode( - BinaryPayloadDecoder.fromRegisters( - result.registers, - byteorder=Endian.BIG, - wordorder=Endian.LITTLE, - ) - ) + available, value = sensor.decode(result.registers) if available: data[sensor.name] = value except ValueError as single_error: @@ -190,21 +184,20 @@ def decode_single( data[sensor.name] = None try: - decoder = BinaryPayloadDecoder.fromRegisters( - result.registers, - byteorder=Endian.BIG, - wordorder=Endian.LITTLE, - ) - LOGGER.debug("got decoder %d", group.start) if len(group.sensors) == 1: # single sensor -> don't do refetch on error decode_single(group.sensors[0], result) else: + register_ptr = 0 for sensor in group.sensors: try: - available, value = sensor.decode(decoder) + registers = result.registers[ + register_ptr : register_ptr + sensor.size + ] + register_ptr += sensor.size + available, value = sensor.decode(registers) if available: data[sensor.name] = value except ValueError as error: diff --git a/custom_components/idm_heatpump/sensor_addresses.py b/custom_components/idm_heatpump/sensor_addresses.py index 71f9fb6..f7be785 100644 --- a/custom_components/idm_heatpump/sensor_addresses.py +++ b/custom_components/idm_heatpump/sensor_addresses.py @@ -22,7 +22,7 @@ UnitOfPower, UnitOfTemperature, ) -from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder +from pymodbus.client.mixin import ModbusClientMixin from .const import ( CONF_DISPLAY_NAME, @@ -60,16 +60,36 @@ class BaseSensorAddress(ABC, Generic[_T]): force_single: bool = False @property - @abstractmethod def size(self) -> int: """Get number of registers this sensor's value occupies.""" + return self.datatype.value[1] + + @property + @abstractmethod + def datatype(self) -> ModbusClientMixin.DATATYPE: + """Get the pymodbus datatype for this sensor.""" + + def _decode_raw(self, registers: list[int]): + assert len(registers) == self.size + return ModbusClientMixin.convert_from_registers( + registers, + self.datatype, + word_order="little", + ) + + def _encode_raw(self, value: int | float) -> list[int]: + return ModbusClientMixin.convert_to_registers( + value, + self.datatype, + word_order="little", + ) @abstractmethod - def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, _T]: + def decode(self, registers: list[int]) -> tuple[bool, _T]: """Decode this sensor's value.""" @abstractmethod - def encode(self, builder: BinaryPayloadBuilder, value: _T) -> None: + def encode(self, value: _T) -> list[int]: """Encode this sensor's value.""" @abstractmethod @@ -105,19 +125,19 @@ class IdmBinarySensorAddress(BaseSensorAddress[bool]): device_class: BinarySensorDeviceClass | None = None @property - def size(self) -> int: - """Number of registers this sensor's value occupies.""" - return 1 + def datatype(self) -> ModbusClientMixin.DATATYPE: + """Get the pymodbus datatype for this sensor.""" + return ModbusClientMixin.DATATYPE.UINT16 - def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, bool]: + def decode(self, registers: list[int]) -> tuple[bool, bool]: """Decode this sensor's value.""" - value = decoder.decode_16bit_uint() + value = self._decode_raw(registers) LOGGER.debug("raw value (uint16) for %s: %d", self.name, value) return (True, value > 0) - def encode(self, builder: BinaryPayloadBuilder, value: bool): + def encode(self, value: bool) -> list[int]: """Encode this sensor's value.""" - builder.add_16bit_uint(1 if value else 0) + return self._encode_raw(1 if value else 0) def entity_description( self, config_entry: ConfigEntry @@ -139,11 +159,12 @@ class _FloatSensorAddress(IdmSensorAddress[float]): max_value: float | None = None @property - def size(self): - return 2 + def datatype(self) -> ModbusClientMixin.DATATYPE: + """Get the pymodbus datatype for this sensor.""" + return ModbusClientMixin.DATATYPE.FLOAT32 - def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, float]: - raw_value = decoder.decode_32bit_float() + def decode(self, registers: list[int]) -> tuple[bool, float]: + raw_value = self._decode_raw(registers) LOGGER.debug("raw value (float32) for %s: %d", self.name, raw_value) value = round(raw_value * self.scale, self.decimal_digits) LOGGER.debug("scaled & rounded value for %s: %d", self.name, value) @@ -161,11 +182,11 @@ def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, float]: return (True, value) - def encode(self, builder: BinaryPayloadBuilder, value: float): + def encode(self, value: float) -> list[int]: assert (self.min_value is None or value >= self.min_value) and ( self.max_value is None or value <= self.max_value ) - builder.add_32bit_float(value) + return self._encode_raw(value) def entity_description(self, config_entry: ConfigEntry) -> SensorEntityDescription: return SensorEntityDescription( @@ -184,11 +205,12 @@ class _UCharSensorAddress(IdmSensorAddress[int]): max_value: int | None = 0xFFFE @property - def size(self): - return 1 + def datatype(self) -> ModbusClientMixin.DATATYPE: + """Get the pymodbus datatype for this sensor.""" + return ModbusClientMixin.DATATYPE.UINT16 - def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, int]: - value = decoder.decode_16bit_uint() + def decode(self, registers: list[int]) -> tuple[bool, int]: + value = self._decode_raw(registers) LOGGER.debug("raw value (uint16) for %s: %d", self.name, value) if self.max_value == 0xFFFE and value == 0xFFFF: @@ -204,11 +226,11 @@ def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, int]: return (True, value) - def encode(self, builder: BinaryPayloadBuilder, value: int): + def encode(self, value: int) -> list[int]: assert (self.min_value is None or value >= self.min_value) and ( self.max_value is None or value <= self.max_value ) - builder.add_16bit_uint(value) + return self._encode_raw(value) def entity_description(self, config_entry: ConfigEntry) -> SensorEntityDescription: return SensorEntityDescription( @@ -227,11 +249,12 @@ class _WordSensorAddress(IdmSensorAddress[int]): max_value: int | None = None @property - def size(self): - return 1 + def datatype(self) -> ModbusClientMixin.DATATYPE: + """Get the pymodbus datatype for this sensor.""" + return ModbusClientMixin.DATATYPE.INT16 - def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, int]: - value = decoder.decode_16bit_int() + def decode(self, registers: list[int]) -> tuple[bool, int]: + value = self._decode_raw(registers) if self.min_value == 0 and value == -1: # special case: unavailable @@ -247,11 +270,11 @@ def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, int]: return (True, value) - def encode(self, builder: BinaryPayloadBuilder, value: float): + def encode(self, value: int) -> list[int]: assert (self.min_value is None or value >= self.min_value) and ( self.max_value is None or value <= self.max_value ) - builder.add_16bit_uint(value) + return self._encode_raw(value) def entity_description(self, config_entry: ConfigEntry) -> SensorEntityDescription: return SensorEntityDescription( @@ -268,11 +291,12 @@ class _EnumSensorAddress(IdmSensorAddress[_EnumT], Generic[_EnumT]): enum: type[_EnumT] @property - def size(self): - return 1 + def datatype(self) -> ModbusClientMixin.DATATYPE: + """Get the pymodbus datatype for this sensor.""" + return ModbusClientMixin.DATATYPE.UINT16 - def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, _EnumT]: - value = decoder.decode_16bit_uint() + def decode(self, registers: list[int]) -> tuple[bool, _EnumT]: + value = self._decode_raw(registers) LOGGER.debug("raw value (uint16) for %s: %d", self.name, value) if value == 0xFFFF: # special case: unavailable @@ -283,8 +307,8 @@ def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, _EnumT]: except ValueError as error: raise ValueError(f"decode failed for {value}") from error - def encode(self, builder: BinaryPayloadBuilder, value: _EnumT): - builder.add_16bit_uint(value.value) + def encode(self, value: _EnumT) -> list[int]: + return self._encode_raw(value.value) def entity_description(self, config_entry: ConfigEntry) -> SensorEntityDescription: return SensorEntityDescription( @@ -300,11 +324,12 @@ class _BitFieldSensorAddress(IdmSensorAddress[_FlagT], Generic[_FlagT]): flag: type[_FlagT] @property - def size(self): - return 1 + def datatype(self) -> ModbusClientMixin.DATATYPE: + """Get the pymodbus datatype for this sensor.""" + return ModbusClientMixin.DATATYPE.UINT16 - def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, _FlagT]: - value = decoder.decode_16bit_uint() + def decode(self, registers: list[int]) -> tuple[bool, _FlagT]: + value = self._decode_raw(registers) LOGGER.debug("raw value (uint16) for %s: %d", self.name, value) if value == 0xFFFF: # special case: unavailable @@ -315,8 +340,8 @@ def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, _FlagT]: except ValueError as error: raise ValueError(f"decode failed for {value}") from error - def encode(self, builder: BinaryPayloadBuilder, value: _FlagT): - builder.add_16bit_uint(value) + def encode(self, value: _FlagT) -> list[int]: + return self._encode_raw(value) def entity_description(self, config_entry: ConfigEntry) -> SensorEntityDescription: return SensorEntityDescription( diff --git a/requirements.txt b/requirements.txt index 42a2943..9beea2a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,5 @@ colorlog==6.9.0 -homeassistant>=2024.1.5 +homeassistant>=2025.1.0 pip>=21.3.1 ruff==0.8.6 -r custom_components/idm_heatpump/requirements.txt