diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index a73a15a..f4bbe41 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -29,10 +29,8 @@ repos:
# shell.
- id: mypy
name: mypy
- entry: mypy
+ entry: ./script/run-in-venv.sh mypy
language: python
types_or: [python, pyi]
- args:
- - --ignore-missing-imports
require_serial: true
files: ^(custom_components)/.+\.(py|pyi)$
diff --git a/README.md b/README.md
index e123853..7ed9e84 100644
--- a/README.md
+++ b/README.md
@@ -34,8 +34,8 @@ Cependant, certaines sondes peuvent avoir de la valeur dans leur "instantanéit
Suivant la configuration que vous choisirez pour votre installation vous trouverez dans ce fichier dans la liste des sondes avec les annotations suivantes:
-- 1 sonde compatible avec le mode temps réel: si celui-ci est activé par l'utilisateur, les mises à jours seront bien plus fréquentes (dès qu'elles sont lues sur la connection série)
-- 2 sonde dont le mode temps réel est forcé même si l'utilisateur n'a pas activé le mode temps réèl dans le cas où la valeur de la sonde est importante et/ou éphémère
+- ``1`` sonde compatible avec le mode temps réel: si celui-ci est activé par l'utilisateur, les mises à jours seront bien plus fréquentes (dès qu'elles sont lues sur la connection série)
+- ``2`` sonde dont le mode temps réel est forcé même si l'utilisateur n'a pas activé le mode temps réèl dans le cas où la valeur de la sonde est importante et/ou éphémère
### Mode historique
@@ -62,10 +62,10 @@ Les 23 champs des compteurs mono-phasé configurés en mode historique sont supp
- `PEJP` Préavis Début EJP (30 min)
- `PTEC` Période Tarifaire en cours
- `DEMAIN` Couleur du lendemain
-- `IINST` Intensité Instantanée 1
-- `ADPS` Avertissement de Dépassement De Puissance Souscrite 2
+- `IINST` Intensité Instantanée ``1``
+- `ADPS` Avertissement de Dépassement De Puissance Souscrite ``2``
- `IMAX` Intensité maximale appelée
-- `PAPP` Puissance apparente 1
+- `PAPP` Puissance apparente ``1``
- `HHPHC` Horaire Heures Pleines Heures Creuses
- `MOTDETAT` Mot d'état du compteur
@@ -92,23 +92,23 @@ Des retours de log en `DEBUG` pendant l'émission de trames courtes sont nécess
- `PEJP` Préavis Début EJP (30 min)
- `PTEC` Période Tarifaire en cours
- `DEMAIN` Couleur du lendemain
-- `IINST1` Intensité Instantanée (phase 1) 1 pour les trames longues 2 pour les trames courtes
-- `IINST2` Intensité Instantanée (phase 2) 1 pour les trames longues 2 pour les trames courtes
-- `IINST3` Intensité Instantanée (phase 3) 1 pour les trames longues 2 pour les trames courtes
+- `IINST1` Intensité Instantanée (phase 1) ``1`` pour les trames longues ``2`` pour les trames courtes
+- `IINST2` Intensité Instantanée (phase 2) ``1`` pour les trames longues ``2`` pour les trames courtes
+- `IINST3` Intensité Instantanée (phase 3) ``1`` pour les trames longues ``2`` pour les trames courtes
- `IMAX1` Intensité maximale (phase 1)
- `IMAX2` Intensité maximale (phase 2)
- `IMAX3` Intensité maximale (phase 3)
- `PMAX` Puissance maximale triphasée atteinte
-- `PAPP` Puissance apparente 1
+- `PAPP` Puissance apparente ``1``
- `HHPHC` Horaire Heures Pleines Heures Creuses
- `MOTDETAT` Mot d'état du compteur
-- `ADIR1` Avertissement de Dépassement d'intensité de réglage (phase 1) 2 trames courtes uniquement
-- `ADIR2` Avertissement de Dépassement d'intensité de réglage (phase 2) 2 trames courtes uniquement
-- `ADIR3` Avertissement de Dépassement d'intensité de réglage (phase 3) 2 trames courtes uniquement
+- `ADIR1` Avertissement de Dépassement d'intensité de réglage (phase 1) ``2`` trames courtes uniquement
+- `ADIR2` Avertissement de Dépassement d'intensité de réglage (phase 2) ``2`` trames courtes uniquement
+- `ADIR3` Avertissement de Dépassement d'intensité de réglage (phase 3) ``2`` trames courtes uniquement
### Mode standard
-Une beta est actuellement en cours pour la future v3 supportant le mode standard, vous la trouverez dans les [releases](https://github.com/hekmon/linkytic/releases). N'hésitez pas à faire vos retours dans [#19](https://github.com/hekmon/linkytic/pull/19) afin d'accélére la sortie de beta du mode standard !
+Une beta est actuellement en cours pour la future v3 supportant le mode standard, vous la trouverez dans les [releases](https://github.com/hekmon/linkytic/releases). N'hésitez pas à faire vos retours dans [#19](https://github.com/hekmon/linkytic/pull/19) afin d'accélére la sortie de beta du mode standard ! Si vous rencontrez un bug, vous pouvez aussi ouvrir une [issue](https://github.com/hekmon/linkytic/issues).
## Installation
diff --git a/custom_components/linkytic/__init__.py b/custom_components/linkytic/__init__.py
index d6d7abc..29c4a68 100644
--- a/custom_components/linkytic/__init__.py
+++ b/custom_components/linkytic/__init__.py
@@ -9,7 +9,9 @@
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import EVENT_HOMEASSISTANT_STOP, Platform
from homeassistant.core import HomeAssistant
-from homeassistant.exceptions import ConfigEntryNotReady
+from homeassistant.exceptions import ConfigEntryError, ConfigEntryNotReady
+from homeassistant.helpers import entity_registry as er
+from homeassistant.util import slugify
from .const import (
DOMAIN,
@@ -28,22 +30,24 @@
_LOGGER = logging.getLogger(__name__)
-async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
+async def async_setup_entry(
+ hass: HomeAssistant, entry: ConfigEntry[LinkyTICReader]
+) -> bool:
"""Set up linkytic from a config entry."""
# Create the serial reader thread and start it
- port = entry.data.get(SETUP_SERIAL)
+ port = entry.data[SETUP_SERIAL]
try:
serial_reader = LinkyTICReader(
title=entry.title,
port=port,
- std_mode=entry.data.get(SETUP_TICMODE) == TICMODE_STANDARD,
- producer_mode=entry.data.get(SETUP_PRODUCER),
- three_phase=entry.data.get(SETUP_THREEPHASE),
+ std_mode=entry.data[SETUP_TICMODE] == TICMODE_STANDARD,
+ producer_mode=entry.data[SETUP_PRODUCER],
+ three_phase=entry.data[SETUP_THREEPHASE],
real_time=entry.options.get(OPTIONS_REALTIME),
)
serial_reader.start()
- async def read_serial_number(serial: LinkyTICReader):
+ async def read_serial_number(serial: LinkyTICReader) -> str:
while serial.serial_number is None:
await asyncio.sleep(1)
# Check for any serial error that occurred in the serial thread context
@@ -65,56 +69,52 @@ async def read_serial_number(serial: LinkyTICReader):
"Connected to serial port but coulnd't read serial number before timeout: check if TIC is connected and active."
) from e
+ # entry.unique_id is the serial number read during the config flow, all data correspond to this meter s/n
+ if s_n != entry.unique_id:
+ serial_reader.signalstop("serial_number_mismatch")
+ raise ConfigEntryError(
+ f"Connected to a different meter with S/N: `{s_n}`, expected `{entry.unique_id}`. "
+ "Aborting setup to prevent overwriting long term data."
+ )
+
_LOGGER.info(f"Device connected with serial number: {s_n}")
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, serial_reader.signalstop)
# Add options callback
entry.async_on_unload(entry.add_update_listener(update_listener))
- entry.async_on_unload(lambda: serial_reader.signalstop("config_entry_unload"))
- # Add the serial reader to HA and initialize sensors
- try:
- hass.data[DOMAIN][entry.entry_id] = serial_reader
- except KeyError:
- hass.data[DOMAIN] = {}
- hass.data[DOMAIN][entry.entry_id] = serial_reader
+
+ entry.runtime_data = serial_reader
+
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
-async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
+async def async_unload_entry(
+ hass: HomeAssistant, entry: ConfigEntry[LinkyTICReader]
+) -> bool:
"""Unload a config entry."""
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
- # Remove the related entry
- hass.data[DOMAIN].pop(entry.entry_id)
+ reader = entry.runtime_data
+ reader.signalstop("unload")
return unload_ok
-async def update_listener(hass: HomeAssistant, entry: ConfigEntry):
+async def update_listener(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Handle options update."""
- # Retrieved the serial reader for this config entry
- try:
- serial_reader = hass.data[DOMAIN][entry.entry_id]
- except KeyError:
- _LOGGER.error(
- "Can not update options for %s: failed to get the serial reader object",
- entry.title,
- )
- return
- # Update its options
- serial_reader.update_options(entry.options.get(OPTIONS_REALTIME))
+
+ reader = entry.runtime_data
+ reader.update_options(entry.options.get(OPTIONS_REALTIME))
-async def async_migrate_entry(hass: HomeAssistant, config_entry: ConfigEntry):
+async def async_migrate_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Migrate old entry."""
- _LOGGER.info(
- "Migrating from version %d.%d", config_entry.version, config_entry.minor_version
- )
+ _LOGGER.info("Migrating from version %d.%d", entry.version, entry.minor_version)
- if config_entry.version == 1:
- new = {**config_entry.data}
+ if entry.version == 1:
+ new = {**entry.data}
- if config_entry.minor_version < 2:
+ if entry.minor_version < 2:
# Migrate to serial by-id.
serial_by_id = await hass.async_add_executor_job(
usb.get_serial_by_id, new[SETUP_SERIAL]
@@ -127,14 +127,119 @@ async def async_migrate_entry(hass: HomeAssistant, config_entry: ConfigEntry):
else:
new[SETUP_SERIAL] = serial_by_id
- # config_entry.minor_version = 2
+ # Migrate the unique ID to use the serial number, this is not backward compatible
+ try:
+ reader = LinkyTICReader(
+ title=entry.title,
+ port=new[SETUP_SERIAL],
+ std_mode=new[SETUP_TICMODE] == TICMODE_STANDARD,
+ producer_mode=new[SETUP_PRODUCER],
+ three_phase=new[SETUP_THREEPHASE],
+ )
+ reader.start()
+
+ async def read_serial_number(serial: LinkyTICReader) -> str:
+ while serial.serial_number is None:
+ await asyncio.sleep(1)
+ # Check for any serial error that occurred in the serial thread context
+ if serial.setup_error:
+ raise serial.setup_error
+ return serial.serial_number
+
+ s_n = await asyncio.wait_for(read_serial_number(reader), timeout=5)
+
+ except (*LINKY_IO_ERRORS, TimeoutError) as e:
+ _LOGGER.error(
+ "Error migrating config entry to version 2, could not read device serial number: (%s)",
+ e,
+ )
+ _LOGGER.warning("Restart Home Assistant to retry migration")
+ return False
+
+ finally:
+ reader.signalstop("probe_end")
+
+ serial_number = slugify(s_n)
+
+ # Explicitly pass serial number as config entry is not updated yet
+ await _migrate_entities_unique_id(hass, entry, serial_number)
+
hass.config_entries.async_update_entry(
- config_entry, data=new, minor_version=2, version=1
- ) # type: ignore
+ entry, data=new, version=2, minor_version=0, unique_id=serial_number
+ )
_LOGGER.info(
"Migration to version %d.%d successful",
- config_entry.version,
- config_entry.minor_version,
+ entry.version,
+ entry.minor_version,
)
return True
+
+
+async def _migrate_entities_unique_id(
+ hass: HomeAssistant, entry: ConfigEntry, serial_number: str
+) -> None:
+ """Migrate entities unique id to conform to HA specifications."""
+
+ # Old entries are of format f"{DOMAIN}_{entry.config_id}_suffix"
+ # which is not conform to HA unique ID requirements (https://developers.home-assistant.io/docs/entity_registry_index#unique-id-requirements)
+ # domain should not appear in the unique id
+ # ConfigEntry.config_id is a last resort unique id when no acceptable source is awailable
+ # the meter serial number is a valid (and better) device unique id
+
+ # Since we are migrating unique id, might as well migrate some suffixes for consistency
+ _ENTITY_MIGRATION_SUFFIX = {
+ # non-standard tic tags that were implemented as different sensors
+ "smaxn": "smaxsn",
+ "smaxn-1": "smaxsn-1",
+ # status register sensors, due to field renaming
+ "contact_sec": "dry_contact",
+ "organe_de_coupure": "trip_unit",
+ "etat_du_cache_borne_distributeur": "terminal_cover",
+ "surtension_sur_une_des_phases": "overvoltage",
+ "depassement_puissance_reference": "power_over_ref",
+ "producteur_consommateur": "producer",
+ "sens_energie_active": "injecting",
+ "tarif_contrat_fourniture": "provider_index",
+ "tarif_contrat_distributeur": "distributor_index",
+ "mode_degrade_horloge": "rtc_degraded",
+ "mode_tic": "tic_std",
+ "etat_sortie_communication_euridis": "euridis",
+ "synchro_cpl": "cpl_sync",
+ "status_cpl": "cpl_status",
+ "couleur_jour_contrat_tempo": "color_today",
+ "couleur_lendemain_contrat_tempo": "color_next_day",
+ "preavis_pointes_mobiles": "mobile_peak_notice",
+ "pointe_mobile": "mobile_peak",
+ }
+
+ entity_reg = er.async_get(hass)
+ entities = er.async_entries_for_config_entry(entity_reg, entry.entry_id)
+
+ migrate_entities: dict[str, str] = {}
+
+ for entity in entities:
+ old_unique_id = entity.unique_id
+
+ # Old ids all start with `linkytic_`
+ if not old_unique_id.startswith(DOMAIN):
+ continue
+
+ # format `linkytic_ENTRYID_suffix
+ old_suffix = old_unique_id.split(entry.entry_id + "_", maxsplit=1)[1]
+
+ if (new_suffix := _ENTITY_MIGRATION_SUFFIX.get(old_suffix)) is None:
+ # entity is not in the migration table, just remove the domain prefix and update the entry_id
+ new_suffix = old_suffix
+
+ migrate_entities[entity.entity_id] = slugify(f"{serial_number}_{new_suffix}")
+
+ _LOGGER.debug(
+ "Updating entity %s from unique id `%s` to `%s`",
+ entity.entity_id,
+ old_unique_id,
+ migrate_entities[entity.entity_id],
+ )
+
+ for entity_id, unique_id in migrate_entities.items():
+ entity_reg.async_update_entity(entity_id, new_unique_id=unique_id)
diff --git a/custom_components/linkytic/binary_sensor.py b/custom_components/linkytic/binary_sensor.py
index b6aa86d..630f910 100644
--- a/custom_components/linkytic/binary_sensor.py
+++ b/custom_components/linkytic/binary_sensor.py
@@ -3,129 +3,114 @@
from __future__ import annotations
import logging
-from typing import Optional, cast
+from dataclasses import dataclass
+from typing import cast
from homeassistant.components.binary_sensor import (
BinarySensorDeviceClass,
BinarySensorEntity,
+ BinarySensorEntityDescription,
)
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import EntityCategory
-from homeassistant.core import HomeAssistant
+from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddEntitiesCallback
+from homeassistant.util import slugify
-from .const import DOMAIN, SETUP_TICMODE, TICMODE_STANDARD
+from .const import SETUP_TICMODE, TICMODE_STANDARD
from .entity import LinkyTICEntity
from .serial_reader import LinkyTICReader
from .status_register import StatusRegister
_LOGGER = logging.getLogger(__name__)
-STATUS_REGISTER_SENSORS = (
- (
- StatusRegister.CONTACT_SEC,
- "Contact sec",
- BinarySensorDeviceClass.OPENING,
- "mdi:electric-switch-closed",
- "mdi:electric-switch",
- False,
+
+@dataclass(frozen=True, kw_only=True)
+class StatusRegisterBinarySensorDescription(BinarySensorEntityDescription):
+ """Binary sensor entity description for status register fields."""
+
+ key: str = "STGE"
+ entity_category: EntityCategory | None = EntityCategory.DIAGNOSTIC
+ field: StatusRegister
+ inverted: bool = False
+
+
+STATUS_REGISTER_SENSORS: tuple[StatusRegisterBinarySensorDescription, ...] = (
+ StatusRegisterBinarySensorDescription(
+ translation_key="status_dry_contact",
+ field=StatusRegister.DRY_CONTACT,
+ device_class=BinarySensorDeviceClass.OPENING,
+ ),
+ StatusRegisterBinarySensorDescription(
+ translation_key="status_terminal_cover",
+ field=StatusRegister.TERMINAL_COVER,
+ device_class=BinarySensorDeviceClass.OPENING,
),
- (
- StatusRegister.ETAT_DU_CACHE_BORNE_DISTRIBUTEUR,
- "Cache-borne",
- BinarySensorDeviceClass.OPENING,
- "mdi:toy-brick",
- "mdi:toy-brick-outline",
- False,
+ StatusRegisterBinarySensorDescription(
+ translation_key="status_overvoltage",
+ field=StatusRegister.OVERVOLTAGE,
+ device_class=BinarySensorDeviceClass.PROBLEM,
),
- (
- StatusRegister.SURTENSION_SUR_UNE_DES_PHASES,
- "Surtension",
- BinarySensorDeviceClass.PRESENCE,
- "mdi:flash-triangle-outline",
- "mdi:flash-triangle",
- False,
+ StatusRegisterBinarySensorDescription(
+ translation_key="status_power_over_ref",
+ field=StatusRegister.POWER_OVER_REF,
+ device_class=BinarySensorDeviceClass.PROBLEM,
),
- (
- StatusRegister.DEPASSEMENT_PUISSANCE_REFERENCE,
- "Dépassement puissance",
- BinarySensorDeviceClass.PRESENCE,
- "mdi:transmission-tower",
- "mdi:transmission-tower-off",
- False,
+ StatusRegisterBinarySensorDescription(
+ translation_key="status_producer",
+ field=StatusRegister.PRODUCER,
),
- (
- StatusRegister.PRODUCTEUR_CONSOMMATEUR,
- "Producteur",
- None,
- "mdi:transmission-tower-export",
- None,
- False,
+ StatusRegisterBinarySensorDescription(
+ translation_key="status_injection",
+ field=StatusRegister.INJECTING,
),
- (
- StatusRegister.SENS_ENERGIE_ACTIVE,
- "Sens énergie active",
- None,
- "mdi:transmission-tower-export",
- None,
- False,
+ StatusRegisterBinarySensorDescription(
+ translation_key="status_rtc_sync",
+ field=StatusRegister.RTC_DEGRADED,
+ device_class=BinarySensorDeviceClass.LOCK,
),
- (
- StatusRegister.MODE_DEGRADE_HORLOGE,
- "Synchronisation horloge",
- BinarySensorDeviceClass.LOCK,
- "mdi:sync",
- "mdi:sync-off",
- False,
+ StatusRegisterBinarySensorDescription(
+ translation_key="status_mode_tic",
+ field=StatusRegister.TIC_STD,
),
- (StatusRegister.MODE_TIC, "Mode historique", None, "mdi:tag", None, False),
- (
- StatusRegister.SYNCHRO_CPL,
- "Synchronisation CPL",
- BinarySensorDeviceClass.LOCK,
- "mdi:sync",
- "mdi:sync-off",
- True,
+ StatusRegisterBinarySensorDescription(
+ translation_key="status_cpl_sync",
+ field=StatusRegister.CPL_SYNC,
+ device_class=BinarySensorDeviceClass.LOCK,
+ inverted=True,
),
)
+SERIAL_LINK_BINARY_SENSOR = BinarySensorEntityDescription(
+ key="serial_link",
+ translation_key="serial_link",
+ entity_category=EntityCategory.DIAGNOSTIC,
+ device_class=BinarySensorDeviceClass.CONNECTIVITY,
+)
+
# config flow setup
async def async_setup_entry(
hass: HomeAssistant,
- config_entry: ConfigEntry,
+ config_entry: ConfigEntry[LinkyTICReader],
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up entry."""
_LOGGER.debug("%s: setting up binary sensor plateform", config_entry.title)
# Retrieve the serial reader object
- try:
- serial_reader = hass.data[DOMAIN][config_entry.entry_id]
- except KeyError:
- _LOGGER.error(
- "%s: can not init binaries sensors: failed to get the serial reader object",
- config_entry.title,
- )
- return
+ reader = config_entry.runtime_data
+
# Init sensors
sensors: list[BinarySensorEntity] = [
- SerialConnectivity(config_entry.title, config_entry.entry_id, serial_reader)
+ SerialConnectivity(SERIAL_LINK_BINARY_SENSOR, config_entry, reader)
]
if config_entry.data.get(SETUP_TICMODE) == TICMODE_STANDARD:
sensors.extend(
StatusRegisterBinarySensor(
- name=name,
- config_title=config_entry.title,
- field=field,
- serial_reader=serial_reader,
- unique_id=config_entry.entry_id,
- device_class=devclass,
- icon_off=icon_off,
- icon_on=icon_on,
- inverted=inverted,
+ description=description, config_entry=config_entry, reader=reader
)
- for field, name, devclass, icon_off, icon_on, inverted in STATUS_REGISTER_SENSORS
+ for description in STATUS_REGISTER_SENSORS
)
async_add_entities(sensors, True)
@@ -134,23 +119,16 @@ async def async_setup_entry(
class SerialConnectivity(LinkyTICEntity, BinarySensorEntity):
"""Serial connectivity to the Linky TIC serial interface."""
- # Generic properties
- # https://developers.home-assistant.io/docs/core/entity#generic-properties
- _attr_entity_category = EntityCategory.DIAGNOSTIC
- _attr_name = "Connectivité du lien série"
-
- # Binary sensor properties
- # https://developers.home-assistant.io/docs/core/entity/binary-sensor/#properties
- _attr_device_class = BinarySensorDeviceClass.CONNECTIVITY
-
def __init__(
- self, title: str, unique_id: str, serial_reader: LinkyTICReader
+ self,
+ description: BinarySensorEntityDescription,
+ config_entry: ConfigEntry,
+ reader: LinkyTICReader,
) -> None:
"""Initialize the SerialConnectivity binary sensor."""
- _LOGGER.debug("%s: initializing Serial Connectivity binary sensor", title)
- super().__init__(serial_reader)
- self._title = title
- self._attr_unique_id = f"{DOMAIN}_{unique_id}_serial_connectivity"
+ super().__init__(reader)
+ self.entity_description = description
+ self._attr_unique_id = slugify(f"{reader.serial_number}_serial_connectivity")
@property
def is_on(self) -> bool:
@@ -161,55 +139,37 @@ def is_on(self) -> bool:
class StatusRegisterBinarySensor(LinkyTICEntity, BinarySensorEntity):
"""Binary sensor for binary status register fields."""
- _attr_entity_category = EntityCategory.DIAGNOSTIC
-
_binary_state: bool
_tag = "STGE"
def __init__(
self,
- name: str,
- config_title: str,
- unique_id: str,
- serial_reader: LinkyTICReader,
- field: StatusRegister,
- device_class: BinarySensorDeviceClass | None = None,
- icon_on: str | None = None,
- icon_off: str | None = None,
- inverted: bool = False,
+ description: StatusRegisterBinarySensorDescription,
+ config_entry: ConfigEntry,
+ reader: LinkyTICReader,
) -> None:
"""Initialize the status register binary sensor."""
- _LOGGER.debug("%s: initializing %s binary sensor", config_title, field.name)
- super().__init__(serial_reader)
+ _LOGGER.debug(
+ "%s: initializing %s binary sensor",
+ config_entry.title,
+ description.field.name,
+ )
+ super().__init__(reader)
- self._config_title = config_title
+ self.entity_description = description
self._binary_state = False # Default state.
- self._inverted = inverted
- self._field = field
- self._attr_name = name
- self._attr_unique_id = f"{DOMAIN}_{unique_id}_{field.name.lower()}"
- if device_class:
- self._attr_device_class = device_class
-
- self._icon_on = icon_on
- self._icon_off = icon_off
+ self._inverted = description.inverted
+ self._field = description.field
+ self._attr_unique_id = slugify(
+ f"{reader.serial_number}_{description.field.name}"
+ )
@property
def is_on(self) -> bool:
"""Value of the sensor."""
return self._binary_state ^ self._inverted
- @property
- def icon(self) -> str | None:
- """Return icon of the sensor."""
- if not self._icon_off or not self._icon_on:
- return self._icon_on or self._icon_off or super().icon
-
- if self.is_on:
- return self._icon_on
- else:
- return self._icon_off
-
+ @callback
def update(self) -> None:
"""Update the state of the sensor."""
value, _ = self._update()
@@ -218,12 +178,12 @@ def update(self) -> None:
self._binary_state = cast(bool, self._field.value.get_status(value))
# TODO: factor _update function to remove copy from sensors entities
- def _update(self) -> tuple[Optional[str], Optional[str]]:
+ def _update(self) -> tuple[str | None, str | None]:
"""Get value and/or timestamp from cached data. Responsible for updating sensor availability."""
value, timestamp = self._serial_controller.get_values(self._tag)
_LOGGER.debug(
"%s: retrieved %s value from serial controller: (%s, %s)",
- self._config_title,
+ self._serial_controller.name,
self._tag,
value,
timestamp,
@@ -236,14 +196,14 @@ def _update(self) -> tuple[Optional[str], Optional[str]]:
if not self._serial_controller.is_connected:
_LOGGER.debug(
"%s: marking the %s sensor as unavailable: serial connection lost",
- self._config_title,
+ self._serial_controller.name,
self._tag,
)
self._attr_available = False
elif self._serial_controller.has_read_full_frame:
_LOGGER.info(
"%s: marking the %s sensor as unavailable: a full frame has been read but %s has not been found",
- self._config_title,
+ self._serial_controller.name,
self._tag,
self._tag,
)
@@ -259,7 +219,7 @@ def _update(self) -> tuple[Optional[str], Optional[str]]:
self._attr_available = True
_LOGGER.info(
"%s: marking the %s sensor as available now !",
- self._config_title,
+ self._serial_controller.name,
self._tag,
)
diff --git a/custom_components/linkytic/config_flow.py b/custom_components/linkytic/config_flow.py
index 80f90d9..b9f0887 100644
--- a/custom_components/linkytic/config_flow.py
+++ b/custom_components/linkytic/config_flow.py
@@ -22,6 +22,7 @@
from .const import (
DOMAIN,
+ LINKY_IO_ERRORS,
OPTIONS_REALTIME,
SETUP_PRODUCER,
SETUP_PRODUCER_DEFAULT,
@@ -35,14 +36,14 @@
TICMODE_STANDARD,
TICMODE_STANDARD_LABEL,
)
-from .serial_reader import CannotConnect, CannotRead, linky_tic_tester
+from .serial_reader import LinkyTICReader
_LOGGER = logging.getLogger(__name__)
STEP_USER_DATA_SCHEMA = vol.Schema(
{
- vol.Required(SETUP_SERIAL, default=SETUP_SERIAL_DEFAULT): str, # type: ignore
- vol.Required(SETUP_TICMODE, default=TICMODE_HISTORIC): selector.SelectSelector( # type: ignore
+ vol.Required(SETUP_SERIAL, default=SETUP_SERIAL_DEFAULT): str,
+ vol.Required(SETUP_TICMODE, default=TICMODE_HISTORIC): selector.SelectSelector(
selector.SelectSelectorConfig(
options=[
selector.SelectOptionDict(
@@ -54,17 +55,17 @@
]
),
),
- vol.Required(SETUP_PRODUCER, default=SETUP_PRODUCER_DEFAULT): bool, # type: ignore
- vol.Required(SETUP_THREEPHASE, default=SETUP_THREEPHASE_DEFAULT): bool, # type: ignore
+ vol.Required(SETUP_PRODUCER, default=SETUP_PRODUCER_DEFAULT): bool,
+ vol.Required(SETUP_THREEPHASE, default=SETUP_THREEPHASE_DEFAULT): bool,
}
)
-class LinkyTICConfigFlow(ConfigFlow, domain=DOMAIN): # type:ignore
+class LinkyTICConfigFlow(ConfigFlow, domain=DOMAIN):
"""Handle a config flow for linkytic."""
- VERSION = 1
- MINOR_VERSION = 2
+ VERSION = 2
+ MINOR_VERSION = 0
async def async_step_user(
self, user_input: dict[str, Any] | None = None
@@ -75,38 +76,59 @@ async def async_step_user(
return self.async_show_form(
step_id="user", data_schema=STEP_USER_DATA_SCHEMA
)
- # Validate input
- await self.async_set_unique_id(DOMAIN + "_" + user_input[SETUP_SERIAL])
- self._abort_if_unique_id_configured()
# Search for serial/by-id, which SHOULD be a persistent name to serial interface.
_port = await self.hass.async_add_executor_job(
usb.get_serial_by_id, user_input[SETUP_SERIAL]
)
+ user_input[SETUP_SERIAL] = _port
+
errors = {}
- title = user_input[SETUP_SERIAL]
+
try:
- # Encapsulate the tester function, pyserial rfc2217 implementation have blocking calls.
- await asyncio.to_thread(
- linky_tic_tester,
- device=_port,
- std_mode=user_input[SETUP_TICMODE] == TICMODE_STANDARD,
+ serial_reader = LinkyTICReader(
+ title="Probe",
+ port=_port,
+ std_mode=user_input.get(SETUP_TICMODE) == TICMODE_STANDARD,
+ producer_mode=user_input[SETUP_PRODUCER],
+ three_phase=user_input[SETUP_THREEPHASE],
+ real_time=False,
)
- except CannotConnect as cannot_connect:
- _LOGGER.error("%s: can not connect: %s", title, cannot_connect)
+ serial_reader.start()
+
+ async def read_serial_number(serial: LinkyTICReader) -> str:
+ while serial.serial_number is None:
+ await asyncio.sleep(1)
+ # Check for any serial error that occurred in the serial thread context
+ if serial.setup_error:
+ raise serial.setup_error
+ return serial.serial_number
+
+ s_n = await asyncio.wait_for(read_serial_number(serial_reader), timeout=5)
+
+ # Error when opening serial port.
+ except LINKY_IO_ERRORS as cannot_connect:
+ _LOGGER.error("Could not connect to %s (%s)", _port, cannot_connect)
errors["base"] = "cannot_connect"
- except CannotRead as cannot_read:
- _LOGGER.error(
- "%s: can not read a line after connection: %s", title, cannot_read
- )
+
+ # Timeout waiting for S/N to be read.
+ except TimeoutError:
+ _LOGGER.error("Could not read serial number at %s", _port)
errors["base"] = "cannot_read"
- except Exception as exc: # pylint: disable=broad-except
- _LOGGER.exception("Unexpected exception: %s", exc)
- errors["base"] = "unknown"
+
else:
- user_input[SETUP_SERIAL] = _port
- return self.async_create_entry(title=title, data=user_input)
+ _LOGGER.info("Found a device with serial number: %s", s_n)
+
+ await self.async_set_unique_id(s_n)
+ self._abort_if_unique_id_configured()
+
+ return self.async_create_entry(
+ title=user_input[SETUP_SERIAL], data=user_input
+ )
+
+ finally:
+ serial_reader.signalstop("end_probe")
return self.async_show_form(
step_id="user", data_schema=STEP_USER_DATA_SCHEMA, errors=errors
@@ -145,7 +167,7 @@ async def async_step_init(
{
vol.Required(
OPTIONS_REALTIME,
- default=self.config_entry.options.get(OPTIONS_REALTIME), # type: ignore
+ default=self.config_entry.options.get(OPTIONS_REALTIME),
): bool
}
),
diff --git a/custom_components/linkytic/entity.py b/custom_components/linkytic/entity.py
index 6f4f3f3..d8307a6 100644
--- a/custom_components/linkytic/entity.py
+++ b/custom_components/linkytic/entity.py
@@ -40,4 +40,7 @@ def device_info(self) -> DeviceInfo:
manufacturer=did.get(DID_CONSTRUCTOR, DID_DEFAULT_MANUFACTURER),
model=did.get(DID_TYPE, DID_DEFAULT_MODEL),
name=DID_DEFAULT_NAME,
+ serial_number=self._serial_controller.serial_number,
+ sw_version="TIC "
+ + ("Standard" if self._serial_controller._std_mode else "Historique"),
)
diff --git a/custom_components/linkytic/icons.json b/custom_components/linkytic/icons.json
new file mode 100644
index 0000000..e614670
--- /dev/null
+++ b/custom_components/linkytic/icons.json
@@ -0,0 +1,179 @@
+{
+ "entity": {
+ "sensor": {
+ "serial_number": {
+ "default": "mdi:meter-electric-outline"
+ },
+ "tarif_option": {
+ "default": "mdi:cash-check"
+ },
+ "subcription_current": {
+ "default": "mdi:cash-check"
+ },
+ "current_tarif": {
+ "default": "mdi:calendar-expand-horizontal"
+ },
+ "peak_notice": {
+ "default": "mdi:clock-outline"
+ },
+ "tomorrow_color": {
+ "default": "mdi:palette"
+ },
+ "peak_hour_schedule": {
+ "default": "mdi:clock-outline"
+ },
+ "meter_state": {
+ "default": "mdi:file-word-box-outline"
+ },
+ "overcurrent_warning": {
+ "default": "mdi:flash-triangle-outline"
+ },
+ "overcurrent_warning_ph1": {
+ "default": "mdi:flash-triangle-outline"
+ },
+ "overcurrent_warning_ph2": {
+ "default": "mdi:flash-triangle-outline"
+ },
+ "overcurrent_warning_ph3": {
+ "default": "mdi:flash-triangle-outline"
+ },
+ "tic_version": {
+ "default": "mdi:tag"
+ },
+ "datetime": {
+ "default": "mdi:clock"
+ },
+ "tarif_name": {
+ "default": "mdi:cash-check"
+ },
+ "current_tarif_label": {
+ "default": "mdi:cash-check"
+ },
+ "mobile_peak_start_1": {
+ "default": "mdi:clock-start"
+ },
+ "mobile_peak_end_1": {
+ "default": "mdi:clock-end"
+ },
+ "mobile_peak_start_2": {
+ "default": "mdi:clock-start"
+ },
+ "mobile_peak_end_2": {
+ "default": "mdi:clock-end"
+ },
+ "mobile_peak_start_3": {
+ "default": "mdi:clock-start"
+ },
+ "mobile_peak_end_3": {
+ "default": "mdi:clock-end"
+ },
+ "short_msg": {
+ "default": "mdi:message-text-outline"
+ },
+ "ultra_short_msg": {
+ "default": "mdi:message-text-outline"
+ },
+ "delivery_point": {
+ "default": "mdi:tag"
+ },
+ "relays": {
+ "default": "mdi:electric-switch"
+ },
+ "current_tarif_index": {
+ "default": "mdi:cash-check"
+ },
+ "calendar_index_today": {
+ "default": "mdi:calendar-month-outline"
+ },
+ "calendar_index_tomorrow": {
+ "default": "mdi:calendar-month-outline"
+ },
+ "calendar_profile_tomorrow": {
+ "default": "mdi:calendar-month-outline"
+ },
+ "next_peak_dat_profile": {
+ "default": "mdi:calendar-month-outline"
+ },
+ "status_register": {
+ "default": "mdi:list-status"
+ },
+ "status_trip_device": {
+ "default": "mdi:connection"
+ },
+ "status_tarif_provider": {
+ "default": "mdi:cash-check"
+ },
+ "status_tarif_distributor": {
+ "default": "mdi:cash-check"
+ },
+ "status_euridis": {
+ "default": "mdi:tag"
+ },
+ "status_cpl": {
+ "default": "mdi:tag"
+ },
+ "status_tempo_color_today": {
+ "default": "mdi:palette"
+ },
+ "status_tempo_color_tomorrow": {
+ "default": "mdi:palette"
+ },
+ "status_mobile_peak_notice": {
+ "default": "mdi:clock-alert-outline"
+ },
+ "status_mobile_peak": {
+ "default": "mdi:progress-clock"
+ }
+ },
+ "binary_sensor": {
+ "status_dry_contact": {
+ "state": {
+ "off": "mdi:electric-switch-closed",
+ "on": "mdi:electric-switch"
+ }
+ },
+ "status_terminal_cover": {
+ "state": {
+ "off": "mdi:toy-brick",
+ "on": "mdi:toy-brick-outline"
+ }
+ },
+ "status_overvoltage": {
+ "state": {
+ "on": "mdi:flash-triangle",
+ "off": "mdi:flash-triangle-outline"
+ }
+ },
+ "status_power_over_ref": {
+ "state": {
+ "on": "mdi:flash-alert",
+ "off": "mdi:flash"
+ }
+ },
+ "status_producer": {
+ "default": "mdi:transmission-tower-import"
+ },
+ "status_injection": {
+ "state": {
+ "on": "mdi:transmission-tower-import",
+ "off": "mdi:transmission-tower-export"
+ }
+ },
+ "status_rtc_sync": {
+ "state": {
+ "on": "mdi:sync-off",
+ "off": "mdi:sync"
+ }
+ },
+ "status_mode_tic": {
+ "default": "mdi:information-slab-box-outline"
+ },
+ "status_cpl_sync": {
+ "state": {
+ "on": "mdi:sync-off",
+ "off": "mdi:sync"
+ }
+ }
+ }
+ }
+}
diff --git a/custom_components/linkytic/sensor.py b/custom_components/linkytic/sensor.py
index e25f509..b3eb919 100644
--- a/custom_components/linkytic/sensor.py
+++ b/custom_components/linkytic/sensor.py
@@ -2,11 +2,13 @@
from __future__ import annotations
+import contextlib
import logging
-from collections.abc import Callable
-from typing import Generic, Iterable, Optional, TypeVar, cast
+from collections.abc import Callable, Generator
+from dataclasses import dataclass
+from typing import Generic, TypeVar, cast
-from homeassistant.components.sensor import SensorEntity
+from homeassistant.components.sensor import SensorEntity, SensorEntityDescription
from homeassistant.components.sensor.const import SensorDeviceClass, SensorStateClass
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
@@ -18,8 +20,8 @@
UnitOfPower,
)
from homeassistant.core import HomeAssistant, callback
-from homeassistant.helpers.entity import Entity
from homeassistant.helpers.entity_platform import AddEntitiesCallback
+from homeassistant.util import slugify
from .const import (
DID_CONSTRUCTOR,
@@ -28,8 +30,6 @@
DID_TYPE,
DID_TYPE_CODE,
DID_YEAR,
- DOMAIN,
- EXPERIMENTAL_DEVICES,
SETUP_PRODUCER,
SETUP_THREEPHASE,
SETUP_TICMODE,
@@ -41,1028 +41,583 @@
_LOGGER = logging.getLogger(__name__)
+REACTIVE_ENERGY = "VArh"
+
+
+@dataclass(frozen=True, kw_only=True)
+class LinkyTicSensorConfig(SensorEntityDescription):
+ """Sensor configuration dataclass."""
+
+ fallback_tags: tuple[str, ...] | None = (
+ None # Multiple tags are allowed for non-standard linky tags support, see hekmon/linkytic#42
+ )
+ register_callback: bool = False
+ conversion: Callable | None = None
+
+
+@dataclass(frozen=True, kw_only=True)
+class SerialNumberSensorConfig(LinkyTicSensorConfig):
+ """Sensor configuration dataclass."""
+
+ translation_key: str | None = "serial_number"
+ entity_category: EntityCategory | None = EntityCategory.DIAGNOSTIC
+
+
+# FIXME: Do we really need a date sensor ?
+@dataclass(frozen=True, kw_only=True)
+class DateSensorConfig(LinkyTicSensorConfig):
+ """Config for datetime sensor."""
+
+
+@dataclass(frozen=True, kw_only=True)
+class ApparentPowerSensorConfig(LinkyTicSensorConfig):
+ """Configuration for apparent power sensors."""
+
+ device_class: SensorDeviceClass | None = SensorDeviceClass.APPARENT_POWER
+ native_unit_of_measurement: str | None = UnitOfApparentPower.VOLT_AMPERE
+
+
+@dataclass(frozen=True, kw_only=True)
+class ActivePowerSensorConfig(LinkyTicSensorConfig):
+ """Configuration for active power sensors."""
+
+ device_class: SensorDeviceClass | None = SensorDeviceClass.POWER
+ native_unit_of_measurement: str | None = UnitOfPower.WATT
+
+
+@dataclass(frozen=True, kw_only=True)
+class VoltageSensorConfig(LinkyTicSensorConfig):
+ """Configuration for voltage sensors."""
+
+ device_class: SensorDeviceClass | None = SensorDeviceClass.VOLTAGE
+ native_unit_of_measurement: str | None = UnitOfElectricPotential.VOLT
+
+
+@dataclass(frozen=True, kw_only=True)
+class ElectricalCurrentSensorConfig(LinkyTicSensorConfig):
+ """Configuration for electrical current sensors."""
+
+ device_class: SensorDeviceClass | None = SensorDeviceClass.CURRENT
+ native_unit_of_measurement: str | None = UnitOfElectricCurrent.AMPERE
+
+
+@dataclass(frozen=True, kw_only=True)
+class ActiveEnergySensorConfig(LinkyTicSensorConfig):
+ """Configuration for active energy sensors."""
+
+ device_class: SensorDeviceClass | None = SensorDeviceClass.ENERGY
+ native_unit_of_measurement: str | None = UnitOfEnergy.WATT_HOUR
+ state_class: SensorStateClass | str | None = SensorStateClass.TOTAL_INCREASING
+
+
+@dataclass(frozen=True, kw_only=True)
+class ReactiveEnergySensorConfig(LinkyTicSensorConfig):
+ """Configuration for reactive energy sensors."""
+
+ device_class: SensorDeviceClass | None = SensorDeviceClass.REACTIVE_POWER
+ native_unit_of_measurement: str | None = REACTIVE_ENERGY
+
+
+@dataclass(frozen=True, kw_only=True)
+class StatusRegisterSensorConfig(LinkyTicSensorConfig):
+ """Configuration for status register sensors."""
+
+ key: str = "STGE"
+ status_field: StatusRegister
+
+
+REGISTRY: dict[type[LinkyTicSensorConfig], type[LinkyTICSensor]] = {}
+
+
+def match(*configs: type[LinkyTicSensorConfig]) -> Callable:
+ """Associate one or more sensor config to a sensor class."""
+
+ def wrap(cls: type) -> type:
+ for config in configs:
+ REGISTRY[config] = cls
+ return cls
+
+ return wrap
+
+
+SENSORS_HISTORIC_COMMON: tuple[LinkyTicSensorConfig, ...] = (
+ SerialNumberSensorConfig(key="ADCO"),
+ LinkyTicSensorConfig(
+ key="OPTARIF",
+ translation_key="tarif_option",
+ entity_category=EntityCategory.DIAGNOSTIC,
+ ),
+ ElectricalCurrentSensorConfig(
+ key="ISOUSC",
+ translation_key="subcription_current",
+ entity_category=EntityCategory.DIAGNOSTIC,
+ ),
+ ActiveEnergySensorConfig(
+ key="BASE",
+ translation_key="index_base",
+ ),
+ ActiveEnergySensorConfig(
+ key="HCHC",
+ translation_key="index_hchc",
+ ),
+ ActiveEnergySensorConfig(
+ key="HCHP",
+ translation_key="index_hchp",
+ ),
+ ActiveEnergySensorConfig(
+ key="EJPHN",
+ translation_key="index_ejp_normal",
+ ),
+ ActiveEnergySensorConfig(
+ key="EJPHPM",
+ translation_key="index_ejp_peak",
+ ),
+ ActiveEnergySensorConfig(
+ key="BBRHCJB",
+ translation_key="index_tempo_bluehc",
+ ),
+ ActiveEnergySensorConfig(
+ key="BBRHPJB",
+ translation_key="index_tempo_bluehp",
+ ),
+ ActiveEnergySensorConfig(
+ key="BBRHCJW",
+ translation_key="index_tempo_whitehc",
+ ),
+ ActiveEnergySensorConfig(
+ key="BBRHPJW",
+ translation_key="index_tempo_whitehp",
+ ),
+ ActiveEnergySensorConfig(
+ key="BBRHCJR",
+ translation_key="index_tempo_redhc",
+ ),
+ ActiveEnergySensorConfig(
+ key="BBRHPJR",
+ translation_key="index_tempo_redhp",
+ ),
+ LinkyTicSensorConfig(
+ key="PTEC",
+ translation_key="current_tarif",
+ ),
+ LinkyTicSensorConfig(
+ key="PEJP",
+ translation_key="peak_notice",
+ ),
+ LinkyTicSensorConfig(
+ key="DEMAIN",
+ translation_key="tomorrow_color",
+ ),
+ ApparentPowerSensorConfig(
+ key="PAPP",
+ translation_key="apparent_power",
+ state_class=SensorStateClass.MEASUREMENT,
+ register_callback=True,
+ ),
+ LinkyTicSensorConfig(
+ key="HHPHC",
+ translation_key="peak_hour_schedule",
+ ),
+ LinkyTicSensorConfig(
+ key="MOTDETAT",
+ translation_key="meter_state",
+ ),
+)
+
+SENSORS_HISTORIC_SINGLEPHASE: tuple[LinkyTicSensorConfig, ...] = (
+ ElectricalCurrentSensorConfig(
+ key="IINST",
+ translation_key="inst_current",
+ state_class=SensorStateClass.MEASUREMENT,
+ ),
+ ElectricalCurrentSensorConfig(
+ key="ADPS",
+ translation_key="overcurrent_warning",
+ state_class=SensorStateClass.MEASUREMENT,
+ ),
+ ElectricalCurrentSensorConfig(
+ key="IMAX",
+ translation_key="max_current",
+ ),
+)
+
+SENSORS_HISTORIC_TREEPHASE: tuple[LinkyTicSensorConfig, ...] = (
+ # IINST1, IINST2, IINST3
+ *(
+ ElectricalCurrentSensorConfig(
+ key=f"IINST{phase}",
+ translation_key=f"inst_current_ph{phase}",
+ state_class=SensorStateClass.MEASUREMENT,
+ )
+ for phase in (1, 2, 3)
+ ),
+ # ADIR1, ADIR2, ADIR3
+ *(
+ ElectricalCurrentSensorConfig(
+ key=f"ADIR{phase}",
+ translation_key=f"overcurrent_warning_ph{phase}",
+ state_class=SensorStateClass.MEASUREMENT,
+ )
+ for phase in (1, 2, 3)
+ ),
+ *(
+ ElectricalCurrentSensorConfig(
+ key=f"IMAX{phase}",
+ translation_key=f"max_current_ph{phase}",
+ )
+ for phase in (1, 2, 3)
+ ),
+ ApparentPowerSensorConfig(
+ key="PMAX",
+ translation_key="max_power_n-1",
+ ),
+ LinkyTicSensorConfig(
+ key="PPOT",
+ translation_key="potentials_presence",
+ entity_category=EntityCategory.DIAGNOSTIC,
+ ),
+)
+
+SENSORS_STANDARD_COMMON: tuple[LinkyTicSensorConfig, ...] = (
+ SerialNumberSensorConfig(key="ADSC"),
+ LinkyTicSensorConfig(
+ key="VTIC",
+ translation_key="tic_version",
+ ),
+ DateSensorConfig(
+ key="DATE",
+ translation_key="datetime",
+ ), # Useful in any way?
+ LinkyTicSensorConfig(
+ key="NGTF",
+ translation_key="tarif_name",
+ ),
+ LinkyTicSensorConfig(
+ key="LTARF",
+ translation_key="current_tarif_label",
+ ),
+ ActiveEnergySensorConfig(
+ key="EAST",
+ translation_key="active_energy_drawn_total",
+ ),
+ # EASF01, ... , EASF09
+ *(
+ ActiveEnergySensorConfig(
+ key=f"EASF{index:02}",
+ translation_key=f"active_energy_provider_{index}",
+ )
+ for index in range(1, 10)
+ ),
+ # EASD01, ... , EASD04
+ *(
+ ActiveEnergySensorConfig(
+ key=f"EASD{index:02}",
+ translation_key=f"active_energy_distributor_{index}",
+ )
+ for index in range(1, 5)
+ ),
+ ElectricalCurrentSensorConfig(
+ key="IRMS1",
+ translation_key="rms_current_ph1",
+ state_class=SensorStateClass.MEASUREMENT,
+ register_callback=True,
+ ),
+ VoltageSensorConfig(
+ key="URMS1",
+ translation_key="rms_voltage_ph1",
+ state_class=SensorStateClass.MEASUREMENT,
+ register_callback=True,
+ ),
+ ApparentPowerSensorConfig(
+ key="PREF",
+ translation_key="ref_power",
+ entity_category=EntityCategory.DIAGNOSTIC,
+ conversion=(lambda x: x * 1000), # kVA to VA conversion
+ ),
+ ApparentPowerSensorConfig(
+ key="PCOUP",
+ translation_key="trip_power",
+ entity_category=EntityCategory.DIAGNOSTIC,
+ conversion=(lambda x: x * 1000), # kVA to VA conversion
+ ),
+ ApparentPowerSensorConfig(
+ key="SINSTS",
+ translation_key="instantaneous_apparent_power",
+ fallback_tags=("SINST1",), # See hekmon/linkytic#42
+ state_class=SensorStateClass.MEASUREMENT,
+ register_callback=True,
+ ),
+ ApparentPowerSensorConfig(
+ key="SMAXSN",
+ translation_key="apparent_power_max_n",
+ fallback_tags=("SMAXN",), # See hekmon/linkytic#42
+ register_callback=True,
+ ),
+ ApparentPowerSensorConfig(
+ key="SMAXSN-1",
+ translation_key="apparent_power_max_n-1",
+ fallback_tags=("SMAXN-1",), # See hekmon/linkytic#42
+ register_callback=True,
+ ),
+ ActivePowerSensorConfig(
+ key="CCASN",
+ translation_key="power_load_curve_n",
+ ),
+ ActivePowerSensorConfig(
+ key="CCASN-1",
+ translation_key="power_load_curve_n-1",
+ ),
+ VoltageSensorConfig(
+ key="UMOY1",
+ translation_key="mean_voltage_ph1",
+ state_class=SensorStateClass.MEASUREMENT,
+ register_callback=True,
+ ),
+ # DPM1, DPM2, DPM3
+ *(
+ LinkyTicSensorConfig(
+ translation_key=f"mobile_peak_start_{index}",
+ key=f"DPM{index}",
+ )
+ for index in (1, 2, 3)
+ ),
+ # FPM1, FPM2, FPM3
+ *(
+ LinkyTicSensorConfig(
+ translation_key=f"mobile_peak_end_{index}",
+ key=f"FPM{index}",
+ )
+ for index in (1, 2, 3)
+ ),
+ LinkyTicSensorConfig(
+ key="MSG1",
+ translation_key="short_msg",
+ ),
+ LinkyTicSensorConfig(
+ key="MSG2",
+ translation_key="ultra_short_msg",
+ ),
+ LinkyTicSensorConfig(
+ key="PRM",
+ translation_key="delivery_point",
+ ),
+ LinkyTicSensorConfig(
+ key="RELAIS",
+ translation_key="relays",
+ ),
+ LinkyTicSensorConfig(
+ key="NTARF",
+ translation_key="current_tarif_index",
+ ),
+ LinkyTicSensorConfig(
+ key="NJOURF",
+ translation_key="calendar_index_today",
+ ),
+ LinkyTicSensorConfig(
+ key="NJOURF+1",
+ translation_key="calendar_index_tomorrow",
+ ),
+ LinkyTicSensorConfig(
+ key="PJOURF+1",
+ translation_key="calendar_profile_tomorrow",
+ conversion=(lambda x: str(x).replace("NONUTILE", "").strip()),
+ ),
+ LinkyTicSensorConfig(
+ key="PPOINTE",
+ translation_key="next_peak_dat_profile",
+ ),
+ LinkyTicSensorConfig(
+ key="STGE",
+ translation_key="status_register",
+ ), # Duplicate? All fields are exposed as sensors or binary sensors
+ StatusRegisterSensorConfig(
+ translation_key="status_trip_device",
+ status_field=StatusRegister.TRIP_UNIT,
+ ),
+ StatusRegisterSensorConfig(
+ translation_key="status_tarif_provider",
+ status_field=StatusRegister.PROVIDER_INDEX,
+ ),
+ StatusRegisterSensorConfig(
+ translation_key="status_tarif_distributor",
+ status_field=StatusRegister.DISTRIBUTOR_INDEX,
+ ),
+ StatusRegisterSensorConfig(
+ translation_key="status_euridis",
+ status_field=StatusRegister.EURIDIS,
+ ),
+ StatusRegisterSensorConfig(
+ translation_key="status_cpl",
+ status_field=StatusRegister.CPL_STATUS,
+ ),
+ StatusRegisterSensorConfig(
+ translation_key="status_tempo_color_today",
+ status_field=StatusRegister.COLOR_TODAY,
+ ),
+ StatusRegisterSensorConfig(
+ translation_key="status_tempo_color_tomorrow",
+ status_field=StatusRegister.COLOR_NEXT_DAY,
+ ),
+ StatusRegisterSensorConfig(
+ translation_key="status_mobile_peak_notice",
+ status_field=StatusRegister.MOBILE_PEAK_NOTICE,
+ ),
+ StatusRegisterSensorConfig(
+ translation_key="status_mobile_peak", status_field=StatusRegister.MOBILE_PEAK
+ ),
+)
+
+SENSORS_STANDARD_THREEPHASE: tuple[LinkyTicSensorConfig, ...] = (
+ # IRMS2, IRMS3
+ *(
+ ElectricalCurrentSensorConfig(
+ key=f"IRMS{phase}",
+ translation_key=f"rms_current_ph{phase}",
+ state_class=SensorStateClass.MEASUREMENT,
+ register_callback=True,
+ )
+ for phase in (2, 3)
+ ),
+ # URMS2, URMS3
+ *(
+ VoltageSensorConfig(
+ key=f"URMS{phase}",
+ translation_key=f"rms_voltage_ph{phase}",
+ state_class=SensorStateClass.MEASUREMENT,
+ register_callback=True,
+ )
+ for phase in (2, 3)
+ ),
+ # SINSTS1, SINSTS2, SINSTS3
+ *(
+ ApparentPowerSensorConfig(
+ key=f"SINSTS{phase}",
+ translation_key=f"instantaneous_apparent_power_ph{phase}",
+ state_class=SensorStateClass.MEASUREMENT,
+ register_callback=True,
+ )
+ for phase in (1, 2, 3)
+ ),
+ # SMAXSN1, SMAXSN2, SMAXSN3
+ *(
+ ApparentPowerSensorConfig(
+ key=f"SMAXSN{phase}",
+ translation_key=f"apparent_power_max_n_ph{phase}",
+ state_class=SensorStateClass.MEASUREMENT,
+ register_callback=True,
+ )
+ for phase in (1, 2, 3)
+ ),
+ # SMAXSN1-1, SMAXSN2-1, SMAXSN3-1
+ *(
+ ApparentPowerSensorConfig(
+ key=f"SMAXSN{phase}-1",
+ translation_key=f"apparent_power_max_n-1_ph{phase}",
+ state_class=SensorStateClass.MEASUREMENT,
+ register_callback=True,
+ )
+ for phase in (1, 2, 3)
+ ),
+ # UMOY2, UMOY3
+ *(
+ VoltageSensorConfig(
+ key=f"UMOY{phase}",
+ translation_key=f"mean_voltage_ph{phase}",
+ state_class=SensorStateClass.MEASUREMENT,
+ register_callback=True,
+ )
+ for phase in (2, 3)
+ ),
+)
+
+SENSORS_STANDARD_PRODUCER: tuple[LinkyTicSensorConfig, ...] = (
+ ActiveEnergySensorConfig(
+ key="EAIT",
+ translation_key="active_energy_injected_total",
+ ),
+ # ERQ1, ... , ERQ4
+ *(
+ ReactiveEnergySensorConfig(
+ key=f"ERQ{index}",
+ translation_key=f"reactive_energy_q{index}",
+ )
+ for index in range(1, 5)
+ ),
+ ApparentPowerSensorConfig(
+ key="SINSTI",
+ translation_key="instantaneous_apparent_power_injected",
+ state_class=SensorStateClass.MEASUREMENT,
+ register_callback=True,
+ ),
+ ApparentPowerSensorConfig(
+ key="SMAXIN",
+ translation_key="apparent_power_injected_max_n",
+ register_callback=True,
+ ),
+ ApparentPowerSensorConfig(
+ key="SMAXIN-1",
+ translation_key="apparent_power_injected_max_n-1",
+ register_callback=True,
+ ),
+ ActivePowerSensorConfig(
+ key="CCAIN",
+ translation_key="power_injected_load_curve_n",
+ ),
+ ActivePowerSensorConfig(
+ key="CCAIN-1",
+ translation_key="power_injected_load_curve_n-1",
+ ),
+)
+
# config flow setup
async def async_setup_entry(
hass: HomeAssistant,
- config_entry: ConfigEntry,
+ config_entry: ConfigEntry[LinkyTICReader],
async_add_entities: AddEntitiesCallback,
) -> None:
"""Modern (thru config entry) sensors setup."""
_LOGGER.debug("%s: setting up sensor plateform", config_entry.title)
# Retrieve the serial reader object
- try:
- serial_reader: LinkyTICReader = hass.data[DOMAIN][config_entry.entry_id]
- except KeyError:
- _LOGGER.error(
- "%s: can not init sensors: failed to get the serial reader object",
- config_entry.title,
- )
- return
+ reader = config_entry.runtime_data
- # Flag for experimental counters which have slightly different tags.
- is_pilot: bool = (
- serial_reader.device_identification[DID_TYPE_CODE] in EXPERIMENTAL_DEVICES
- )
+ is_standard = bool(config_entry.data.get(SETUP_TICMODE) == TICMODE_STANDARD)
+ is_threephase = bool(config_entry.data.get(SETUP_THREEPHASE))
+ is_producer = bool(config_entry.data.get(SETUP_PRODUCER))
- # Init sensors
- sensors: Iterable[Entity]
- if config_entry.data.get(SETUP_TICMODE) == TICMODE_STANDARD:
- # standard mode
- sensors = [
- ADSSensor(
- config_title=config_entry.title,
- tag="ADSC",
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- LinkyTICStringSensor(
- tag="VTIC",
- name="Version de la TIC",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:tag",
- ),
- DateEtHeureSensor(
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- LinkyTICStringSensor(
- tag="NGTF",
- name="Nom du calendrier tarifaire fournisseur",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:cash-check",
- ),
- LinkyTICStringSensor(
- tag="LTARF",
- name="Libellé tarif fournisseur en cours",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:cash-check",
- ),
- EnergyIndexSensor(
- tag="EAST",
- name="Energie active soutirée totale",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="EASF01",
- name="Energie active soutirée fournisseur, index 01",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="EASF02",
- name="Energie active soutirée fournisseur, index 02",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="EASF03",
- name="Energie active soutirée fournisseur, index 03",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="EASF04",
- name="Energie active soutirée fournisseur, index 04",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="EASF05",
- name="Energie active soutirée fournisseur, index 05",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="EASF06",
- name="Energie active soutirée fournisseur, index 06",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="EASF07",
- name="Energie active soutirée fournisseur, index 07",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="EASF08",
- name="Energie active soutirée fournisseur, index 08",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="EASF09",
- name="Energie active soutirée fournisseur, index 09",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="EASD01",
- name="Energie active soutirée distributeur, index 01",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="EASD02",
- name="Energie active soutirée distributeur, index 02",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="EASD03",
- name="Energie active soutirée distributeur, index 03",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="EASD04",
- name="Energie active soutirée distributeur, index 04",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- CurrentSensor(
- tag="IRMS1",
- name="Courant efficace, phase 1",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- state_class=SensorStateClass.MEASUREMENT,
- register_callback=True,
- ),
- VoltageSensor(
- tag="URMS1",
- name="Tension efficace, phase 1",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- state_class=SensorStateClass.MEASUREMENT,
- register_callback=True,
- ),
- ApparentPowerSensor(
- tag="PREF",
- name="Puissance app. de référence",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- register_callback=True,
- category=EntityCategory.DIAGNOSTIC,
- conversion_function=(lambda x: x * 1000), # kVA conversion
- ),
- ApparentPowerSensor(
- tag="PCOUP",
- name="Puissance app. de coupure",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- register_callback=True,
- category=EntityCategory.DIAGNOSTIC,
- conversion_function=(lambda x: x * 1000), # kVA conversion
- ),
- ApparentPowerSensor(
- tag="SINST1" if is_pilot else "SINSTS",
- name="Puissance app. instantanée soutirée",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- state_class=SensorStateClass.MEASUREMENT,
- register_callback=True,
- ),
- ApparentPowerSensor(
- tag="SMAXN" if is_pilot else "SMAXSN",
- name="Puissance app. max. soutirée n",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- register_callback=True,
- ),
- ApparentPowerSensor(
- tag="SMAXN-1" if is_pilot else "SMAXSN-1",
- name="Puissance app. max. soutirée n-1",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- register_callback=True,
- ),
- PowerSensor(
- tag="CCASN",
- name="Point n de la courbe de charge active soutirée",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- PowerSensor(
- tag="CCASN-1",
- name="Point n-1 de la courbe de charge active soutirée",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- VoltageSensor(
- tag="UMOY1",
- name="Tension moy. ph. 1",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- state_class=SensorStateClass.MEASUREMENT, # Should this be considered an instantaneous value?
- register_callback=True,
- ),
- LinkyTICStringSensor(
- tag="DPM1",
- name="Début pointe mobile 1",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:clock-start",
- ),
- LinkyTICStringSensor(
- tag="FPM1",
- name="Fin pointe mobile 1",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:clock-end",
- ),
- LinkyTICStringSensor(
- tag="DPM2",
- name="Début pointe mobile 2",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:clock-start",
- ),
- LinkyTICStringSensor(
- tag="FPM2",
- name="Fin pointe mobile 2",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:clock-end",
- ),
- LinkyTICStringSensor(
- tag="DPM3",
- name="Début pointe mobile 3",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:clock-start",
- ),
- LinkyTICStringSensor(
- tag="FPM3",
- name="Fin pointe mobile 3",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:clock-end",
- ),
- LinkyTICStringSensor(
- tag="MSG1",
- name="Message court",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:message-text-outline",
- ),
- LinkyTICStringSensor(
- tag="MSG2",
- name="Message Ultra court",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:message-text-outline",
- ),
- LinkyTICStringSensor(
- tag="PRM",
- name="PRM",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:tag",
- ),
- LinkyTICStringSensor(
- tag="RELAIS",
- name="Relais",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:electric-switch",
- ),
- LinkyTICStringSensor(
- tag="NTARF",
- name="Numéro de l’index tarifaire en cours",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:cash-check",
- ),
- LinkyTICStringSensor(
- tag="NJOURF",
- name="Numéro du jour en cours calendrier fournisseur",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:calendar-month-outline",
- ),
- LinkyTICStringSensor(
- tag="NJOURF+1",
- name="Numéro du prochain jour calendrier fournisseur",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:calendar-month-outline",
- ),
- ProfilDuProchainJourCalendrierFournisseurSensor(
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- LinkyTICStringSensor(
- tag="PPOINTE",
- name="Profil du prochain jour de pointe",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:calendar-month-outline",
- ),
- LinkyTICStringSensor(
- tag="STGE",
- name="Registre de statuts", # codespell:ignore
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:list-status",
- ),
- LinkyTICStatusRegisterSensor(
- name="Statut organe de coupure",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:connection",
- field=StatusRegister.ORGANE_DE_COUPURE,
- ),
- LinkyTICStatusRegisterSensor(
- name="Statut tarif contrat fourniture",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:cash-check",
- field=StatusRegister.TARIF_CONTRAT_FOURNITURE,
- ),
- LinkyTICStatusRegisterSensor(
- name="Statut tarif contrat distributeur",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:cash-check",
- field=StatusRegister.TARIF_CONTRAT_DISTRIBUTEUR,
- ),
- LinkyTICStatusRegisterSensor(
- name="Statut sortie communication Euridis",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:tag",
- field=StatusRegister.ETAT_SORTIE_COMMUNICATION_EURIDIS,
- ),
- LinkyTICStatusRegisterSensor(
- name="Statut CPL",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:tag",
- field=StatusRegister.STATUS_CPL,
- ),
- LinkyTICStatusRegisterSensor(
- name="Statut couleur du jour tempo",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:palette",
- field=StatusRegister.COULEUR_JOUR_CONTRAT_TEMPO,
- ),
- LinkyTICStatusRegisterSensor(
- name="Statut couleur du lendemain tempo",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:palette",
- field=StatusRegister.COULEUR_LENDEMAIN_CONTRAT_TEMPO,
- ),
- LinkyTICStatusRegisterSensor(
- name="Statut préavis pointes mobiles", # codespell:ignore
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:clock-alert-outline",
- field=StatusRegister.PREAVIS_POINTES_MOBILES,
- ),
- LinkyTICStatusRegisterSensor(
- name="Statut pointe mobile",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:progress-clock",
- field=StatusRegister.POINTE_MOBILE,
- ),
- ]
- # Add producer specific sensors
- if bool(config_entry.data.get(SETUP_PRODUCER)):
- sensors.append(
- EnergyIndexSensor(
- tag="EAIT",
- name="Energie active injectée totale",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:transmission-tower-import",
- )
- )
- sensors.append(
- EnergyIndexSensor(
- tag="ERQ1",
- name="Energie réactive Q1 totale",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:transmission-tower-import",
- )
- )
- sensors.append(
- EnergyIndexSensor(
- tag="ERQ2",
- name="Energie réactive Q2 totale",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:transmission-tower-import",
- )
- )
- sensors.append(
- EnergyIndexSensor(
- tag="ERQ3",
- name="Energie réactive Q3 totale",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:transmission-tower-import",
- )
- )
- sensors.append(
- EnergyIndexSensor(
- tag="ERQ4",
- name="Energie réactive Q4 totale",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:transmission-tower-import",
- )
- )
- sensors.append(
- ApparentPowerSensor(
- tag="SINSTI",
- name="Puissance app. instantanée injectée",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- state_class=SensorStateClass.MEASUREMENT,
- register_callback=True,
- icon="mdi:transmission-tower-import",
- )
- )
- sensors.append(
- ApparentPowerSensor(
- tag="SMAXIN",
- name="Puissance app. max. injectée n",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- register_callback=True,
- icon="mdi:transmission-tower-import",
- )
- )
- sensors.append(
- ApparentPowerSensor(
- tag="SMAXIN-1",
- name="Puissance app. max. injectée n-1",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- register_callback=True,
- icon="mdi:transmission-tower-import",
- )
- )
- sensors.append(
- PowerSensor(
- tag="CCAIN",
- name="Point n de la courbe de charge active injectée",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:transmission-tower-import",
- )
- )
- sensors.append(
- PowerSensor(
- tag="CCAIN-1",
- name="Point n-1 de la courbe de charge active injectée",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:transmission-tower-import",
- )
- )
- # Add three-phase specific sensors
- if bool(config_entry.data.get(SETUP_THREEPHASE)):
- sensors.append(
- CurrentSensor(
- tag="IRMS2",
- name="Courant efficace, phase 2",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- state_class=SensorStateClass.MEASUREMENT,
- register_callback=True,
- )
- )
- sensors.append(
- CurrentSensor(
- tag="IRMS3",
- name="Courant efficace, phase 3",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- state_class=SensorStateClass.MEASUREMENT,
- register_callback=True,
- )
- )
- sensors.append(
- VoltageSensor(
- tag="URMS2",
- name="Tension efficace, phase 2",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- state_class=SensorStateClass.MEASUREMENT,
- register_callback=True,
- )
- )
- sensors.append(
- VoltageSensor(
- tag="URMS3",
- name="Tension efficace, phase 3",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- state_class=SensorStateClass.MEASUREMENT,
- register_callback=True,
- )
- )
- sensors.append(
- ApparentPowerSensor(
- tag="SINSTS1",
- name="Puissance app. instantanée soutirée phase 1",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- state_class=SensorStateClass.MEASUREMENT,
- register_callback=True,
- )
- )
- sensors.append(
- ApparentPowerSensor(
- tag="SINSTS2",
- name="Puissance app. instantanée soutirée phase 2",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- state_class=SensorStateClass.MEASUREMENT,
- register_callback=True,
- )
- )
- sensors.append(
- ApparentPowerSensor(
- tag="SINSTS3",
- name="Puissance app. instantanée soutirée phase 3",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- state_class=SensorStateClass.MEASUREMENT,
- register_callback=True,
- )
- )
- sensors.append(
- ApparentPowerSensor(
- tag="SMAXSN1",
- name="Puissance app max. soutirée n phase 1",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- register_callback=True,
- )
- )
- sensors.append(
- ApparentPowerSensor(
- tag="SMAXSN2",
- name="Puissance app max. soutirée n phase 2",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- register_callback=True,
- )
- )
- sensors.append(
- ApparentPowerSensor(
- tag="SMAXSN3",
- name="Puissance app max. soutirée n phase 3",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- register_callback=True,
- )
- )
- sensors.append(
- ApparentPowerSensor(
- tag="SMAXSN1-1",
- name="Puissance app max. soutirée n-1 phase 1",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- register_callback=True,
- )
- )
- sensors.append(
- ApparentPowerSensor(
- tag="SMAXSN2-1",
- name="Puissance app max. soutirée n-1 phase 2",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- register_callback=True,
- )
- )
- sensors.append(
- ApparentPowerSensor(
- tag="SMAXSN3-1",
- name="Puissance app max. soutirée n-1 phase 3",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- register_callback=True,
- )
- )
- sensors.append(
- VoltageSensor(
- tag="UMOY2",
- name="Tension moy. ph. 2",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- state_class=SensorStateClass.MEASUREMENT,
- register_callback=True,
- )
- )
- sensors.append(
- VoltageSensor(
- tag="UMOY3",
- name="Tension moy. ph. 3",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- state_class=SensorStateClass.MEASUREMENT,
- register_callback=True,
- )
- )
+ def sensor_to_create() -> Generator[LinkyTicSensorConfig]:
+ if is_standard:
+ # Standard mode
+ for desc in SENSORS_STANDARD_COMMON:
+ yield desc
+
+ if is_threephase:
+ for desc in SENSORS_STANDARD_THREEPHASE:
+ yield desc
+
+ if is_producer:
+ for desc in SENSORS_STANDARD_PRODUCER:
+ yield desc
- else:
- # historic mode
- sensors = [
- ADSSensor(
- config_title=config_entry.title,
- tag="ADCO",
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- LinkyTICStringSensor(
- tag="OPTARIF",
- name="Option tarifaire choisie",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:cash-check",
- category=EntityCategory.DIAGNOSTIC,
- ),
- CurrentSensor(
- tag="ISOUSC",
- name="Intensité souscrite",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- category=EntityCategory.DIAGNOSTIC,
- ),
- EnergyIndexSensor(
- tag="BASE",
- name="Index option Base",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="HCHC",
- name="Index option Heures Creuses - Heures Creuses",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="HCHP",
- name="Index option Heures Creuses - Heures Pleines",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="EJPHN",
- name="Index option EJP - Heures Normales", # codespell:ignore
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="EJPHPM",
- name="Index option EJP - Heures de Pointe Mobile",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="BBRHCJB",
- name="Index option Tempo - Heures Creuses Jours Bleus",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="BBRHPJB",
- name="Index option Tempo - Heures Pleines Jours Bleus",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="BBRHCJW",
- name="Index option Tempo - Heures Creuses Jours Blancs",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="BBRHPJW",
- name="Index option Tempo - Heures Pleines Jours Blancs",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="BBRHCJR",
- name="Index option Tempo - Heures Creuses Jours Rouges",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- EnergyIndexSensor(
- tag="BBRHPJR",
- name="Index option Tempo - Heures Pleines Jours Rouges",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- PEJPSensor(
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- ),
- LinkyTICStringSensor(
- tag="PTEC",
- name="Période Tarifaire en cours",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:calendar-expand-horizontal",
- ),
- LinkyTICStringSensor(
- tag="DEMAIN",
- name="Couleur du lendemain",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:palette",
- ),
- ApparentPowerSensor(
- tag="PAPP",
- name="Puissance apparente",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- state_class=SensorStateClass.MEASUREMENT,
- register_callback=True,
- ),
- LinkyTICStringSensor(
- tag="HHPHC",
- name="Horaire Heures Pleines Heures Creuses",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:clock-outline",
- enabled_by_default=False,
- ),
- LinkyTICStringSensor(
- tag="MOTDETAT",
- name="Mot d'état du compteur", # codespell:ignore
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- icon="mdi:file-word-box-outline",
- category=EntityCategory.DIAGNOSTIC,
- enabled_by_default=False,
- ),
- ]
- # Add specific sensors
- if bool(config_entry.data.get(SETUP_THREEPHASE)):
- # three-phase - concat specific sensors
- sensors.append(
- CurrentSensor(
- tag="IINST1",
- name="Intensité Instantanée (phase 1)",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- state_class=SensorStateClass.MEASUREMENT,
- register_callback=True,
- )
- )
- sensors.append(
- CurrentSensor(
- tag="IINST2",
- name="Intensité Instantanée (phase 2)",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- state_class=SensorStateClass.MEASUREMENT,
- register_callback=True,
- )
- )
- sensors.append(
- CurrentSensor(
- tag="IINST3",
- name="Intensité Instantanée (phase 3)",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- state_class=SensorStateClass.MEASUREMENT,
- register_callback=True,
- )
- )
- sensors.append(
- CurrentSensor(
- tag="IMAX1",
- name="Intensité maximale appelée (phase 1)",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- )
- )
- sensors.append(
- CurrentSensor(
- tag="IMAX2",
- name="Intensité maximale appelée (phase 2)",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- )
- )
- sensors.append(
- CurrentSensor(
- tag="IMAX3",
- name="Intensité maximale appelée (phase 3)",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- )
- )
- sensors.append(
- PowerSensor( # documentation says unit is Watt but description talks about VoltAmp :/
- tag="PMAX",
- name="Puissance maximale triphasée atteinte (jour n-1)",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- )
- )
- sensors.append(
- LinkyTICStringSensor(
- tag="PPOT",
- name="Présence des potentiels", # codespell:ignore
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- category=EntityCategory.DIAGNOSTIC,
- )
- )
- # Burst sensors
- sensors.append(
- CurrentSensor(
- tag="ADIR1",
- name="Avertissement de Dépassement d'intensité de réglage (phase 1)",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- register_callback=True,
- )
- )
- sensors.append(
- CurrentSensor(
- tag="ADIR2",
- name="Avertissement de Dépassement d'intensité de réglage (phase 2)",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- register_callback=True,
- )
- )
- sensors.append(
- CurrentSensor(
- tag="ADIR3",
- name="Avertissement de Dépassement d'intensité de réglage (phase 3)",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- register_callback=True,
- )
- )
- _LOGGER.info(
- "Adding %d sensors for the three phase historic mode", len(sensors)
- )
else:
- # single phase - concat specific sensors
- sensors.append(
- CurrentSensor(
- tag="IINST",
- name="Intensité Instantanée",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- state_class=SensorStateClass.MEASUREMENT,
- register_callback=True,
- )
- )
- sensors.append(
- CurrentSensor(
- tag="ADPS",
- name="Avertissement de Dépassement De Puissance Souscrite",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- state_class=SensorStateClass.MEASUREMENT,
- register_callback=True,
- )
- )
- sensors.append(
- CurrentSensor(
- tag="IMAX",
- name="Intensité maximale appelée",
- config_title=config_entry.title,
- config_uniq_id=config_entry.entry_id,
- serial_reader=serial_reader,
- )
- )
- _LOGGER.info(
- "Adding %d sensors for the single phase historic mode", len(sensors)
- )
- # Add the entities to HA
- if len(sensors) > 0:
- async_add_entities(sensors, True)
+ # Historic mode
+ for desc in SENSORS_HISTORIC_COMMON:
+ yield desc
+
+ for desc in (
+ SENSORS_HISTORIC_TREEPHASE
+ if is_threephase
+ else SENSORS_HISTORIC_SINGLEPHASE
+ ):
+ yield desc
+
+ async_add_entities(
+ (
+ REGISTRY[type(descriptor)](descriptor, reader)
+ for descriptor in sensor_to_create()
+ ),
+ update_before_add=True,
+ )
T = TypeVar("T")
@@ -1073,30 +628,49 @@ class LinkyTICSensor(LinkyTICEntity, SensorEntity, Generic[T]):
_attr_should_poll = True
_last_value: T | None
+ entity_description: LinkyTicSensorConfig
- def __init__(self, tag: str, config_title: str, reader: LinkyTICReader) -> None:
+ def __init__(
+ self,
+ description: LinkyTicSensorConfig,
+ reader: LinkyTICReader,
+ ) -> None:
"""Init sensor entity."""
super().__init__(reader)
+
+ self.entity_description = description
self._last_value = None
- self._tag = tag
- self._config_title = config_title
+ self._tag = description.key
+
+ self._attr_unique_id = slugify(f"{reader.serial_number}_{description.key}")
@property
- def native_value(self) -> T | None: # type:ignore
+ def native_value(self) -> T | None: # type:ignore[override]
"""Value of the sensor."""
return self._last_value
- def _update(self) -> tuple[Optional[str], Optional[str]]:
+ def _update(self) -> tuple[str | None, str | None]:
"""Get value and/or timestamp from cached data. Responsible for updating sensor availability."""
value, timestamp = self._serial_controller.get_values(self._tag)
_LOGGER.debug(
"%s: retrieved %s value from serial controller: (%s, %s)",
- self._config_title,
+ self._serial_controller.name,
self._tag,
value,
timestamp,
)
+ if (
+ not value
+ and not timestamp
+ and self.entity_description.fallback_tags is not None
+ ):
+ # Fallback to other tags, if any
+ for tag in self.entity_description.fallback_tags:
+ value, timestamp = self._serial_controller.get_values(tag)
+ if value or timestamp:
+ break
+
if not value and not timestamp: # No data returned.
if not self.available:
# Sensor is already unavailable, no need to check why.
@@ -1104,14 +678,14 @@ def _update(self) -> tuple[Optional[str], Optional[str]]:
if not self._serial_controller.is_connected:
_LOGGER.debug(
"%s: marking the %s sensor as unavailable: serial connection lost",
- self._config_title,
+ self._serial_controller.name,
self._tag,
)
self._attr_available = False
elif self._serial_controller.has_read_full_frame:
_LOGGER.info(
"%s: marking the %s sensor as unavailable: a full frame has been read but %s has not been found",
- self._config_title,
+ self._serial_controller.name,
self._tag,
self._tag,
)
@@ -1127,37 +701,29 @@ def _update(self) -> tuple[Optional[str], Optional[str]]:
self._attr_available = True
_LOGGER.info(
"%s: marking the %s sensor as available now !",
- self._config_title,
+ self._serial_controller.name,
self._tag,
)
return value, timestamp
+@match(SerialNumberSensorConfig)
class ADSSensor(LinkyTICSensor[str]):
"""Adresse du compteur entity.""" # codespell:ignore
- # ADSSensor is a subclass and not an instance of StringSensor because it binds to two tags.
-
- # Generic properties
- # https://developers.home-assistant.io/docs/core/entity#generic-properties
-
- _attr_entity_category = EntityCategory.DIAGNOSTIC
- _attr_name = "Adresse du compteur" # codespell:ignore
- _attr_icon = "mdi:tag"
+ entity_description: SerialNumberSensorConfig
def __init__(
self,
- config_title: str,
- tag: str,
- config_uniq_id: str,
- serial_reader: LinkyTICReader,
+ description: SerialNumberSensorConfig,
+ reader: LinkyTICReader,
) -> None:
"""Initialize an ADCO/ADSC Sensor."""
- _LOGGER.debug("%s: initializing %s sensor", config_title, tag)
- super().__init__(tag, config_title, serial_reader)
- # Generic entity properties
- self._attr_unique_id = f"{DOMAIN}_{config_uniq_id}_adco"
+ super().__init__(description, reader)
+
+ # Overwrite tag-based unique id for compatibility between tic versions
+ self._attr_unique_id = slugify(f"{reader.serial_number}_adco")
self._extra: dict[str, str] = {}
@property
@@ -1166,7 +732,7 @@ def extra_state_attributes(self) -> dict[str, str]:
return self._extra
@callback
- def update(self):
+ def update(self) -> None:
"""Update the value of the sensor from the thread object memory cache."""
# Get last seen value from controller
value, _ = self._update()
@@ -1186,90 +752,66 @@ def update(self):
self._last_value = value
+@match(LinkyTicSensorConfig)
class LinkyTICStringSensor(LinkyTICSensor[str]):
"""Common class for text sensor."""
_attr_entity_category = EntityCategory.DIAGNOSTIC
- def __init__(
- self,
- tag: str,
- name: str,
- config_title: str,
- config_uniq_id: str,
- serial_reader: LinkyTICReader,
- icon: str | None = None,
- category: EntityCategory | None = None,
- enabled_by_default: bool = True,
- ) -> None:
- """Initialize a Regular Str Sensor."""
- _LOGGER.debug("%s: initializing %s sensor", config_title, tag.upper())
- super().__init__(tag, config_title, serial_reader)
-
- # Generic Entity properties
- self._attr_name = name
- self._attr_unique_id = f"{DOMAIN}_{config_uniq_id}_{tag.lower()}"
- if icon:
- self._attr_icon = icon
- if category:
- self._attr_entity_category = category
- self._attr_entity_registry_enabled_default = enabled_by_default
-
@callback
- def update(self):
+ def update(self) -> None:
"""Update the value of the sensor from the thread object memory cache."""
# Get last seen value from controller
value, _ = self._update()
if not value:
return
- self._last_value = " ".join(value.split())
+ conversion = self.entity_description.conversion
-class RegularIntSensor(LinkyTICSensor[int]):
- """Common class for int sensors."""
+ if conversion is not None:
+ try:
+ self._last_value = conversion(value)
+ except TypeError as e:
+ _LOGGER.debug("Couldn't convert value %s: %s", value, e)
+ self._last_value = value
+ else:
+ self._last_value = " ".join(value.split())
- def __init__(
- self,
- tag: str,
- name: str,
- config_title: str,
- config_uniq_id: str,
- serial_reader: LinkyTICReader,
- icon: str | None = None,
- category: EntityCategory | None = None,
- device_class: SensorDeviceClass | None = None,
- native_unit_of_measurement: str | None = None,
- state_class: SensorStateClass | None = None,
- register_callback: bool = False,
- conversion_function: Callable[[int], int] | None = None,
- ) -> None:
- """Initialize a Regular Int Sensor."""
- _LOGGER.debug("%s: initializing %s sensor", config_title, tag.upper())
- super().__init__(tag, config_title, serial_reader)
- self._attr_name = name
-
- if register_callback:
- self._serial_controller.register_push_notif(
- self._tag, self.update_notification
+
+# FIXME: Do we really need a date sensor ?
+@match(DateSensorConfig)
+class DateSensor(LinkyTICStringSensor):
+ """Linky internal date sensor."""
+
+ @callback
+ def update(self) -> None:
+ """Update the value of the sensor from the thread object memory cache."""
+ _, timestamp = self._update()
+ if not timestamp:
+ return
+ raw = timestamp
+
+ with contextlib.suppress(IndexError):
+ self._last_value = (
+ f"{raw[5:7]}/{raw[3:5]}/{raw[1:3]} "
+ f"{raw[7:9]}:{raw[9:11]} "
+ f"({'Eté' if raw[0] == 'E' else 'Hiver'})"
)
- # Generic Entity properties
- if category:
- self._attr_entity_category = category
- self._attr_unique_id = f"{DOMAIN}_{config_uniq_id}_{tag.lower()}"
- if icon:
- self._attr_icon = icon
- # Sensor Entity Properties
- if device_class:
- self._attr_device_class = device_class
- if native_unit_of_measurement:
- self._attr_native_unit_of_measurement = native_unit_of_measurement
- if state_class:
- self._attr_state_class = state_class
-
- self._conversion_function = conversion_function
+
+
+@match(
+ ApparentPowerSensorConfig,
+ ActiveEnergySensorConfig,
+ ReactiveEnergySensorConfig,
+ ElectricalCurrentSensorConfig,
+ VoltageSensorConfig,
+ ActivePowerSensorConfig,
+)
+class RegularIntSensor(LinkyTICSensor[int]):
+ """Common class for int sensors."""
@callback
- def update(self):
+ def update(self) -> None:
"""Update the value of the sensor from the thread object memory cache."""
value, _ = self._update()
if not value:
@@ -1278,11 +820,17 @@ def update(self):
value_int = int(value)
except ValueError:
return
- self._last_value = (
- self._conversion_function(value_int)
- if self._conversion_function
- else value_int
- )
+
+ conversion = self.entity_description.conversion
+
+ if conversion is not None:
+ try:
+ self._last_value = conversion(value_int)
+ except TypeError as e:
+ _LOGGER.debug("Couldn't convert value %s: %s", value_int, e)
+ self._last_value = value_int
+ else:
+ self._last_value = value_int
def update_notification(self, realtime_option: bool) -> None:
"""Receive a notification from the serial reader when our tag has been read on the wire."""
@@ -1307,188 +855,32 @@ def update_notification(self, realtime_option: bool) -> None:
self.schedule_update_ha_state(force_refresh=True)
-class EnergyIndexSensor(RegularIntSensor):
- """Common class for energy index counters, in Watt-hours."""
-
- _attr_device_class = SensorDeviceClass.ENERGY
- _attr_native_unit_of_measurement = UnitOfEnergy.WATT_HOUR
- _attr_state_class = SensorStateClass.TOTAL_INCREASING
-
-
-class VoltageSensor(RegularIntSensor):
- """Common class for voltage sensors, in Volts."""
-
- _attr_device_class = SensorDeviceClass.VOLTAGE
- _attr_native_unit_of_measurement = UnitOfElectricPotential.VOLT
-
-
-class CurrentSensor(RegularIntSensor):
- """Common class for electric current sensors, in Amperes."""
-
- _attr_device_class = SensorDeviceClass.CURRENT
- _attr_native_unit_of_measurement = UnitOfElectricCurrent.AMPERE
-
-
-class PowerSensor(RegularIntSensor):
- """Common class for real power sensors, in Watts."""
-
- _attr_device_class = SensorDeviceClass.POWER
- _attr_native_unit_of_measurement = UnitOfPower.WATT
-
-
-class ApparentPowerSensor(RegularIntSensor):
- """Common class for apparent power sensors, in Volt-Amperes."""
-
- _attr_device_class = SensorDeviceClass.APPARENT_POWER
- _attr_native_unit_of_measurement = UnitOfApparentPower.VOLT_AMPERE
-
-
-class PEJPSensor(LinkyTICStringSensor):
- """Préavis Début EJP (30 min) sensor."""
-
- #
- # This sensor could be improved I think (minutes as integer), but I do not have it to check and test its values
- # Leaving it as it is to facilitate future modifications
- #
- _attr_icon = "mdi:clock-start"
-
- def __init__(
- self, config_title: str, config_uniq_id: str, serial_reader: LinkyTICReader
- ) -> None:
- """Initialize a PEJP sensor."""
- _LOGGER.debug("%s: initializing PEJP sensor", config_title)
- super().__init__(
- tag="PEJP",
- name="Préavis Début EJP",
- config_title=config_title,
- config_uniq_id=config_uniq_id,
- serial_reader=serial_reader,
- )
-
- self._attr_unique_id = f"{DOMAIN}_{config_uniq_id}_{self._tag.lower()}"
-
-
-class DateEtHeureSensor(LinkyTICStringSensor):
- """Date et heure courante sensor."""
-
- _attr_icon = "mdi:clock-outline"
-
- def __init__(
- self,
- config_title: str,
- config_uniq_id: str,
- serial_reader: LinkyTICReader,
- ) -> None:
- """Initialize a Date et heure sensor."""
- _LOGGER.debug("%s: initializing Date et heure courante sensor", config_title)
- super().__init__(
- tag="DATE",
- name="Date et heure courante",
- config_title=config_title,
- config_uniq_id=config_uniq_id,
- serial_reader=serial_reader,
- )
-
- @callback
- def update(self):
- """Update the value of the sensor from the thread object memory cache."""
- # Get last seen value from controller
- _, timestamp = self._update()
-
- if not timestamp:
- return
- # Save value
- saison = ""
- try:
- if timestamp[0:1] == "E":
- saison = " (Eté)"
- elif timestamp[0:1] == "H":
- saison = " (Hiver)"
- self._last_value = (
- timestamp[5:7]
- + "/"
- + timestamp[3:5]
- + "/"
- + timestamp[1:3]
- + " "
- + timestamp[7:9]
- + ":"
- + timestamp[9:11]
- + saison
- )
- except IndexError:
- return
-
-
-class ProfilDuProchainJourCalendrierFournisseurSensor(LinkyTICStringSensor):
- """Profil du prochain jour du calendrier fournisseur sensor."""
-
- _attr_icon = "mdi:calendar-month-outline"
-
- def __init__(
- self,
- config_title: str,
- config_uniq_id: str,
- serial_reader: LinkyTICReader,
- category: EntityCategory | None = None,
- ) -> None:
- """Initialize a Profil du prochain jour du calendrier fournisseur sensor."""
- _LOGGER.debug("%s: initializing Date et heure courante sensor", config_title)
- super().__init__(
- tag="PJOURF+1",
- name="Profil du prochain jour calendrier fournisseur",
- config_title=config_title,
- config_uniq_id=config_uniq_id,
- serial_reader=serial_reader,
- )
-
- @callback
- def update(self):
- """Update the value of the sensor from the thread object memory cache."""
- # Get last seen value from controller
- value, _ = self._update()
- if not value:
- return
- self._last_value = value.replace("NONUTILE", "").strip()
-
-
+@match(StatusRegisterSensorConfig)
class LinkyTICStatusRegisterSensor(LinkyTICStringSensor):
"""Data from status register."""
- _attr_has_entity_name = True
- _attr_should_poll = True
_attr_device_class = SensorDeviceClass.ENUM
def __init__(
self,
- name: str,
- config_title: str,
- config_uniq_id: str,
- serial_reader: LinkyTICReader,
- field: StatusRegister,
- enabled_by_default: bool = True,
- icon: str | None = None,
+ description: StatusRegisterSensorConfig,
+ reader: LinkyTICReader,
) -> None:
"""Initialize a status register data sensor."""
- _LOGGER.debug("%s: initializing a status register data sensor", config_title)
- self._field = field
- super().__init__(
- tag="STGE",
- name=name,
- config_title=config_title,
- config_uniq_id=config_uniq_id,
- serial_reader=serial_reader,
- icon=icon,
- enabled_by_default=enabled_by_default,
- )
- self._attr_unique_id = (
- f"{DOMAIN}_{config_uniq_id}_{field.name.lower()}" # Breaking changes here.
+ super().__init__(description, reader)
+ status_field = description.status_field
+ self._field = status_field
+
+ self._attr_unique_id = slugify(
+ f"{reader.serial_number}_{description.status_field.name}"
)
# For SensorDeviceClass.ENUM, _attr_options contains all the possible values for the sensor.
- self._attr_options = list(cast(dict[int, str], field.value.options).values())
+ self._attr_options = list(
+ cast(dict[int, str], status_field.value.options).values()
+ )
@callback
- def update(self):
+ def update(self) -> None:
"""Update the value of the sensor from the thread object memory cache."""
# Get last seen value from controller
value, _ = self._update()
@@ -1496,7 +888,5 @@ def update(self):
if not value:
return
- try:
+ with contextlib.suppress(IndexError):
self._last_value = cast(str, self._field.value.get_status(value))
- except IndexError:
- pass # Failsafe, value is unchanged.
diff --git a/custom_components/linkytic/serial_reader.py b/custom_components/linkytic/serial_reader.py
index ba4bf3c..97bab7d 100644
--- a/custom_components/linkytic/serial_reader.py
+++ b/custom_components/linkytic/serial_reader.py
@@ -6,10 +6,11 @@
import threading
import time
from collections.abc import Callable
+from typing import cast
import serial
import serial.serialutil
-from homeassistant.core import callback
+from homeassistant.core import Event, callback
from .const import (
BYTESIZE,
@@ -43,10 +44,10 @@ class LinkyTICReader(threading.Thread):
def __init__(
self,
title: str,
- port,
- std_mode,
- producer_mode,
- three_phase,
+ port: str,
+ std_mode: bool,
+ producer_mode: bool,
+ three_phase: bool,
real_time: bool | None = False,
) -> None:
"""Init the LinkyTIC thread serial reader.""" # Thread
@@ -83,7 +84,7 @@ def __init__(
self._serial_number = None
super().__init__(name=f"LinkyTIC for {title}")
- def get_values(self, tag) -> tuple[str | None, str | None]:
+ def get_values(self, tag: str) -> tuple[str | None, str | None]:
"""Get tag value and timestamp from the thread memory cache."""
if not self.is_connected:
return None, None
@@ -103,7 +104,7 @@ def is_connected(self) -> bool:
"""Use to know if the reader is actually connected to a serial connection."""
if self._reader is None:
return False
- return self._reader.is_open
+ return cast(bool, self._reader.is_open)
@property
def serial_number(self) -> str | None:
@@ -120,7 +121,7 @@ def setup_error(self) -> BaseException | None:
"""If the reader thread terminates due to a serial exception, this property will contain the raised exception."""
return self._setup_error
- def run(self):
+ def run(self) -> None:
"""Continuously read the the serial connection and extract TIC values."""
if not self._open_serial():
@@ -205,13 +206,15 @@ def run(self):
if self._reader:
self._reader.close()
- def register_push_notif(self, tag: str, notif_callback: Callable[[bool], None]):
+ def register_push_notif(
+ self, tag: str, notif_callback: Callable[[bool], None]
+ ) -> None:
"""Call to register a callback notification when a certain tag is parsed."""
_LOGGER.debug("Registering a callback for %s tag", tag)
self._notif_callbacks[tag] = notif_callback
@callback
- def signalstop(self, event):
+ def signalstop(self, event: Event | str) -> None:
"""Activate the stop flag in order to stop the thread from within."""
if self.is_alive():
_LOGGER.info(
@@ -219,12 +222,12 @@ def signalstop(self, event):
)
self._stopsignal = True
- def update_options(self, real_time: bool):
+ def update_options(self, real_time: bool) -> None:
"""Setter to update serial reader options."""
_LOGGER.debug("%s: new real time option value: %s", self._title, real_time)
self._realtime = real_time
- def _cleanup_cache(self):
+ def _cleanup_cache(self) -> None:
"""Call to cleanup the data cache to allow some sensors to get back to undefined/unavailable if they are not present in the last frame."""
for cached_tag in list(self._values.keys()): # pylint: disable=consider-using-dict-items,consider-iterating-dictionary
if cached_tag not in self._tags_seen:
@@ -264,7 +267,7 @@ def _open_serial(self) -> bool:
_LOGGER.info("Serial connection is now open at %s", self._port)
return True
- def _reset_state(self):
+ def _reset_state(self) -> None:
"""Reinitialize the controller (by nullifying it) and wait 5s for other methods to re start init after a pause."""
_LOGGER.debug("Resetting serial reader state and wait 10s")
self._values = {}
@@ -284,7 +287,7 @@ def _reset_state(self):
DID_YEAR: None,
}
- def _parse_line(self, line) -> str | None:
+ def _parse_line(self, line: bytes) -> str | None:
"""Parse a line when a full line has been read from serial. It parses it as Linky TIC infos, validate its checksum and save internally the line infos."""
# there is a great chance that the first line is a partial line: skip it
if self._first_line:
@@ -352,17 +355,19 @@ def _parse_line(self, line) -> str | None:
# transform and store the values
payload: dict[str, str | None] = {"value": field_value.decode("ascii")}
payload["timestamp"] = timestamp.decode("ascii") if timestamp else None
- tag = tag.decode("ascii")
- self._values[tag] = payload
- _LOGGER.debug("read the following values: %s -> %s", tag, repr(payload))
+ tag_str = tag.decode("ascii")
+ self._values[tag_str] = payload
+ _LOGGER.debug("read the following values: %s -> %s", tag_str, repr(payload))
# Parse ADS for device identification if necessary
- if (self._std_mode and tag == "ADSC") or (not self._std_mode and tag == "ADCO"):
+ if (self._std_mode and tag_str == "ADSC") or (
+ not self._std_mode and tag_str == "ADCO"
+ ):
self.parse_ads(payload["value"])
- return tag
+ return tag_str
def _validate_checksum(
self, tag: bytes, timestamp: bytes | None, value: bytes, checksum: bytes
- ):
+ ) -> None:
# rebuild the frame
if self._std_mode:
sep = MODE_STANDARD_FIELD_SEPARATOR
@@ -402,18 +407,18 @@ def _validate_checksum(
), # fake expected checksum to avoid type error on ord()
) from exc
- def parse_ads(self, ads):
+ def parse_ads(self, ads: str | None) -> None:
"""Extract information contained in the ADS as EURIDIS."""
_LOGGER.debug(
"%s: parsing ADS: %s",
self._title,
ads,
)
- if len(ads) != 12:
+ if ads is None or len(ads) != 12:
_LOGGER.error(
"%s: ADS should be 12 char long, actually %d cannot parse: %s",
self._title,
- len(ads),
+ len(ads or ""),
ads,
)
return
@@ -423,16 +428,21 @@ def parse_ads(self, ads):
return
# Save serial number
- self._serial_number = ads
+ self._serial_number = ads # type: ignore[assignment] # mypy complains because we checked prior that self._serial_number is None
# let's parse ADS as EURIDIS
- device_identification = {DID_YEAR: ads[2:4], DID_REGNUMBER: ads[6:]}
+ device_identification: dict[str, str | None] = {
+ DID_YEAR: ads[2:4],
+ DID_REGNUMBER: ads[6:],
+ }
+ const_code = ads[0:2]
+ type_code = ads[4:6]
+
# # Parse constructor code
- device_identification[DID_CONSTRUCTOR_CODE] = ads[0:2]
+
+ device_identification[DID_CONSTRUCTOR_CODE] = const_code
try:
- device_identification[DID_CONSTRUCTOR] = CONSTRUCTORS_CODES[
- device_identification[DID_CONSTRUCTOR_CODE]
- ]
+ device_identification[DID_CONSTRUCTOR] = CONSTRUCTORS_CODES[const_code]
except KeyError:
_LOGGER.warning(
"%s: constructor code is unknown: %s",
@@ -441,11 +451,9 @@ def parse_ads(self, ads):
)
device_identification[DID_CONSTRUCTOR] = None
# # Parse device type code
- device_identification[DID_TYPE_CODE] = ads[4:6]
+ device_identification[DID_TYPE_CODE] = type_code
try:
- device_identification[DID_TYPE] = (
- f"{DEVICE_TYPES[device_identification[DID_TYPE_CODE]]}"
- )
+ device_identification[DID_TYPE] = f"{DEVICE_TYPES[type_code]}"
except KeyError:
_LOGGER.warning(
"%s: ADS device type is unknown: %s",
@@ -493,7 +501,7 @@ def __init__(
self.expected = expected
super().__init__(self.msg())
- def msg(self):
+ def msg(self) -> str:
"""Printable exception method."""
return "{} -> {} ({}) | s1 {} {} | truncated {} {} {} | computed {} {} {} | expected {} {} {}".format(
self.tag,
@@ -511,45 +519,3 @@ def msg(self):
bin(int.from_bytes(self.expected, byteorder="big")),
chr(ord(self.expected)),
)
-
-
-def linky_tic_tester(device: str, std_mode: bool) -> None:
- """Before starting the thread, this method can help validate configuration by opening the serial communication and read a line. It returns None if everything went well or a string describing the error."""
- # Open connection
- try:
- serial_reader = serial.serial_for_url(
- url=device,
- baudrate=MODE_STANDARD_BAUD_RATE if std_mode else MODE_HISTORIC_BAUD_RATE,
- bytesize=BYTESIZE,
- parity=PARITY,
- stopbits=STOPBITS,
- timeout=1,
- )
- except serial.serialutil.SerialException as exc:
- raise CannotConnect(
- f"Unable to connect to the serial device {device}: {exc}"
- ) from exc
- # Try to read a line
- try:
- serial_reader.readline()
- except serial.serialutil.SerialException as exc:
- serial_reader.close()
- raise CannotRead(f"Failed to read a line: {exc}") from exc
- # All good
- serial_reader.close()
-
-
-class CannotConnect(Exception):
- """Error to indicate we cannot connect."""
-
- def __init__(self, message) -> None:
- """Initialize the CannotConnect error with an explanation message."""
- super().__init__(message)
-
-
-class CannotRead(Exception):
- """Error to indicate that the serial connection was open successfully but an error occurred while reading a line."""
-
- def __init__(self, message) -> None:
- """Initialize the CannotRead error with an explanation message."""
- super().__init__(message)
diff --git a/custom_components/linkytic/status_register.py b/custom_components/linkytic/status_register.py
index df8677e..165eb9b 100644
--- a/custom_components/linkytic/status_register.py
+++ b/custom_components/linkytic/status_register.py
@@ -4,7 +4,7 @@
from typing import NamedTuple
-class StatusRegisterEnumValueType(NamedTuple):
+class StatusRegisterField(NamedTuple):
"""Represent a field in the status register.
lsb is the position of the least significant bit of the field in the status register.
len if the length in bit of the field (default 1 bit).
@@ -28,51 +28,44 @@ def get_status(self, register: str) -> str | bool:
return self.options[val] # Let IndexError propagate if val is unknown.
-organe_coupure = {
- 0: "Fermé",
- 1: "Ouvert sur surpuissance",
- 2: "Ouvert sur surtension",
- 3: "Ouvert sur délestage",
- 4: "Ouvert sur ordre CPL ou Euridis",
- 5: "Ouvert sur surchauffe (>Imax)",
- 6: "Ouvert sur surchauffe (Imax)",
+ "overheat_below_imax": "Opened on overheat (Imax)",
+ "overheat_below_imax": "Ouvert sur surchauffe (