diff --git a/aviso-server/admin/aviso_admin/admin.py b/aviso-server/admin/aviso_admin/admin.py index d53902b..ccacce4 100644 --- a/aviso-server/admin/aviso_admin/admin.py +++ b/aviso-server/admin/aviso_admin/admin.py @@ -22,53 +22,77 @@ from aviso_monitoring.udp_server import UdpServer -def main(): - # load the configuration - config = Config() - logger.info(f"Running Aviso-admin v.{__version__}") - logger.info(f"aviso_monitoring module v.{monitoring_version}") - logger.info(f"Configuration loaded: {config}") - - # instantiate the compactor and cleaner +def setup_compactor_and_cleaner(config): + """Sets up the compactor and cleaner with scheduling.""" compactor = Compactor(config.compactor) - cleaner = Cleaner(config.cleaner) - - # Every day at scheduled time run the compactor if compactor.enabled: schedule.every().day.at(config.compactor["scheduled_time"]).do(compactor.run) - # Every day at scheduled time run the cleaner + cleaner = Cleaner(config.cleaner) if cleaner.enabled: schedule.every().day.at(config.cleaner["scheduled_time"]).do(cleaner.run) - # create the UDP server - receiver = Receiver() - udp_server = UdpServer(config.monitoring.udp_server, receiver) - udp_server.start() - - # schedule reporters - rest_reporter = AvisoRestReporter(config.monitoring, receiver) - if rest_reporter.enabled: - schedule.every(rest_reporter.frequency).minutes.do(rest_reporter.run) - auth_reporter = AvisoAuthReporter(config.monitoring, receiver) - if auth_reporter.enabled: - schedule.every(auth_reporter.frequency).minutes.do(auth_reporter.run) - etcd_reporter = EtcdReporter(config.monitoring, receiver) - if etcd_reporter.enabled: - schedule.every(etcd_reporter.frequency).minutes.do(etcd_reporter.run) - - # launch the prometheus reporter, this expose some tlms to /metrics + +def setup_udp_server(config, receiver): + """Initializes and starts the UDP server.""" + try: + udp_server = UdpServer(config.monitoring.udp_server, receiver) + udp_server.start() + return udp_server + except Exception as e: + logger.exception("Failed to start UDP Server: %s", e) + + +def schedule_reporters(config, receiver): + """Schedules various reporters based on the configuration.""" + for reporter_class in [AvisoRestReporter, AvisoAuthReporter, EtcdReporter]: + reporter = reporter_class(config.monitoring, receiver) + if reporter.enabled: + schedule.every(reporter.frequency).minutes.do(reporter.run) + + +def start_prometheus_reporter(config, receiver): + """Starts the Prometheus reporter if enabled.""" prometheus_reporter = PrometheusReporter(config.monitoring, receiver) if prometheus_reporter.enabled: prometheus_reporter.start() - # Loop so that the scheduling task keeps on running all time. - while True: - # Checks whether a scheduled task is pending to run or not - schedule.run_pending() - time.sleep(30) + +def main(): + """Main function to run the application.""" + # Load the configuration + config = Config() + logger.info(f"Running Aviso-admin v.{__version__}") + logger.info(f"aviso_monitoring module v.{monitoring_version}") + logger.info(f"Configuration loaded: {config}") + + # Set up compactor and cleaner + setup_compactor_and_cleaner(config) + + # Create the UDP server + receiver = Receiver() + udp_server = setup_udp_server(config, receiver) + + # Schedule reporters + schedule_reporters(config, receiver) + + # Start Prometheus reporter + start_prometheus_reporter(config, receiver) + + # Main loop for running scheduled tasks + try: + while True: + schedule.run_pending() + time.sleep(30) + except KeyboardInterrupt: + logger.info("Application stopped by user.") + except Exception as e: + logger.exception("Unexpected error occurred: %s", e) + finally: + if udp_server: + udp_server.stop() # Assuming a method to gracefully stop the UDP server + logger.info("Application shutdown.") -# when running directly from this file if __name__ == "__main__": main() diff --git a/aviso-server/admin/migration/etcd_migration.py b/aviso-server/admin/migration/etcd_migration.py deleted file mode 100644 index 326f03f..0000000 --- a/aviso-server/admin/migration/etcd_migration.py +++ /dev/null @@ -1,178 +0,0 @@ -# (C) Copyright 1996- ECMWF. -# -# This software is licensed under the terms of the Apache Licence Version 2.0 -# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. -# In applying this licence, ECMWF does not waive the privileges and immunities -# granted to it by virtue of its status as an intergovernmental organisation -# nor does it submit to any jurisdiction. - -import base64 -from typing import Dict - -import requests - -old_etcd: str = "__OLD_ETCD_ADDRESS__:PORT" -new_etcd: str = "__NEW_ETCD_ADDRESS__:PORT" -from_revision: int = 145679891 -MAX_KV_RETURNED: int = 10000 - - -def push_kvpairs(etcd_repo, kvs): - """ - Submit key-value pairs to the etcd_repo - :param etcd_repo - :param kvs - :return: True if completed - """ - - print(f"Pushing key-value pairs to {etcd_repo} ...") - - url = etcd_repo + "/v3/kv/txn" - - ops = [] - # Prepare the transaction with a put operation for each KV pair - for kv in kvs: - k = encode_to_str_base64(kv["key"]) - v = encode_to_str_base64(kv["value"]) - put = {"requestPut": {"key": k, "value": v}} - ops.append(put) - - body = {"success": ops} - - # commit transaction - resp = requests.post(url, json=body) - resp.raise_for_status() - - print("Operation completed") - - resp_body = resp.json() - # read the header - if "header" in resp_body: - h = resp_body["header"] - rev = int(h["revision"]) - print(f"New server revision {rev}") - - return True - - -def pull_kvpairs(etcd_repo, revision): - """ - Retrieve key-value pairs newer than the revision number from the etcd_repo - :param etcd_repo - :param revision - :return: kv pairs as dictionary - """ - main_key = "/ec/" - - old_etcd_url = etcd_repo + "/v3/kv/range" - - print(f"Getting key-value pairs to {etcd_repo} newer than {revision} ...") - - range_end = encode_to_str_base64(str(incr_last_byte(main_key), "utf-8")) - - # encode key - encoded_key = encode_to_str_base64(main_key) - - # create the body for the get range on the etcd sever, order them newest first - body = { - "key": encoded_key, - "range_end": range_end, - "limit": MAX_KV_RETURNED, - "sort_order": "DESCEND", - "sort_target": "KEY", - "min_mod_revision": revision, - } - # make the call - # print(f"Pull request: {body}") - - # start an infinite loop of request if the server side is unreachable - resp = requests.post(old_etcd_url, json=body) - resp.raise_for_status() - - print("Retrieval completed") - - # parse the result to return just key-value pairs - new_kvs = [] - resp_body = resp.json() - if "kvs" in resp_body: - print("Building key-value list") - for kv in resp_body["kvs"]: - new_kv = parse_raw_kv(kv, False) - new_kvs.append(new_kv) - print(f"Key: {new_kv['key']} pulled successfully") - - print(f"{len(new_kvs)} keys found") - return new_kvs - - -def parse_raw_kv(kv: Dict[str, any], key_only: bool = False) -> Dict[str, any]: - """ - Internal method to translate the kv pair coming from the etcd server into a dictionary that fits better this - application - :param kv: raw kv pair from the etcd server - :param key_only: - :return: translated kv pair as dictionary - """ - new_kv = {} - if not key_only: - new_kv["value"] = decode_to_bytes(kv["value"]) # leave it as binary - new_kv["key"] = decode_to_bytes(kv["key"]).decode() - new_kv["version"] = int(kv["version"]) - new_kv["create_rev"] = int(kv["create_revision"]) - new_kv["mod_rev"] = int(kv["mod_revision"]) - return new_kv - - -def encode_to_str_base64(obj: any) -> str: - """ - Internal method to translate the object passed in a field that could be accepted by etcd and the request library - for the key or value. The request library accepts only strings encoded in base64 while etcd wants binaries for - the key and value fields. - :param obj: - :return: a base64 string representation of the binary translation - """ - if type(obj) is bytes: - binary = obj - elif type(obj) is str: - binary = obj.encode() - else: - binary = str(obj).encode() - - return str(base64.b64encode(binary), "utf-8") - - -def decode_to_bytes(string: str) -> any: - """ - Internal method to translate what is coming back from the notification server. - The request library returns only string base64 encoded - :param string: - :return: the payload decoded from the base64 string representation - """ - return base64.decodebytes(string.encode()) - - -def incr_last_byte(path: str) -> bytes: - """ - This function determines the end of the range required for a range call with the etcd3 API - By incrementing the last byte of the input path, it allows to make a range call describing the input - path as a branch rather than a leaf path. - - :param path: the path representing the start of the range - :return: the path representing the end of the range - """ - bytes_types = (bytes, bytearray) - if not isinstance(path, bytes_types): - if not isinstance(path, str): - path = str(path) - path = path.encode("utf-8") - s = bytearray(path) - # increment the last byte - s[-1] = s[-1] + 1 - return bytes(s) - - -# first get the key-value pairs from the old repo -kvs = pull_kvpairs(old_etcd, from_revision) - -# send them to the new repo -completed = push_kvpairs(new_etcd, kvs) diff --git a/aviso-server/monitoring/aviso_monitoring/reporter/opsview_reporter.py b/aviso-server/monitoring/aviso_monitoring/reporter/opsview_reporter.py index 1dc3b6a..5d9415d 100644 --- a/aviso-server/monitoring/aviso_monitoring/reporter/opsview_reporter.py +++ b/aviso-server/monitoring/aviso_monitoring/reporter/opsview_reporter.py @@ -19,7 +19,7 @@ class OpsviewReporter(ABC): - metric_ssl_enabled = False + metric_token_enabled = False metric_token = "" def __init__(self, config: Config, msg_receiver=None): @@ -34,8 +34,8 @@ def configure_metric_vars(cls, config): Configures the class attributes based on the provided config. """ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - if config.kube_state_metrics["ssl_enabled"]: - cls.metric_ssl_enabled = True + if config.kube_state_metrics["token_enabled"]: + cls.metric_token_enabled = True cls.metric_token = config.kube_state_metrics["token"] def ms_authenticate(self, m_server): @@ -223,7 +223,7 @@ def retrieve_metrics(cls, metric_servers, req_timeout): logger.debug(f"Retrieving metrics from {url}...") headers = {} try: - if cls.metric_ssl_enabled: + if cls.metric_token_enabled: headers["Authorization"] = f"Bearer {cls.metric_token}" resp = requests.get(url, verify=False, timeout=req_timeout, headers=headers) except Exception as e: