Skip to content

Commit

Permalink
chore: updates for 2025.1
Browse files Browse the repository at this point in the history
Fixes #176
  • Loading branch information
kodebach committed Jan 6, 2025
1 parent f28c1df commit 1ee6b9f
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 59 deletions.
2 changes: 1 addition & 1 deletion .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "IDM heat pump integration development",
"image": "mcr.microsoft.com/devcontainers/python:3.12-bullseye",
"image": "mcr.microsoft.com/devcontainers/python:3.13-bullseye",
"postCreateCommand": "scripts/setup",
"forwardPorts": [
8123
Expand Down
25 changes: 9 additions & 16 deletions custom_components/idm_heatpump/idm_heatpump.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
from pymodbus.client import AsyncModbusTcpClient
from pymodbus.constants import Endian
from pymodbus.exceptions import ConnectionException, ModbusException
from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder
from pymodbus.register_read_message import ReadInputRegistersResponse
from pymodbus.payload import BinaryPayloadBuilder
from pymodbus.pdu.register_message import ReadInputRegistersResponse

from .const import NAME_POWER_USAGE
from .logger import LOGGER
Expand Down Expand Up @@ -171,13 +171,7 @@ def decode_single(
result: ReadInputRegistersResponse,
):
try:
available, value = sensor.decode(
BinaryPayloadDecoder.fromRegisters(
result.registers,
byteorder=Endian.BIG,
wordorder=Endian.LITTLE,
)
)
available, value = sensor.decode(result.registers)
if available:
data[sensor.name] = value
except ValueError as single_error:
Expand All @@ -190,21 +184,20 @@ def decode_single(
data[sensor.name] = None

try:
decoder = BinaryPayloadDecoder.fromRegisters(
result.registers,
byteorder=Endian.BIG,
wordorder=Endian.LITTLE,
)

LOGGER.debug("got decoder %d", group.start)

if len(group.sensors) == 1:
# single sensor -> don't do refetch on error
decode_single(group.sensors[0], result)
else:
register_ptr = 0
for sensor in group.sensors:
try:
available, value = sensor.decode(decoder)
registers = result.registers[
register_ptr : register_ptr + sensor.size
]
register_ptr += sensor.size
available, value = sensor.decode(registers)
if available:
data[sensor.name] = value
except ValueError as error:
Expand Down
107 changes: 66 additions & 41 deletions custom_components/idm_heatpump/sensor_addresses.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
UnitOfPower,
UnitOfTemperature,
)
from pymodbus.payload import BinaryPayloadBuilder, BinaryPayloadDecoder
from pymodbus.client.mixin import ModbusClientMixin

from .const import (
CONF_DISPLAY_NAME,
Expand Down Expand Up @@ -60,16 +60,36 @@ class BaseSensorAddress(ABC, Generic[_T]):
force_single: bool = False

@property
@abstractmethod
def size(self) -> int:
"""Get number of registers this sensor's value occupies."""
return self.datatype.value[1]

@property
@abstractmethod
def datatype(self) -> ModbusClientMixin.DATATYPE:
"""Get the pymodbus datatype for this sensor."""

def _decode_raw(self, registers: list[int]):
assert len(registers) == self.size
return ModbusClientMixin.convert_from_registers(
registers,
self.datatype,
word_order="little",
)

def _encode_raw(self, value: int | float) -> list[int]:
return ModbusClientMixin.convert_to_registers(
value,
self.datatype,
word_order="little",
)

@abstractmethod
def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, _T]:
def decode(self, registers: list[int]) -> tuple[bool, _T]:
"""Decode this sensor's value."""

@abstractmethod
def encode(self, builder: BinaryPayloadBuilder, value: _T) -> None:
def encode(self, value: _T) -> list[int]:
"""Encode this sensor's value."""

@abstractmethod
Expand Down Expand Up @@ -105,19 +125,19 @@ class IdmBinarySensorAddress(BaseSensorAddress[bool]):
device_class: BinarySensorDeviceClass | None = None

@property
def size(self) -> int:
"""Number of registers this sensor's value occupies."""
return 1
def datatype(self) -> ModbusClientMixin.DATATYPE:
"""Get the pymodbus datatype for this sensor."""
return ModbusClientMixin.DATATYPE.UINT16

def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, bool]:
def decode(self, registers: list[int]) -> tuple[bool, bool]:
"""Decode this sensor's value."""
value = decoder.decode_16bit_uint()
value = self._decode_raw(registers)
LOGGER.debug("raw value (uint16) for %s: %d", self.name, value)
return (True, value > 0)

def encode(self, builder: BinaryPayloadBuilder, value: bool):
def encode(self, value: bool) -> list[int]:
"""Encode this sensor's value."""
builder.add_16bit_uint(1 if value else 0)
return self._encode_raw(1 if value else 0)

def entity_description(
self, config_entry: ConfigEntry
Expand All @@ -139,11 +159,12 @@ class _FloatSensorAddress(IdmSensorAddress[float]):
max_value: float | None = None

@property
def size(self):
return 2
def datatype(self) -> ModbusClientMixin.DATATYPE:
"""Get the pymodbus datatype for this sensor."""
return ModbusClientMixin.DATATYPE.FLOAT32

def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, float]:
raw_value = decoder.decode_32bit_float()
def decode(self, registers: list[int]) -> tuple[bool, float]:
raw_value = self._decode_raw(registers)
LOGGER.debug("raw value (float32) for %s: %d", self.name, raw_value)
value = round(raw_value * self.scale, self.decimal_digits)
LOGGER.debug("scaled & rounded value for %s: %d", self.name, value)
Expand All @@ -161,11 +182,11 @@ def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, float]:

