diff --git a/powerapi/cli/common_cli_parsing_manager.py b/powerapi/cli/common_cli_parsing_manager.py index 8b95eb77..92207239 100644 --- a/powerapi/cli/common_cli_parsing_manager.py +++ b/powerapi/cli/common_cli_parsing_manager.py @@ -32,6 +32,8 @@ from powerapi.cli.parsing_manager import RootConfigParsingManager, SubgroupConfigParsingManager from powerapi.cli.config_parser import store_true from powerapi.cli.config_parser import MissingValueException +from powerapi.database.prometheus_db import DEFAULT_METRIC_DESCRIPTION, DEFAULT_MODEL_VALUE, DEFAULT_PUSHER_NAME, \ + DEFAULT_ADDRESS from powerapi.exception import BadTypeException, BadContextException, UnknownArgException POWERAPI_ENVIRONMENT_VARIABLE_PREFIX = 'POWERAPI_' @@ -40,6 +42,8 @@ POWERAPI_PRE_PROCESSOR_ENVIRONMENT_VARIABLE_PREFIX = POWERAPI_ENVIRONMENT_VARIABLE_PREFIX + 'PRE_PROCESSOR_' POWERAPI_POST_PROCESSOR_ENVIRONMENT_VARIABLE_PREFIX = POWERAPI_ENVIRONMENT_VARIABLE_PREFIX + 'POST_PROCESSOR' +TAGS_ARGUMENT_HELP_TEXT = 'specify report tags' + def extract_file_names(arg, val, args, acc): """ @@ -250,69 +254,35 @@ def __init__(self): subgroup_parser=subparser_mongo_output ) - subparser_prom_output = SubgroupConfigParsingManager("prom") - subparser_prom_output.add_argument("t", "tags", help_text="specify report tags") - subparser_prom_output.add_argument("u", "uri", help_text="specify server uri") - subparser_prom_output.add_argument( + subparser_prometheus_output = SubgroupConfigParsingManager("prometheus") + subparser_prometheus_output.add_argument("t", "tags", help_text=TAGS_ARGUMENT_HELP_TEXT) + subparser_prometheus_output.add_argument("u", "uri", help_text="specify server uri", + default_value=DEFAULT_ADDRESS) + subparser_prometheus_output.add_argument( "p", "port", help_text="specify server port", argument_type=int ) - subparser_prom_output.add_argument( + subparser_prometheus_output.add_argument( "M", "metric_name", help_text="specify metric name" ) - subparser_prom_output.add_argument( + subparser_prometheus_output.add_argument( "d", "metric_description", help_text="specify metric description", - default_value="energy consumption", - ) - help_text = "specify number of second for the value must be aggregated before compute statistics on them" - subparser_prom_output.add_argument( - "A", "aggregation_period", help_text=help_text, default_value=15, argument_type=int - ) - - subparser_prom_output.add_argument( - "m", - "model", - help_text="specify data type that will be stored in the database", - default_value="PowerReport", - ) - subparser_prom_output.add_argument( - "n", "name", help_text="specify pusher name", default_value="pusher_prom" - ) - self.add_subgroup_parser( - subgroup_name="output", - subgroup_parser=subparser_prom_output + default_value=DEFAULT_METRIC_DESCRIPTION ) - subparser_direct_prom_output = SubgroupConfigParsingManager("direct_prom") - subparser_direct_prom_output.add_argument( - "t", "tags", help_text="specify report tags" - ) - subparser_direct_prom_output.add_argument("a", "uri", help_text="specify server uri") - subparser_direct_prom_output.add_argument( - "p", "port", help_text="specify server port", argument_type=int - ) - subparser_direct_prom_output.add_argument( - "M", "metric_name", help_text="specify metric name" - ) - subparser_direct_prom_output.add_argument( - "d", - "metric_description", - help_text="specify metric description", - default_value="energy consumption", - ) - subparser_direct_prom_output.add_argument( + subparser_prometheus_output.add_argument( "m", "model", help_text="specify data type that will be stored in the database", - default_value="PowerReport", + default_value=DEFAULT_MODEL_VALUE, ) - subparser_direct_prom_output.add_argument( - "n", "name", help_text="specify pusher name", default_value="pusher_prom" + subparser_prometheus_output.add_argument( + "n", "name", help_text="specify pusher name", default_value=DEFAULT_PUSHER_NAME ) self.add_subgroup_parser( subgroup_name="output", - subgroup_parser=subparser_direct_prom_output + subgroup_parser=subparser_prometheus_output ) subparser_csv_output = SubgroupConfigParsingManager("csv") @@ -328,7 +298,7 @@ def __init__(self): default_value="PowerReport", ) - subparser_csv_output.add_argument("t", "tags", help_text="specify report tags") + subparser_csv_output.add_argument("t", "tags", help_text=TAGS_ARGUMENT_HELP_TEXT) subparser_csv_output.add_argument( "n", "name", help_text="specify pusher name", default_value="pusher_csv" ) @@ -339,7 +309,7 @@ def __init__(self): subparser_influx_output = SubgroupConfigParsingManager("influxdb") subparser_influx_output.add_argument("u", "uri", help_text="specify InfluxDB uri") - subparser_influx_output.add_argument("t", "tags", help_text="specify report tags") + subparser_influx_output.add_argument("t", "tags", help_text=TAGS_ARGUMENT_HELP_TEXT) subparser_influx_output.add_argument( "d", "db", help_text="specify InfluxDB database name" ) @@ -385,7 +355,7 @@ def __init__(self): subparser_influx2_output = SubgroupConfigParsingManager("influxdb2") subparser_influx2_output.add_argument("u", "uri", help_text="specify InfluxDB uri") - subparser_influx2_output.add_argument("t", "tags", help_text="specify report tags") + subparser_influx2_output.add_argument("t", "tags", help_text=TAGS_ARGUMENT_HELP_TEXT) subparser_influx2_output.add_argument("k", "token", help_text="specify token for accessing the database") subparser_influx2_output.add_argument("g", "org", diff --git a/powerapi/cli/config_parser.py b/powerapi/cli/config_parser.py index e9375684..c4ea6670 100644 --- a/powerapi/cli/config_parser.py +++ b/powerapi/cli/config_parser.py @@ -259,7 +259,7 @@ def validate(self, conf: dict) -> dict: elif current_argument_name != 'type': raise UnknownArgException(argument_name=current_argument_name) - return conf + return self.normalize_configuration(conf=conf) def normalize_configuration(self, conf: dict) -> dict: """ diff --git a/powerapi/cli/generator.py b/powerapi/cli/generator.py index 020f9f6b..ce328083 100644 --- a/powerapi/cli/generator.py +++ b/powerapi/cli/generator.py @@ -42,7 +42,7 @@ TIMEOUT_QUERY_DEFAULT_VALUE from powerapi.processor.pre.libvirt.libvirt_pre_processor_actor import LibvirtPreProcessorActor from powerapi.report import HWPCReport, PowerReport, ControlReport, ProcfsReport, Report, FormulaReport -from powerapi.database import MongoDB, CsvDB, InfluxDB, OpenTSDB, SocketDB, PrometheusDB, DirectPrometheusDB, \ +from powerapi.database import MongoDB, CsvDB, InfluxDB, OpenTSDB, SocketDB, PrometheusDB, \ VirtioFSDB, FileDB from powerapi.puller import PullerActor from powerapi.pusher import PusherActor @@ -181,7 +181,7 @@ def __init__(self, component_group_name: str): self.db_factory = { 'mongodb': lambda db_config: MongoDB(report_type=db_config['model'], uri=db_config['uri'], db_name=db_config['db'], collection_name=db_config['collection']), - 'socket': lambda db_config: SocketDB(db_config['model'], db_config['port']), + 'socket': lambda db_config: SocketDB(report_type=db_config['model'], port=db_config['port']), 'csv': lambda db_config: CsvDB(report_type=db_config['model'], tags=gen_tag_list(db_config), current_path=os.getcwd() if 'directory' not in db_config else db_config[ 'directory'], @@ -196,16 +196,12 @@ def __init__(self, component_group_name: str): port=None if 'port' not in db_config else db_config['port']), 'opentsdb': lambda db_config: OpenTSDB(report_type=db_config['model'], host=db_config['uri'], port=db_config['port'], metric_name=db_config['metric_name']), - 'prom': lambda db_config: PrometheusDB(report_type=db_config['model'], port=db_config['port'], - address=db_config['uri'], metric_name=db_config['metric_name'], - metric_description=db_config['metric_description'], - aggregation_periode=db_config['aggregation_period'], - tags=gen_tag_list(db_config)), - 'direct_prom': lambda db_config: DirectPrometheusDB(report_type=db_config['model'], port=db_config['port'], - address=db_config['uri'], - metric_name=db_config['metric_name'], - metric_description=db_config['metric_description'], - tags=gen_tag_list(db_config)), + 'prometheus': lambda db_config: PrometheusDB(report_type=db_config['model'], + port=db_config['port'], + address=db_config['uri'], + metric_name=db_config['metric_name'], + metric_description=db_config['metric_description'], + tags=gen_tag_list(db_config)), 'virtiofs': lambda db_config: VirtioFSDB(report_type=db_config['model'], vm_name_regexp=db_config['vm_name_regexp'], root_directory_name=db_config['root_directory_name'], diff --git a/powerapi/database/__init__.py b/powerapi/database/__init__.py index 0d86c6a5..3fe2aefd 100644 --- a/powerapi/database/__init__.py +++ b/powerapi/database/__init__.py @@ -35,6 +35,5 @@ from powerapi.database.influxdb2 import InfluxDB2 from powerapi.database.prometheus_db import PrometheusDB from powerapi.database.virtiofs_db import VirtioFSDB -from powerapi.database.direct_prometheus_db import DirectPrometheusDB from powerapi.database.socket_db import SocketDB from powerapi.database.file_db import FileDB diff --git a/powerapi/database/direct_prometheus_db.py b/powerapi/database/direct_prometheus_db.py deleted file mode 100644 index 4dcab41b..00000000 --- a/powerapi/database/direct_prometheus_db.py +++ /dev/null @@ -1,112 +0,0 @@ -# Copyright (c) 2021, INRIA -# Copyright (c) 2021, University of Lille -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# * Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# * Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# * Neither the name of the copyright holder nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import logging -from typing import List, Type - -try: - from prometheus_client import Gauge -except ImportError: - logging.getLogger().info("prometheus-client is not installed.") - -from powerapi.report import Report -from .prometheus_db import BasePrometheusDB - - -class DirectPrometheusDB(BasePrometheusDB): - """ - Database that expose received data as metric in order to be scrapped by a prometheus instance - Could only be used with a pusher actor - """ - def __init__(self, report_type: Type[Report], port: int, address: str, metric_name: str, metric_description: str, tags: List[str]): - """ - :param address: address that expose the metric - :param port: - :param metric_name: - :param metric_description: short sentence that describe the metric - :param tags: metadata used to tag metric - """ - BasePrometheusDB.__init__(self, report_type, port, address, metric_name, metric_description, tags) - - self.energy_metric = None - - self.current_ts = 0 - self.exposed_measure = {} - self.measure_for_current_period = {} - - def __iter__(self): - raise NotImplementedError() - - def _init_metrics(self): - self.energy_metric = Gauge(self.metric_name, self.metric_description, ['sensor', 'target'] + self.tags) - - def _expose_data(self, _, measure): - kwargs = {label: measure['tags'][label] for label in measure['tags']} - try: - self.energy_metric.labels(**kwargs).set(measure['value']) - except TypeError: - self.energy_metric.labels(kwargs).set(measure['value']) - - def _report_to_measure_and_key(self, report): - value = self.report_type.to_prometheus(report, self.tags) - key = ''.join([str(value['tags'][tag]) for tag in value['tags']]) - return key, value - - def _update_exposed_measure(self): - for key in self.exposed_measure: - if key not in self.measure_for_current_period: - args = self.exposed_measure[key] - self.energy_metric.remove(*args) - self.exposed_measure = self.measure_for_current_period - self.measure_for_current_period = {} - - def save(self, report: Report): - """ - Override from BaseDB - - :param report: Report to save - """ - key, measure = self._report_to_measure_and_key(report) - if self.current_ts != measure['time']: - self.current_ts = measure['time'] - self._update_exposed_measure() - - self._expose_data(key, measure) - if key not in self.measure_for_current_period: - args = [measure['tags'][label] for label in measure['tags']] - self.measure_for_current_period[key] = args - - def save_many(self, reports: List[Report]): - """ - Save a batch of data - - :param reports: Batch of data. - """ - for report in reports: - self.save(report) diff --git a/powerapi/database/prometheus_db.py b/powerapi/database/prometheus_db.py index a5ab5763..f0104e17 100644 --- a/powerapi/database/prometheus_db.py +++ b/powerapi/database/prometheus_db.py @@ -1,5 +1,5 @@ -# Copyright (c) 2021, INRIA -# Copyright (c) 2021, University of Lille +# Copyright (c) 2023, INRIA +# Copyright (c) 2023, University of Lille # All rights reserved. # # Redistribution and use in source and binary forms, with or without @@ -29,22 +29,34 @@ import logging from typing import List, Type + try: from prometheus_client import start_http_server, Gauge except ImportError: logging.getLogger().info("prometheus-client is not installed.") from powerapi.report import Report -from powerapi.utils import StatBuffer from .base_db import BaseDB +DEFAULT_ADDRESS = '127.0.0.1' +DEFAULT_METRIC_DESCRIPTION = 'energy consumption' +DEFAULT_MODEL_VALUE = 'PowerReport' +DEFAULT_PUSHER_NAME = 'pusher_prometheus' +TAGS_KEY = 'tags' +VALUE_KEY = 'value' +TIME_KEY = 'time' + +SENSOR_TAG = 'sensor' +TARGET_TAG = 'target' + class BasePrometheusDB(BaseDB): """ Base class to expose data to prometheus instance """ - def __init__(self, report_type: Type[Report], port: int, address: str, metric_name: str, - metric_description: str, tags: List[str]): + + def __init__(self, report_type: Type[Report], port: int, metric_name: str, + tags: List[str], metric_description: str = DEFAULT_METRIC_DESCRIPTION, address: str = DEFAULT_ADDRESS): BaseDB.__init__(self, report_type) self.address = address self.port = port @@ -57,96 +69,67 @@ def _init_metrics(self): def connect(self): """ - Start a HTTP server exposing metrics + Start an HTTP server exposing metrics """ - self._init_metrics() - start_http_server(self.port) + start_http_server(port=self.port, addr=self.address) class PrometheusDB(BasePrometheusDB): """ - Database that expose received data as metric in order to be scrapped by a prometheus instance - Could only be used with a pusher actor + Database that expose received raw power estimations as metrics in order to be scrapped by a prometheus instance + It can only be used with a pusher actor """ - def __init__(self, report_type: Type[Report], port: int, address: str, metric_name: str, - metric_description: str, aggregation_periode: int, tags: List[str]): + def __init__(self, report_type: Type[Report], port: int, address: str, metric_name: str, metric_description: str, + tags: List[str]): """ - :param address: address that expose the metric - :param port: - :param metric_name: + :param address: address that exposes the metric + :param port: port used to expose the metric + :param metric_name: the name of the metric :param metric_description: short sentence that describe the metric - :param aggregation_periode: number of second for the value must be aggregated before compute statistics on them :param tags: metadata used to tag metric """ - BasePrometheusDB.__init__(self, report_type, port, address, metric_name, metric_description, tags) - self.aggregation_periode = aggregation_periode - self.final_tags = ['sensor', 'target'] + tags + BasePrometheusDB.__init__(self, + report_type=report_type, + port=port, + address=address, + metric_name=metric_name, + metric_description=metric_description, + tags=tags) - self.mean_metric = None - self.std_metric = None - self.min_metric = None - self.max_metric = None + self.energy_metric = None + self.current_ts = 0 self.exposed_measure = {} self.measure_for_current_period = {} - self.current_period_end = 0 - - self.buffer = StatBuffer(aggregation_periode) + self.metrics_initialized = False def __iter__(self): raise NotImplementedError() def _init_metrics(self): - self.mean_metric = Gauge(self.metric_name + '_mean', self.metric_description + '(MEAN)', self.final_tags) - self.std_metric = Gauge(self.metric_name + '_std', self.metric_description + '(STD)', self.final_tags) - self.min_metric = Gauge(self.metric_name + '_min', self.metric_description + '(MIN)', self.final_tags) - self.max_metric = Gauge(self.metric_name + '_max', self.metric_description + '(MAX)', self.final_tags) + if not self.metrics_initialized: + self.energy_metric = Gauge(self.metric_name, self.metric_description, [SENSOR_TAG, TARGET_TAG] + self.tags) + self.metrics_initialized = True - def _expose_data(self, key): - aggregated_value = self.buffer.get_stats(key) - if aggregated_value is None: - return - - kwargs = {label: aggregated_value['tags'][label] for label in self.final_tags} + def _expose_data(self, _, measure): + kwargs = {label: measure[TAGS_KEY][label] for label in measure[TAGS_KEY]} try: - self.mean_metric.labels(**kwargs).set(aggregated_value['mean']) - self.std_metric.labels(**kwargs).set(aggregated_value['std']) - self.min_metric.labels(**kwargs).set(aggregated_value['min']) - self.max_metric.labels(**kwargs).set(aggregated_value['max']) + self.energy_metric.labels(**kwargs).set(measure[VALUE_KEY]) except TypeError: - self.mean_metric.labels(kwargs).set(aggregated_value['mean']) - self.std_metric.labels(kwargs).set(aggregated_value['std']) - self.min_metric.labels(kwargs).set(aggregated_value['min']) - self.max_metric.labels(kwargs).set(aggregated_value['max']) + self.energy_metric.labels(kwargs).set(measure[VALUE_KEY]) def _report_to_measure_and_key(self, report): value = self.report_type.to_prometheus(report, self.tags) - key = ''.join([str(value['tags'][tag]) for tag in self.final_tags]) + key = ''.join([str(value[TAGS_KEY][tag]) for tag in value[TAGS_KEY]]) return key, value def _update_exposed_measure(self): - updated_exposed_measure = {} - for key in self.exposed_measure: if key not in self.measure_for_current_period: args = self.exposed_measure[key] - self.mean_metric.remove(*args) - self.std_metric.remove(*args) - self.min_metric.remove(*args) - self.max_metric.remove(*args) - else: - updated_exposed_measure[key] = self.exposed_measure[key] - self.exposed_measure = updated_exposed_measure - - def _append_measure_from_old_period_to_buffer_and_expose_data(self): - for old_key, old_measure_list in self.measure_for_current_period.items(): - for old_measure in old_measure_list: - self.buffer.append(old_measure, old_key) - self._expose_data(old_key) - - def _reinit_persiod(self, new_measure_time): - self.current_period_end = new_measure_time + self.aggregation_periode + self.energy_metric.remove(*args) + self.exposed_measure = self.measure_for_current_period self.measure_for_current_period = {} def save(self, report: Report): @@ -155,20 +138,19 @@ def save(self, report: Report): :param report: Report to save """ + if self.tags is None or not self.tags: + self.tags = list(report.metadata.keys()) + self._init_metrics() + key, measure = self._report_to_measure_and_key(report) - if measure['time'] > self.current_period_end: - self._append_measure_from_old_period_to_buffer_and_expose_data() + if self.current_ts != measure[TIME_KEY]: + self.current_ts = measure[TIME_KEY] self._update_exposed_measure() - self._reinit_persiod(measure['time']) - - if key not in self.exposed_measure: - args = [measure['tags'][label] for label in self.final_tags] - self.exposed_measure[key] = args + self._expose_data(key, measure) if key not in self.measure_for_current_period: - self.measure_for_current_period[key] = [] - - self.measure_for_current_period[key].append(measure) + args = [measure[TAGS_KEY][label] for label in measure[TAGS_KEY]] + self.measure_for_current_period[key] = args def save_many(self, reports: List[Report]): """ diff --git a/powerapi/processor/pre/k8s/k8s_pre_processor_handlers.py b/powerapi/processor/pre/k8s/k8s_pre_processor_handlers.py index 16aca2a2..2b01a10f 100644 --- a/powerapi/processor/pre/k8s/k8s_pre_processor_handlers.py +++ b/powerapi/processor/pre/k8s/k8s_pre_processor_handlers.py @@ -68,9 +68,9 @@ def handle(self, message: Message): self.state.actor.logger.warning( f"Container with no associated pod : {message.target}, {c_id}, {namespace}, {pod}" ) + namespace = "" + pod = "" else: - message.metadata[POD_NAMESPACE_METADATA_KEY] = namespace - message.metadata[POD_NAME_METADATA_KEY] = pod self.state.actor.logger.debug( f"K8sPreProcessorActorHWPCReportHandler add metadata to report {c_id}, {namespace}, {pod}" ) @@ -79,6 +79,9 @@ def handle(self, message: Message): for label_key, label_value in labels.items(): message.metadata[f"label_{label_key}"] = label_value + message.metadata[POD_NAMESPACE_METADATA_KEY] = namespace + message.metadata[POD_NAME_METADATA_KEY] = pod + self._send_report(report=message) diff --git a/powerapi/pusher/handlers.py b/powerapi/pusher/handlers.py index 4718dda8..104a402c 100644 --- a/powerapi/pusher/handlers.py +++ b/powerapi/pusher/handlers.py @@ -32,6 +32,7 @@ from powerapi.handler import InitHandler, StartHandler, PoisonPillMessageHandler from powerapi.message import ErrorMessage from powerapi.database import DBError +from powerapi.report import BadInputData class PusherStartHandler(StartHandler): @@ -89,6 +90,9 @@ def handle(self, msg): self.state.buffer.sort(key=lambda x: x.timestamp) - self.state.database.save_many(self.state.buffer) - self.state.actor.logger.debug('save ' + str(len(self.state.buffer)) + ' reports in database') - self.state.buffer = [] + try: + self.state.database.save_many(self.state.buffer) + self.state.actor.logger.debug('save ' + str(len(self.state.buffer)) + ' reports in database') + self.state.buffer = [] + except BadInputData as ex: + self.state.actor.logger.warning(f"The report cannot be saved: {ex.msg}") diff --git a/tests/integration/database/test_direct_prometheus_db.py b/tests/integration/database/test_direct_prometheus_db.py deleted file mode 100644 index 507d350f..00000000 --- a/tests/integration/database/test_direct_prometheus_db.py +++ /dev/null @@ -1,181 +0,0 @@ -# Copyright (c) 2021, INRIA -# Copyright (c) 2021, University of Lille -# All rights reserved. - -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: - -# * Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. - -# * Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. - -# * Neither the name of the copyright holder nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. - -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -import re -import multiprocessing -import datetime -import time - -import requests -import pytest - -from powerapi.database import DirectPrometheusDB -from powerapi.report import PowerReport - -ADDR = 'localhost' -METRIC = 'test' -DESC = 'TEST' - -TARGET1 = 'targetA' -TARGET2 = 'targetB' - -REPORTA_1 = PowerReport(datetime.datetime(1970, 1, 1, 1, 1, 0), 0, TARGET1, 10, {'socket': 0}) -REPORTA_2 = PowerReport(datetime.datetime(1970, 1, 1, 1, 1, 1), 0, TARGET1, 20, {'socket': 0}) -REPORTA_3 = PowerReport(datetime.datetime(1970, 1, 1, 1, 1, 2), 0, TARGET1, 30, {'socket': 0}) -REPORTA_4 = PowerReport(datetime.datetime(1970, 1, 1, 1, 1, 3), 0, TARGET1, 40, {'socket': 0}) -REPORTA_5 = PowerReport(datetime.datetime(1970, 1, 1, 1, 1, 4), 0, TARGET1, 50, {'socket': 0}) -REPORTA_6 = PowerReport(datetime.datetime(1970, 1, 1, 1, 1, 5), 0, TARGET1, 60, {'socket': 0}) -REPORTA_7 = PowerReport(datetime.datetime(1970, 1, 1, 1, 1, 6), 0, TARGET1, 70, {'socket': 0}) - -REPORTB_1 = PowerReport(datetime.datetime(1970, 1, 1, 1, 1, 0), 0, TARGET2, 40, {'socket': 0}) -REPORTB_2 = PowerReport(datetime.datetime(1970, 1, 1, 1, 1, 1), 0, TARGET2, 60, {'socket': 0}) -REPORTB_3 = PowerReport(datetime.datetime(1970, 1, 1, 1, 1, 2), 0, TARGET2, 70, {'socket': 0}) -REPORTB_4 = PowerReport(datetime.datetime(1970, 1, 1, 1, 1, 3), 0, TARGET2, 80, {'socket': 0}) -REPORTB_5 = PowerReport(datetime.datetime(1970, 1, 1, 1, 1, 4), 0, TARGET2, 90, {'socket': 0}) - - -def extract_metrics(metric_prefix, url): - time.sleep(0.5) - request_result = requests.get(url) - regexp = re.compile(metric_prefix + '{sensor="(.*)",socket="(.*)",target="(.*)"} (.*)') - - metrics = {} - - for s in filter(lambda s: metric_prefix in s and s[0] != '#', request_result.text.split('\n')): - [(sensor, socket, target, value)] = re.findall(regexp, s) - if target not in metrics: - metrics[target] = [] - metrics[target].append({'socket': socket, 'sensor': sensor, 'value': float(value)}) - return metrics - - -class DirectPrometheusServer(multiprocessing.Process): - def __init__(self, q, port): - multiprocessing.Process.__init__(self) - self.port = port - self.q = q - - def run(self): - db = DirectPrometheusDB(PowerReport, self.port, ADDR, METRIC, DESC, ['socket']) - db.connect() - self.q.put('ok') - while True: - report_list = self.q.get() - db.save_many(report_list) - - -def _gen_serv(unused_tcp_port_factory): - port = unused_tcp_port_factory() - q = multiprocessing.Queue() - p = DirectPrometheusServer(q, port) - p.start() - return port, q, p - - -@pytest.fixture -def db_info(unused_tcp_port_factory): - """ - start a DirectPrometheusDB in a process and return a q to send report to the DB - """ - port, q, p = _gen_serv(unused_tcp_port_factory) - if q.get(timeout=1) == 'ok': - yield q, _gen_url(port) - else: - p.terminate() - port, q, p = _gen_serv(unused_tcp_port_factory) - yield q, _gen_url(port) - p.terminate() - - -def _gen_url(port): - return 'http://' + ADDR + ':' + str(port) + '/metrics' - - -def test_create_direct_prometheus_db_and_connect_it_must_launch_web_server_on_given_address(db_info): - db, url = db_info - r = requests.get(url) - assert r.status_code == 200 - - -def test_create_direct_prometheus_db_and_dont_connect_it_must_not_launch_web_server_on_given_address(unused_tcp_port): - _ = DirectPrometheusDB(PowerReport, unused_tcp_port, ADDR, METRIC, DESC, ['socket']) - with pytest.raises(requests.exceptions.ConnectionError): - _ = requests.get(_gen_url(unused_tcp_port)) - - -def test_save_one_report_must_expose_data(db_info): - db, url = db_info - db.put([REPORTA_1]) - assert extract_metrics(METRIC, url) != {} - - -def test_save_one_report_must_expose_energy_metric_for_the_given_target(db_info): - db, url = db_info - db.put([REPORTA_1]) - data = extract_metrics(METRIC, url) - assert TARGET1 in data - - -def test_save_one_report_must_expose_correct_value(db_info): - db, url = db_info - db.put([REPORTA_1]) - data = extract_metrics(METRIC, url) - assert data[TARGET1][0]['value'] == 10 - - -def test_save_two_reports_with_same_target_must_expose_correct_energy_value_for_second_report(db_info): - db, url = db_info - db.put([REPORTA_1, REPORTA_2]) - data = extract_metrics(METRIC, url) - assert data[TARGET1][0]['value'] == 20 - - -def test_save_two_report_with_different_target_must_expose_data_for_the_two_target(db_info): - db, url = db_info - db.put([REPORTA_1, REPORTB_1]) - data = extract_metrics(METRIC, url) - assert TARGET1 in data - assert TARGET2 in data - - -def test_save_two_report_with_different_target_must_expose_correct_data_for_each_target(db_info): - db, url = db_info - db.put([REPORTA_1, REPORTB_1]) - - data = extract_metrics(METRIC, url) - assert data[TARGET1][0]['value'] == 10 - assert data[TARGET2][0]['value'] == 40 - - -def test_save_report_from_two_target_and_then_report_from_one_target_must_finaly_only_expose_report_from_remaining_target( - db_info): - db, url = db_info - db.put([REPORTA_1, REPORTB_1, REPORTA_2, REPORTA_3]) - data = extract_metrics(METRIC, url) - assert TARGET1 in data - assert TARGET2 not in data diff --git a/tests/integration/database/test_prometheus_db.py b/tests/integration/database/test_prometheus_db.py index 1a22ac73..172a5042 100644 --- a/tests/integration/database/test_prometheus_db.py +++ b/tests/integration/database/test_prometheus_db.py @@ -65,118 +65,143 @@ def extract_metrics(metric_prefix, url): time.sleep(0.5) request_result = requests.get(url) - - regexp = re.compile(metric_prefix + '_(.*){sensor="(.*)",socket="(.*)",target="(.*)"} (.*)') + regexp = re.compile(metric_prefix + '{sensor="(.*)",socket="(.*)",target="(.*)"} (.*)') metrics = {} for s in filter(lambda s: metric_prefix in s and s[0] != '#', request_result.text.split('\n')): - [(metric, sensor, socket, target, value)] = re.findall(regexp, s) - if metric not in metrics: - metrics[metric] = {} - if target not in metrics[metric]: - metrics[metric][target] = [] - metrics[metric][target].append({'socket': socket, 'sensor': sensor, 'value': float(value)}) + [(sensor, socket, target, value)] = re.findall(regexp, s) + if target not in metrics: + metrics[target] = [] + metrics[target].append({'socket': socket, 'sensor': sensor, 'value': float(value)}) return metrics -class Prometheus_server(multiprocessing.Process): - def __init__(self, q): +class PrometheusServer(multiprocessing.Process): + def __init__(self, q, port): multiprocessing.Process.__init__(self) + self.port = port self.q = q def run(self): - db = PrometheusDB(PowerReport, PORT, ADDR, METRIC, DESC, AGG, ['socket']) + db = PrometheusDB(report_type=PowerReport, + port=self.port, + address=ADDR, + metric_name=METRIC, + metric_description=DESC, + tags=['socket']) db.connect() + self.q.put('ok') while True: report_list = self.q.get() db.save_many(report_list) +def _gen_serv(): + port = PORT + q = multiprocessing.Queue() + p = PrometheusServer(q, port) + p.start() + return port, q, p + + @pytest.fixture -def db(): +def db_info(): """ start a PrometheusDB in a process and return a q to send report to the DB """ - q = multiprocessing.Queue() - p = Prometheus_server(q) - p.start() - yield q + port, q, p = _gen_serv() + if q.get(timeout=1) == 'ok': + yield q, _gen_url(port) + else: + p.terminate() + port, q, p = _gen_serv() + yield q, _gen_url(port) p.terminate() -def test_create_prometheus_db_and_connect_it_must_launch_web_server_on_given_address(db): - r = requests.get(URL) +def _gen_url(port): + return 'http://' + ADDR + ':' + str(port) + '/metrics' + + +def test_create_direct_prometheus_db_and_connect_it_must_launch_web_server_on_given_address(db_info): + _, url = db_info + r = requests.get(url) assert r.status_code == 200 -def test_create_prometheus_db_and_dont_connect_it_must_not_launch_web_server_on_given_address(): - _ = PrometheusDB(PowerReport, PORT, ADDR, METRIC, DESC, AGG, ['socket']) +def test_create_direct_prometheus_db_and_dont_connect_it_must_not_launch_web_server_on_given_address(): + _ = PrometheusDB(report_type=PowerReport, + port=PORT, + address=ADDR, + metric_name=METRIC, + metric_description=DESC, + tags=['socket']) with pytest.raises(requests.exceptions.ConnectionError): - _ = requests.get(URL) + _ = requests.get(_gen_url(PORT)) -def test_save_one_report_must_not_expose_data(db): +def test_save_one_report_must_expose_data(db_info): + db, url = db_info db.put([REPORTA_1]) - assert extract_metrics(METRIC, URL) == {} - + assert extract_metrics(METRIC, url) != {} -def test_save_two_report_with_same_target_must_not_expose_data(db): - db.put([REPORTA_1, REPORTA_2]) - assert extract_metrics(METRIC, URL) == {} +def test_save_one_report_with_prometheus_specified_labels(db_info): + db, url = db_info + db.put([REPORTA_1]) + data = extract_metrics(METRIC, url) -def test_save_three_report_with_same_target_must_expose_data_for_the_two_first_reports(db): - db.put([REPORTA_1, REPORTA_2, REPORTA_3]) - assert extract_metrics(METRIC, URL) != {} + # The exposed metric has to have sensor and socket as labels (the target is used as metric key) + assert TARGET1 in data + assert 'sensor' in data[REPORTA_1.target][0] + assert 'socket' in data[REPORTA_1.target][0] + assert data[REPORTA_1.target][0]['sensor'] == str(REPORTA_1.sensor) + assert data[REPORTA_1.target][0]['socket'] == str(REPORTA_1.metadata['socket']) -def test_save_three_report_with_same_target_must_expose_min_max_mean_and_std_data_for_the_two_first_reports(db): - db.put([REPORTA_1, REPORTA_2, REPORTA_3]) - data = extract_metrics(METRIC, URL) - assert 'min' in data - assert 'max' in data - assert 'mean' in data - assert 'std' in data +def test_save_one_report_must_expose_energy_metric_for_the_given_target(db_info): + db, url = db_info + db.put([REPORTA_1]) + data = extract_metrics(METRIC, url) + assert TARGET1 in data -def test_save_three_report_with_same_target_must_expose_correct_min_max_mean_and_std_data_for_the_two_first_reports(db): - db.put([REPORTA_1, REPORTA_2, REPORTA_3]) - data = extract_metrics(METRIC, URL) - assert data['min'][TARGET1][0]['value'] == 10 - assert data['max'][TARGET1][0]['value'] == 20 - assert data['mean'][TARGET1][0]['value'] == 15 - assert data['std'][TARGET1][0]['value'] == 5 +def test_save_one_report_must_expose_correct_value(db_info): + db, url = db_info + db.put([REPORTA_1]) + data = extract_metrics(METRIC, url) + assert data[TARGET1][0]['value'] == 10 -def test_save_five_report_with_same_target_must_expose_correct_min_max_mean_and_std_data_for_last_two_report(db): - db.put([REPORTA_1, REPORTA_2, REPORTA_3, REPORTA_4, REPORTA_5]) - data = extract_metrics(METRIC, URL) - assert data['min'][TARGET1][0]['value'] == 30 - assert data['max'][TARGET1][0]['value'] == 40 - assert data['mean'][TARGET1][0]['value'] == 35 - assert data['std'][TARGET1][0]['value'] == 5 +def test_save_two_reports_with_same_target_must_expose_correct_energy_value_for_second_report(db_info): + db, url = db_info + db.put([REPORTA_1, REPORTA_2]) + data = extract_metrics(METRIC, url) + assert data[TARGET1][0]['value'] == 20 -def test_save_three_report_with_different_target_must_not_expose_data(db): - db.put([REPORTA_1, REPORTB_2, REPORTA_3]) - assert extract_metrics(METRIC, URL) == {} +def test_save_two_report_with_different_target_must_expose_data_for_the_two_target(db_info): + db, url = db_info + db.put([REPORTA_1, REPORTB_1]) + data = extract_metrics(METRIC, url) + assert TARGET1 in data + assert TARGET2 in data -def test_save_six_report_with_different_target_must_expose_correct_data_for_each_target(db): - db.put([REPORTA_1, REPORTB_1, REPORTA_2, REPORTB_2, REPORTA_3, REPORTB_3, ]) +def test_save_two_report_with_different_target_must_expose_correct_data_for_each_target(db_info): + db, url = db_info + db.put([REPORTA_1, REPORTB_1]) - data = extract_metrics(METRIC, URL) - assert TARGET1 in data['mean'] - assert TARGET2 in data['mean'] + data = extract_metrics(METRIC, url) + assert data[TARGET1][0]['value'] == 10 + assert data[TARGET2][0]['value'] == 40 def test_save_report_from_two_target_and_then_report_from_one_target_must_finaly_only_expose_report_from_remaining_target( - db): - db.put( - [REPORTA_1, REPORTB_1, REPORTA_2, REPORTB_2, REPORTA_3, REPORTB_3, REPORTA_4, REPORTA_5, REPORTA_6, REPORTA_7]) - data = extract_metrics(METRIC, URL) - assert TARGET1 in data['mean'] - assert TARGET2 not in data['mean'] - assert TARGET2 not in data['max'] - assert TARGET1 in data['max'] + db_info): + db, url = db_info + db.put([REPORTA_1, REPORTB_1, REPORTA_2, REPORTA_3]) + data = extract_metrics(METRIC, url) + assert TARGET1 in data + assert TARGET2 not in data diff --git a/tests/unit/cli/conftest.py b/tests/unit/cli/conftest.py index 2ac51a52..58b05f31 100644 --- a/tests/unit/cli/conftest.py +++ b/tests/unit/cli/conftest.py @@ -28,11 +28,12 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import os import sys +from copy import deepcopy import pytest import tests.utils.cli as test_files_module from powerapi.cli.binding_manager import PreProcessorBindingManager -from powerapi.cli.generator import PullerGenerator, PusherGenerator, ProcessorGenerator, COMPONENT_TYPE_KEY, \ +from powerapi.cli.generator import PullerGenerator, PusherGenerator, COMPONENT_TYPE_KEY, \ LISTENER_ACTOR_KEY, MONITOR_NAME_SUFFIX, PreProcessorGenerator from powerapi.dispatcher import DispatcherActor, RouteTable from powerapi.filter import Filter @@ -135,7 +136,7 @@ def several_inputs_outputs_stream_prometheus_without_some_arguments_config(sever of prometheus output are removed """ for _, current_output in several_inputs_outputs_stream_config["output"].items(): - if current_output['type'] == 'prom': + if current_output['type'] == 'prometheus': current_output.pop('metric_name') current_output.pop('metric_description') current_output.pop('aggregation_period') @@ -739,3 +740,61 @@ def pre_processor_binding_manager_with_reused_puller_in_bindings( configuration=pre_processor_with_reused_puller_in_bindings_configuration) return PreProcessorBindingManager(pullers=pullers, processors=processors) + + +def get_config_with_longest_argument_names(config: dict, arguments: dict): + """ + Return a copy of the provided configuration with the longest name for each argument + :param dict config: Configuration to be modified + :param dict arguments: Arguments definition + """ + config_longest_names = {} + for argument_name in config.keys(): + current_argument = arguments[argument_name] + longest_argument_name = get_longest_name(current_argument.names) + config_longest_names[longest_argument_name] = config[argument_name] + + return config_longest_names + + +def get_longest_name(names: list): + """ + Return the longest name in the provide list + :param list names: List of names + """ + longest_name = "" + + for name in names: + if len(name) > len(longest_name): + longest_name = name + + return longest_name + + +def get_config_with_default_values(config: dict, arguments: dict): + """ + Get a configuration that contains all optional arguments with their default values + :param dict config: Configuration to be modified + :param dict arguments: Arguments definition + """ + + processed_arguments = [] + + config_all_values = deepcopy(config) + + for current_argument_name, current_argument in arguments.items(): + if current_argument not in processed_arguments: + argument_value_already_defined = False + + for name in current_argument.names: + if name in config: + argument_value_already_defined = True + + break + + if not argument_value_already_defined and current_argument.default_value is not None: + config_all_values[current_argument_name] = current_argument.default_value + + processed_arguments.append(current_argument) + + return config_all_values diff --git a/tests/unit/cli/test_config_parser.py b/tests/unit/cli/test_config_parser.py index 43f8decd..2e8bb759 100644 --- a/tests/unit/cli/test_config_parser.py +++ b/tests/unit/cli/test_config_parser.py @@ -35,6 +35,7 @@ SubgroupParserWithoutNameArgumentException, \ NoNameSpecifiedForSubgroupException, TooManyArgumentNamesException, MissingArgumentException, \ SameLengthArgumentNamesException, AlreadyAddedSubgroupException +from tests.unit.cli.conftest import get_config_with_longest_argument_names, get_config_with_default_values from tests.utils.cli.base_config_parser import load_configuration_from_json_file, \ generate_configuration_tuples_from_json_file, define_environment_variables_configuration_from_json_file, \ @@ -145,12 +146,15 @@ def test_validate_check_mandatory_arguments_on_configuration(base_config_parser) Test if mandatory arguments are verified by the parser """ conf = load_configuration_from_json_file('basic_configuration.json') + config_longest_names = get_config_with_default_values(config=conf, arguments=base_config_parser.arguments) + config_longest_names = get_config_with_longest_argument_names(config=config_longest_names, + arguments=base_config_parser.arguments) conf_without_mandatory_arguments = \ load_configuration_from_json_file('basic_configuration_without_mandatory_arguments.json') try: validated_config = base_config_parser.validate(conf) - assert validated_config == conf + assert validated_config == config_longest_names except MissingArgumentException: assert False @@ -179,12 +183,12 @@ def test_validate_adds_default_values_for_no_arguments_defined_in_configuration_ """ conf = load_configuration_from_json_file('basic_configuration_without_arguments_with_default_values.json') - expected_conf = conf.copy() - expected_conf['arg1'] = 3 - expected_conf['arg5'] = 'default value' + expected_conf_default_values = get_config_with_default_values(config=conf, arguments=base_config_parser.arguments) + expected_conf_default_values = get_config_with_longest_argument_names(config=expected_conf_default_values, + arguments=base_config_parser.arguments) validated_config = base_config_parser.validate(conf) - assert validated_config == expected_conf + assert validated_config == expected_conf_default_values def test_get_arguments_str_return_str_with_all_information(base_config_parser, base_config_parser_str_representation): diff --git a/tests/unit/cli/test_generator.py b/tests/unit/cli/test_generator.py index 9295e7ae..c1867366 100644 --- a/tests/unit/cli/test_generator.py +++ b/tests/unit/cli/test_generator.py @@ -268,7 +268,7 @@ def test_generate_several_pushers_from_config(several_inputs_outputs_stream_conf assert db.db_name == current_pusher_infos['db'] assert db.port == current_pusher_infos['port'] - elif pusher_type == 'prom': + elif pusher_type == 'prometheus': assert isinstance(db, PrometheusDB) assert db.address == current_pusher_infos['uri'] assert db.port == current_pusher_infos['port'] diff --git a/tests/unit/processor/pre/k8s/test_k8s_processor.py b/tests/unit/processor/pre/k8s/test_k8s_processor.py index e7be41a4..8cea83e9 100644 --- a/tests/unit/processor/pre/k8s/test_k8s_processor.py +++ b/tests/unit/processor/pre/k8s/test_k8s_processor.py @@ -208,6 +208,18 @@ def hwpc_report_with_metadata(self, hwpc_report, basic_added_event_k8s): return hwpc_report_with_metadata + @pytest.fixture() + def hwpc_report_empty_metadata_values(self, hwpc_report): + """ + Return a HWPC report with metadata values (pod namespace and pod name) + """ + hwpc_report_with_empty_metadata_values = deepcopy(hwpc_report) + + hwpc_report_with_empty_metadata_values.metadata[POD_NAMESPACE_METADATA_KEY] = "" + hwpc_report_with_empty_metadata_values.metadata[POD_NAME_METADATA_KEY] = "" + + return hwpc_report_with_empty_metadata_values + @pytest.fixture def actor(self, started_fake_target_actor, pods_list, mocked_watch_initialized): return K8sPreProcessorActor(name='test_k8s_processor_actor', ks8_api_mode=MANUAL_CONFIG_MODE, @@ -286,6 +298,7 @@ def test_update_metadata_cache_with_added_event(self, mocked_monitor_added_event mocked_monitor_added_event.stop_monitoring.set() + @pytest.mark.skip(reason='to be executed only locally. It fails sometimes because of multiprocessing') def test_update_metadata_cache_with_modified_event(self, mocked_monitor_modified_event, started_actor, basic_modified_event_k8s, shutdown_system): """ @@ -367,12 +380,13 @@ def test_add_metadata_to_hwpc_report(self, mocked_monitor_added_event, mocked_monitor_added_event.stop_monitoring.set() - def test_add_metadata_to_hwpc_report_does_not_modify_report_with_unknown_container_id(self, - mocked_monitor_added_event, - started_actor, - hwpc_report, - dummy_pipe_out, - shutdown_system): + def test_add_metadata_to_hwpc_report_add_empty_values_with_unknown_container_id(self, + mocked_monitor_added_event, + started_actor, + hwpc_report, + dummy_pipe_out, + shutdown_system, + hwpc_report_empty_metadata_values): """ Test that a HWPC report is not modified with an unknown container id """ @@ -381,4 +395,4 @@ def test_add_metadata_to_hwpc_report_does_not_modify_report_with_unknown_contain result = recv_from_pipe(dummy_pipe_out, 4) - assert result[1] == hwpc_report + assert result[1] == hwpc_report_empty_metadata_values diff --git a/tests/utils/cli/several_inputs_outputs_stream_mode_enabled_configuration.json b/tests/utils/cli/several_inputs_outputs_stream_mode_enabled_configuration.json index 4c379435..ae596c0a 100644 --- a/tests/utils/cli/several_inputs_outputs_stream_mode_enabled_configuration.json +++ b/tests/utils/cli/several_inputs_outputs_stream_mode_enabled_configuration.json @@ -39,7 +39,7 @@ "db": "my_db_result_2" }, "third_pusher": { - "type": "prom", + "type": "prometheus", "model": "PowerReport", "uri": "127.0.0.1", "port": 2222,