Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Add metric timestamps and metric timeout (fixes #64, #67) #69

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 123 additions & 85 deletions ecoflow_exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
from multiprocessing import Process
import requests
import paho.mqtt.client as mqtt
from prometheus_client import start_http_server, REGISTRY, Gauge, Counter
from prometheus_client.core import GaugeMetricFamily, CounterMetricFamily
from prometheus_client import start_http_server, REGISTRY


class RepeatTimer(Timer):
Expand Down Expand Up @@ -189,16 +190,21 @@ def on_message(self, client, userdata, message):


class EcoflowMetric:
def __init__(self, ecoflow_payload_key, device_name):
self.ecoflow_payload_key = ecoflow_payload_key
self.device_name = device_name
self.name = f"ecoflow_{self.convert_ecoflow_key_to_prometheus_name()}"
self.metric = Gauge(self.name, f"value from MQTT object key {ecoflow_payload_key}", labelnames=["device"])

def convert_ecoflow_key_to_prometheus_name(self):
def __init__(self, name, value, labels):
    """Capture one sample of an EcoFlow payload key.

    Args:
        name: Raw payload key; kept as ``original_name`` and converted by
            ``transform_key`` into the exported ``ecoflow_``-prefixed name.
        value: Sample value to expose.
        labels: Mapping of label name to label value.
    """
    self.original_name = name
    self.name = f"ecoflow_{self.transform_key(name)}"
    self.labels = labels
    self.label_names = list(labels.keys())
    self.value = value
    # Creation time; is_stale() compares against it.
    self.timestamp = time.time()

def is_stale(self, current_time, timeout_seconds):
    """Return True when the sample is older than ``timeout_seconds``.

    The age is ``current_time`` minus the stored timestamp; an age exactly
    equal to the timeout is not considered stale.
    """
    age = current_time - self.timestamp
    return age > timeout_seconds

def transform_key(self, original_key):
# bms_bmsStatus.maxCellTemp -> bms_bms_status_max_cell_temp
# pd.ext4p8Port -> pd_ext4p8_port
key = self.ecoflow_payload_key.replace('.', '_')
key = original_key.replace('.', '_')
new = key[0].lower()
for character in key[1:]:
if character.isupper() and not new[-1] == '_':
Expand All @@ -207,96 +213,124 @@ def convert_ecoflow_key_to_prometheus_name(self):
# Check that metric name complies with the data model for valid characters
# https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels
if not re.match("[a-zA-Z_:][a-zA-Z0-9_:]*", new):
raise EcoflowMetricException(f"Cannot convert payload key {self.ecoflow_payload_key} to comply with the Prometheus data model. Please, raise an issue!")
raise EcoflowMetricException(f"Cannot convert payload key {original_key} to comply with the Prometheus data model. Please, raise an issue!")
return new

def set(self, value):
    """Replace the stored value and refresh the timestamp to the current time."""
    self.value = value
    self.timestamp = time.time()

def to_metric_family(self):
    """Build a gauge metric family holding this single timestamped sample."""
    family = GaugeMetricFamily(
        self.name,
        f'value from MQTT object key {self.original_name}',
        labels=list(self.labels.keys()),
    )
    # Label values are passed in the same order as the label names above.
    family.add_metric(list(self.labels.values()), self.value, timestamp=self.timestamp)
    return family


class EcoflowCollector:
    """Custom Prometheus collector serving the latest EcoFlow metric values.

    Values are stored through ``set`` and converted into metric families on
    every ``collect`` call. A metric that has not been updated for more than
    ``metric_timeout`` seconds is dropped during collection, and a device is
    reported offline once it has not been seen for ``device_timeout`` seconds.
    """

    def __init__(self, device_timeout, metric_timeout):
        # (payload key, device) -> EcoflowMetric holding the latest sample
        self.metrics = {}
        self.device_timeout = device_timeout
        self.metric_timeout = metric_timeout
        # device -> time.time() of the last update_device_last_seen() call
        self.device_last_seen = {}
        # device -> number of MQTT messages counted so far
        self.ecoflow_mqtt_messages_receive_total = {}

    def generate_ecoflow_online_metric(self):
        """Build ``ecoflow_online``: 1 if the device was seen within ``device_timeout`` seconds, else 0."""
        metric = GaugeMetricFamily('ecoflow_online', '1 if device is online', labels=['device'])
        now = time.time()
        # Iterate over a snapshot, like collect() does for self.metrics, so a
        # device added while metrics are being served cannot break the loop.
        for device, last_seen in list(self.device_last_seen.items()):
            online = 1 if now - last_seen < self.device_timeout else 0
            metric.add_metric([device], online)
        return metric

    def generate_ecoflow_mqtt_messages_receive_total_metric(self):
        """Build the per-device counter of received MQTT messages."""
        metric = CounterMetricFamily('ecoflow_mqtt_messages_receive_total', 'total MQTT messages', labels=['device'])
        # Snapshot for the same reason as in generate_ecoflow_online_metric().
        for device, counter in list(self.ecoflow_mqtt_messages_receive_total.items()):
            metric.add_metric([device], counter)
        return metric

    def collect(self):
        """Yield every metric family to expose, expiring stale device metrics on the way."""
        log.debug("Serving metrics")

        # Add synthetic metrics
        yield self.generate_ecoflow_online_metric()
        yield self.generate_ecoflow_mqtt_messages_receive_total_metric()

        # Iterate over device metrics
        now = time.time()
        for metric_name, metric_data in list(self.metrics.items()):
            if metric_data.is_stale(now, self.metric_timeout):
                log.info(f"Expiring metric {metric_name}")
                # pop() instead of del: a missing key must not abort the scrape
                self.metrics.pop(metric_name, None)
                continue

            yield metric_data.to_metric_family()

    def set(self, name, value, device):
        """Store the latest ``value`` of payload key ``name`` for ``device``.

        A payload key that cannot be converted into a metric is logged and
        skipped. When the inverter input voltage is reported as zero, an
        already known inverter input current metric of the same device is
        reset to zero as well.
        """
        # According to best practices for naming metrics and labels, the voltage should be in volts and the current in amperes
        # WARNING! This will ruin all Prometheus historical data and backward compatibility of Grafana dashboard
        # value = value / 1000 if key.endswith("Vol") or key.endswith("Amp") else value

        log.debug(f"Set {name} = {value}")

        key = (name, device)
        try:
            metric = EcoflowMetric(name, value, {"device": device})
        except EcoflowMetricException as error:
            # One unconvertible key must not stop the processing of the others
            log.error(error)
            return

        if key not in self.metrics:
            log.info(f"Created new metric {name}")
        self.metrics[key] = metric

        if name == 'inv.acInVol' and value == 0:
            # .get(): the current metric may not have been received yet
            ac_in_current = self.metrics.get(('inv.acInAmp', device))
            if ac_in_current is not None:
                log.debug("Set AC inverter input current to zero because of zero inverter voltage")
                ac_in_current.set(0)

    def update_device_last_seen(self, device):
        """Record the current time as the moment ``device`` was last seen."""
        self.device_last_seen[device] = time.time()

    def increment_mqtt_messages_receive_total(self, device):
        """Increase the received-message counter of ``device`` by one, starting from zero."""
        counters = self.ecoflow_mqtt_messages_receive_total
        counters[device] = counters.get(device, 0) + 1


class Worker:
    """Consume MQTT payloads from the message queue and feed them to the collector."""

    def __init__(self, message_queue, device_name, collector):
        self.message_queue = message_queue
        self.device_name = device_name
        self.collector = collector

    def loop(self):
        """Process payloads from the queue forever.

        Every item taken from the queue is counted as a received message.
        ``None`` items and payloads that cannot be parsed, or that carry no
        ``params`` key, are skipped.
        """
        while True:
            payload = self.message_queue.get()
            log.debug(f"Received payload: {payload}")
            self.collector.increment_mqtt_messages_receive_total(self.device_name)
            if payload is None:
                continue

            try:
                payload = json.loads(payload)
                params = payload['params']
            except KeyError as key:
                log.error(f"Failed to extract key {key} from payload: {payload}")
                # Nothing to process; without this, params would be unbound
                # or still hold the values of a previous message.
                continue
            except Exception as error:
                log.error(f"Failed to parse MQTT payload: {payload} Error: {error}")
                continue

            self.process_payload(params)

    def process_payload(self, params):
        """Pass every numeric entry of ``params`` to the collector.

        Entries whose value is not an ``int`` or ``float`` are logged and
        skipped.
        """
        log.debug(f"Processing params: {params}")
        # The device is marked as seen once per payload rather than once per key
        self.collector.update_device_last_seen(self.device_name)

        for key, value in params.items():
            if not isinstance(value, (int, float)):
                log.warning(f"Skipping unsupported metric {key}: {value}")
                continue

            self.collector.set(key, value, self.device_name)


def signal_handler(signum, frame):
Expand Down Expand Up @@ -334,8 +368,9 @@ def main():
ecoflow_password = os.getenv("ECOFLOW_PASSWORD")
ecoflow_api_host = os.getenv("ECOFLOW_API_HOST", "api.ecoflow.com")
exporter_port = int(os.getenv("EXPORTER_PORT", "9090"))
collecting_interval_seconds = int(os.getenv("COLLECTING_INTERVAL", "10"))
timeout_seconds = int(os.getenv("MQTT_TIMEOUT", "60"))
device_timeout = int(os.getenv("DEVICE_TIMEOUT", "30"))
metric_timeout = int(os.getenv("METRIC_TIMEOUT", "60"))
mqtt_timeout = int(os.getenv("MQTT_TIMEOUT", "60"))

if (not device_sn or not ecoflow_username or not ecoflow_password):
log.error("Please, provide all required environment variables: DEVICE_SN, ECOFLOW_USERNAME, ECOFLOW_PASSWORD")
Expand All @@ -349,14 +384,17 @@ def main():

message_queue = Queue()

EcoflowMQTT(message_queue, device_sn, auth.mqtt_username, auth.mqtt_password, auth.mqtt_url, auth.mqtt_port, auth.mqtt_client_id, timeout_seconds)
EcoflowMQTT(message_queue, device_sn, auth.mqtt_username, auth.mqtt_password, auth.mqtt_url, auth.mqtt_port, auth.mqtt_client_id, mqtt_timeout)

collector = EcoflowCollector(device_timeout, metric_timeout)
REGISTRY.register(collector)

metrics = Worker(message_queue, device_name, collecting_interval_seconds)
worker = Worker(message_queue, device_name, collector)

start_http_server(exporter_port)

try:
metrics.loop()
worker.loop()

except KeyboardInterrupt:
log.info("Received KeyboardInterrupt. Exiting...")
Expand Down