return (True, value)

def encode(self, builder: BinaryPayloadBuilder, value: float):
def encode(self, value: float) -> list[int]:
assert (self.min_value is None or value >= self.min_value) and (
self.max_value is None or value <= self.max_value
)
builder.add_32bit_float(value)
return self._encode_raw(value)

def entity_description(self, config_entry: ConfigEntry) -> SensorEntityDescription:
return SensorEntityDescription(
Expand All @@ -184,11 +205,12 @@ class _UCharSensorAddress(IdmSensorAddress[int]):
max_value: int | None = 0xFFFE

@property
def size(self):
return 1
def datatype(self) -> ModbusClientMixin.DATATYPE:
"""Get the pymodbus datatype for this sensor."""
return ModbusClientMixin.DATATYPE.UINT16

def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, int]:
value = decoder.decode_16bit_uint()
def decode(self, registers: list[int]) -> tuple[bool, int]:
value = self._decode_raw(registers)
LOGGER.debug("raw value (uint16) for %s: %d", self.name, value)

if self.max_value == 0xFFFE and value == 0xFFFF:
Expand All @@ -204,11 +226,11 @@ def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, int]:

return (True, value)

def encode(self, builder: BinaryPayloadBuilder, value: int):
def encode(self, value: int) -> list[int]:
assert (self.min_value is None or value >= self.min_value) and (
self.max_value is None or value <= self.max_value
)
builder.add_16bit_uint(value)
return self._encode_raw(value)

def entity_description(self, config_entry: ConfigEntry) -> SensorEntityDescription:
return SensorEntityDescription(
Expand All @@ -227,11 +249,12 @@ class _WordSensorAddress(IdmSensorAddress[int]):
max_value: int | None = None

@property
def size(self):
return 1
def datatype(self) -> ModbusClientMixin.DATATYPE:
"""Get the pymodbus datatype for this sensor."""
return ModbusClientMixin.DATATYPE.INT16

def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, int]:
value = decoder.decode_16bit_int()
def decode(self, registers: list[int]) -> tuple[bool, int]:
value = self._decode_raw(registers)

if self.min_value == 0 and value == -1:
# special case: unavailable
Expand All @@ -247,11 +270,11 @@ def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, int]:

return (True, value)

def encode(self, builder: BinaryPayloadBuilder, value: float):
def encode(self, value: int) -> list[int]:
assert (self.min_value is None or value >= self.min_value) and (
self.max_value is None or value <= self.max_value
)
builder.add_16bit_uint(value)
return self._encode_raw(value)

def entity_description(self, config_entry: ConfigEntry) -> SensorEntityDescription:
return SensorEntityDescription(
Expand All @@ -268,11 +291,12 @@ class _EnumSensorAddress(IdmSensorAddress[_EnumT], Generic[_EnumT]):
enum: type[_EnumT]

@property
def size(self):
return 1
def datatype(self) -> ModbusClientMixin.DATATYPE:
"""Get the pymodbus datatype for this sensor."""
return ModbusClientMixin.DATATYPE.UINT16

def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, _EnumT]:
value = decoder.decode_16bit_uint()
def decode(self, registers: list[int]) -> tuple[bool, _EnumT]:
value = self._decode_raw(registers)
LOGGER.debug("raw value (uint16) for %s: %d", self.name, value)
if value == 0xFFFF:
# special case: unavailable
Expand All @@ -283,8 +307,8 @@ def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, _EnumT]:
except ValueError as error:
raise ValueError(f"decode failed for {value}") from error

def encode(self, builder: BinaryPayloadBuilder, value: _EnumT):
builder.add_16bit_uint(value.value)
def encode(self, value: _EnumT) -> list[int]:
return self._encode_raw(value.value)

def entity_description(self, config_entry: ConfigEntry) -> SensorEntityDescription:
return SensorEntityDescription(
Expand All @@ -300,11 +324,12 @@ class _BitFieldSensorAddress(IdmSensorAddress[_FlagT], Generic[_FlagT]):
flag: type[_FlagT]

@property
def size(self):
return 1
def datatype(self) -> ModbusClientMixin.DATATYPE:
"""Get the pymodbus datatype for this sensor."""
return ModbusClientMixin.DATATYPE.UINT16

def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, _FlagT]:
value = decoder.decode_16bit_uint()
def decode(self, registers: list[int]) -> tuple[bool, _FlagT]:
value = self._decode_raw(registers)
LOGGER.debug("raw value (uint16) for %s: %d", self.name, value)
if value == 0xFFFF:
# special case: unavailable
Expand All @@ -315,8 +340,8 @@ def decode(self, decoder: BinaryPayloadDecoder) -> tuple[bool, _FlagT]:
except ValueError as error:
raise ValueError(f"decode failed for {value}") from error

def encode(self, builder: BinaryPayloadBuilder, value: _FlagT):
builder.add_16bit_uint(value)
def encode(self, value: _FlagT) -> list[int]:
return self._encode_raw(value)

def entity_description(self, config_entry: ConfigEntry) -> SensorEntityDescription:
return SensorEntityDescription(
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
colorlog==6.9.0
homeassistant>=2024.1.5
homeassistant>=2025.1.0
pip>=21.3.1
ruff==0.8.6
-r custom_components/idm_heatpump/requirements.txt

0 comments on commit 1ee6b9f

Please sign in to comment.