diff --git a/jenkins_pipelines/scripts/kafka/Dockerfile b/jenkins_pipelines/scripts/kafka/Dockerfile
new file mode 100644
index 000000000..5af9926f4
--- /dev/null
+++ b/jenkins_pipelines/scripts/kafka/Dockerfile
@@ -0,0 +1,23 @@
+FROM apache/kafka:3.7.1
+
+USER root
+
+RUN apk update && \
+    apk add git=2.43.5-r0 python3=3.11.10-r0 python3-dev=3.11.10-r0 py3-pip=23.3.1-r0 gcc=13.2.1_git20231014-r0 g++=13.2.1_git20231014-r0 librdkafka-dev=2.3.0-r1
+
+USER appuser
+
+COPY --chown=appuser "consumer.py" "producer.py" "/home/appuser/"
+
+WORKDIR "/home/appuser"
+
+RUN python3 -m venv "/home/appuser/venv" && \
+    . "/home/appuser/venv/bin/activate" && \
+    pip install confluent_kafka==2.3.0 GitPython==3.1.43 requests==2.32.3 && \
+    git clone "https://github.com/SUSE/susemanager-ci"
+
+ENTRYPOINT ["/bin/bash", "-c", "/etc/kafka/docker/run & \
+    /opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic sle_mu_43 --bootstrap-server localhost:9092 && \
+    . /home/appuser/venv/bin/activate && \
+    python3 consumer.py" \
+]
diff --git a/jenkins_pipelines/scripts/kafka/README.md b/jenkins_pipelines/scripts/kafka/README.md
new file mode 100644
index 000000000..ca48cab29
--- /dev/null
+++ b/jenkins_pipelines/scripts/kafka/README.md
@@ -0,0 +1,63 @@
+# Kafka automation concept
+
+A messaging system that automates manual tasks against SUSE internal services via their APIs.
+
+## Requirements
+
+### Host
+
+Tested on an SLE 15 SP6 host deployed in a fully trusted environment with the following packages installed:
+* `docker` package from the [Virtualization repository](https://download.opensuse.org/repositories/Virtualization:/containers/15.6/).
+
+### Variables
+
+The following environment variables need to be exported on the container's host:
+* [`JENKINS_API_TOKEN`](https://ci.suse.de/user/manager/configure).
+* [`SLACK_API_URL_APPENDIX`](https://app.slack.com/client/T02863RC2AC/platform) (_in the form of `"T02863RC2AC//"`_).
+
+### Networking

+The following websites need to be resolvable within the Docker container network:
+* `https://smelt.suse.de`
+* `https://hooks.slack.com`
+* `https://ci.suse.de`
+* `https://github.com`
+
+## Usage
+
+### Building and Running
+
+From the `susemanager-ci/jenkins_pipelines/scripts/kafka` directory, build the `kafka` container:
+
+```bash
+docker build . --tag "kafka"
+```
+
+With `JENKINS_API_TOKEN` and `SLACK_API_URL_APPENDIX` exported, run the `kafka` container:
+
+```bash
+docker run --name "kafka" --env JENKINS_API_TOKEN=${JENKINS_API_TOKEN} --env SLACK_API_URL_APPENDIX=${SLACK_API_URL_APPENDIX} --network "host" kafka
+```
+
+### Topics Available
+
+* `sle_mu_43`:
+  1. Pulls the latest [MU requests](https://smelt.suse.de/overview/) to be accepted and generates a JSON file of custom repositories based on the latest [susemanager-ci](https://github.com/SUSE/susemanager-ci/tree/master) scripts.
+  2. Starts a [new manager-4.3-qe-sle-update pipeline](https://ci.suse.de/view/Manager/view/Manager-4.3/job/manager-4.3-qe-sle-update-NUE/) and monitors its status while it runs.
+  3. Sends a message to the dedicated Slack channel [andy-test](https://app.slack.com/client/T02863RC2AC/C033KJKDF9V) with the resulting status.
+
+  ⚠️ _The producing script should be integrated into the https://smelt.suse.de site; at the moment it sends requests from inside the container_.
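+
+  To inspect the raw messages the producer publishes to this topic, the console consumer shipped with the base image can be used (assuming the container was started with `--name "kafka"` as above), for example:
+
+  ```bash
+  docker exec kafka /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sle_mu_43 --from-beginning --max-messages 1
+  ```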
+
+### Debugging
+
+Alongside the Kafka logs, a built-in logger captures API requests to external services together with the corresponding return codes and messages:
+
+```bash
+docker logs "kafka"
+```
+
+## Additional resources
+
+* [SLE MU pipeline automation concept](https://github.com/SUSE/spacewalk/issues/24966).
+* [SLE Maintenance updates document](https://confluence.suse.com/display/SUSEMANAGER/QE+SLE+Maintenance+Updates).
+
diff --git a/jenkins_pipelines/scripts/kafka/consumer.py b/jenkins_pipelines/scripts/kafka/consumer.py
new file mode 100644
index 000000000..9d3396cdb
--- /dev/null
+++ b/jenkins_pipelines/scripts/kafka/consumer.py
@@ -0,0 +1,210 @@
+
+import functools
+import json
+import logging
+import os
+import subprocess
+import time
+import dataclasses
+
+import confluent_kafka
+import git
+import requests
+
+import producer
+
+
+logging.basicConfig(
+    level=logging.INFO,
+    format='[%(asctime)s] %(levelname)s [KAFKA CONSUMER]: %(message)s'
+)
+
+
+class APIClients:
+    authorization_parameters: tuple = ('manager', os.getenv('JENKINS_API_TOKEN'))
+
+    @staticmethod
+    def log_http_requests(request):
+        @functools.wraps(request)
+        def wrapper(self, *args, **kwargs):
+            response = request(self, *args, **kwargs)
+            logging.info(f"{request.__name__.upper()} {args[0]}, STATUS: {response.status_code}")
+            if response.status_code not in (200, 201):
+                logging.error(f"{response.content}")
+            return response
+        return wrapper
+
+    @log_http_requests
+    def get(self, endpoint: str):
+        return requests.get(endpoint, auth=self.authorization_parameters, verify=False, timeout=10)
+
+    @log_http_requests
+    def post(self, endpoint: str, params=None, data=None):
+        if endpoint.startswith('https://hooks.slack.com'):
+            return requests.post(endpoint, headers={'Content-Type': 'application/json'}, data=json.dumps(data), timeout=10)
+        return requests.post(endpoint, auth=self.authorization_parameters, params=params, verify=False, timeout=10)
+
+
+@dataclasses.dataclass
+class KafkaConsumer:
+    consumer = confluent_kafka.Consumer({
+        'bootstrap.servers': 'localhost:9092',
+        'group.id': 'jenkins_pipelines',
+        'auto.offset.reset': 'earliest',
+        'enable.auto.commit': False,
+        'max.poll.interval.ms': 86400000
+    })
+    kafka_topic = 'sle_mu_43'
+    api_clients = APIClients()
+
+    def __post_init__(self) -> None:
+        self.consumer.subscribe([self.kafka_topic])
+
+    @staticmethod
+    def pull_latest_susemanager_ci():
+        susemanager_ci_repository = git.Repo('/home/appuser/susemanager-ci')
+        try:
+            susemanager_ci_repository.remotes.origin.pull()
+        except git.exc.GitError as stderr:
+            logging.warning(f"Error during git pull on susemanager-ci repository: {stderr}")
+
+    @staticmethod
+    def generate_custom_repositories(incidents: dict):
+        incident_numbers = ','.join(
+            str(incident['incident']['incident_id'])
+            for incident in incidents['data']
+        )
+        try:
+            subprocess.run(
+                [
+                    "python3",
+                    "susemanager-ci/jenkins_pipelines/scripts/json_generator/maintenance_json_generator.py",
+                    "-i", incident_numbers
+                ],
+                check=True
+            )
+        except subprocess.CalledProcessError:
+            susemanager_ci_latest_commit = git.Repo('/home/appuser/susemanager-ci').head.commit
+            logging.error(f"Cannot generate JSON file on the {susemanager_ci_latest_commit} commit of susemanager-ci repository with {incident_numbers} MI IDs")
+
+
+    def run_jenkins_pipeline(self) -> int | None:
+        instances_involved = ['server', 'proxy', 'sle15sp4_client', 'sle15sp4_minion']
+        with open('custom_repositories.json', 'r', encoding='utf-8') as custom_repositories:
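+            # Keep only the entries for the instances used by the 4.3 SLE update pipeline
+            # before passing them to Jenkins as the 'custom_repositories' build parameter.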
+            custom_repositories_formatted = json.dumps(
+                {
+                    key: value for key, value in json.load(custom_repositories).items()
+                    if key in instances_involved
+                },
+                indent=4
+            )
+        build_parameters = {
+            'cucumber_gitrepo': 'https://github.com/SUSE/spacewalk.git',
+            'cucumber_ref': 'Manager-4.3',
+            'tf_file': 'susemanager-ci/terracumber_config/tf_files/SUSEManager-4.3-SLE-update-NUE.tf',
+            'sumaform_gitrepo': 'https://github.com/uyuni-project/sumaform.git',
+            'sumaform_ref': 'master',
+            'sumaform_backend': 'libvirt',
+            'terraform_bin': '/usr/bin/terraform',
+            'terraform_bin_plugins': '/usr/bin',
+            'terraform_parallelism': '',
+            'terracumber_gitrepo': 'https://github.com/uyuni-project/terracumber.git',
+            'terracumber_ref': 'master',
+            'minions_to_run': 'sles15sp4_minion',
+            'use_previous_terraform_state': 'false',
+            'must_deploy': 'true',
+            'must_run_core': 'true',
+            'must_sync': 'true',
+            'enable_proxy_stages': 'true',
+            'enable_client_stages': 'true',
+            'must_add_MU_repositories': 'true',
+            'must_add_non_MU_repositories': 'true',
+            'must_add_keys': 'true',
+            'must_create_bootstrap_repos': 'true',
+            'must_boot_node': 'true',
+            'must_run_tests': 'true',
+            'must_run_containerization_tests': 'false',
+            'confirm_before_continue': 'false',
+            'custom_repositories': custom_repositories_formatted
+        }
+
+        request = self.api_clients.post(
+            'https://ci.suse.de/job/manager-4.3-qe-sle-update-NUE/buildWithParameters',
+            params=build_parameters
+        )
+
+        if request.status_code == 201:
+            time.sleep(10)  # to avoid "In the quiet period. Expires in <10 sec"
+            request = self.api_clients.get(f"{request.headers['Location']}/api/json")
+            response = request.json()
+            try:
+                build_number = response['executable']['number']
+                os.rename('custom_repositories.json', f'custom_repositories_{build_number}.json')
+                return build_number
+            except KeyError:
+                logging.error(f"Build number was not found in the currently running pipelines, latest output: {response['why']}. Please check if someone else is working on the pipeline.")
+                return None
+        if request.status_code == 431:
+            logging.error(f'Request is too big; perhaps too many RRs to be accepted produced a large JSON. Run the pipeline manually using the generated custom_repositories.json in {os.getcwd()}')
+            return None
+
+    def pipeline_enabled(self) -> bool:
+        return self.api_clients.get('https://ci.suse.de/job/manager-4.3-qe-sle-update-NUE/api/json').json()['color'] != 'disabled'
+
+    def build_status(self, build_number: int) -> str:
+        request = self.api_clients.get(f'https://ci.suse.de/job/manager-4.3-qe-sle-update-NUE/{build_number}/api/json')
+        response = request.json()
+        if response['inProgress']:
+            return 'INPROGRESS'
+        return response['result']
+
+    def send_message_slack(self, incidents: dict, build_number: int, status: str) -> None:
+        mu_requests = [
+            f"https://build.suse.de/request/show/{incident['request_id']}"
+            for incident in incidents['data']
+        ]
+        message = {
+            'message': f'SLE MU pipeline https://ci.suse.de/job/manager-4.3-qe-sle-update-NUE/{build_number} has status: {status} with the following requests: {mu_requests}'
+        }
+        self.api_clients.post(
+            f"https://hooks.slack.com/triggers/{os.getenv('SLACK_API_URL_APPENDIX')}",
+            data=message
+        )
+
+    def listen(self) -> None:
+        build_number: None | int = None
+        try:
+            while True:
+                time.sleep(300)
+                if self.pipeline_enabled() or build_number:
+                    if build_number:
+                        status = self.build_status(build_number)
+                        logging.info(f'Pipeline build {build_number} STATUS: {status}')
+                        if status != 'INPROGRESS':
+                            self.send_message_slack(incidents, build_number, status)
+                            build_number = None
+                    else:
+                        message = self.consumer.poll(timeout=1.0)
+                        if message is None:
+                            producer.produce()
+                        else:
+                            try:
+                                incidents = json.loads(message.value().decode('utf-8'))
+                            except json.decoder.JSONDecodeError:
+                                logging.error(f'Could not decode kafka message: {message.value()}')
+                                raise
+                            if incidents["recordsFiltered"] > 0:
+                                self.pull_latest_susemanager_ci()
+                                self.generate_custom_repositories(incidents)
+                                build_number = self.run_jenkins_pipeline()
+                            self.consumer.commit(message)
+                else:
+                    logging.info('Pipeline disabled')
+        except:
+            self.consumer.close()
+            raise
+
+
+if __name__ == '__main__':
+    KafkaConsumer().listen()
+
diff --git a/jenkins_pipelines/scripts/kafka/producer.py b/jenkins_pipelines/scripts/kafka/producer.py
new file mode 100644
index 000000000..9af2b1ed0
--- /dev/null
+++ b/jenkins_pipelines/scripts/kafka/producer.py
@@ -0,0 +1,24 @@
+import confluent_kafka
+import requests
+
+
+def produce():
+    kafka_broker = 'localhost:9092'
+    kafka_topic = 'sle_mu_43'
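+    # DataTables query against the SMELT testing overview, filtered to requests with an
+    # unfinished 'qam-manager' review and 'sp4' channels, returning up to 250 entries.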
+    smelt_testing_site = 'https://smelt.suse.de/api/v1/overview/testing/?format=datatables&draw=7&columns%5B0%5D%5Bdata%5D=category&columns%5B0%5D%5Bname%5D=category.id&columns%5B0%5D%5Bsearchable%5D=true&columns%5B0%5D%5Borderable%5D=true&columns%5B0%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B0%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B1%5D%5Bdata%5D=request_id&columns%5B1%5D%5Bname%5D=&columns%5B1%5D%5Bsearchable%5D=false&columns%5B1%5D%5Borderable%5D=false&columns%5B1%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B1%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B2%5D%5Bdata%5D=request_id&columns%5B2%5D%5Bname%5D=request_id%2C%20incident.incident_id&columns%5B2%5D%5Bsearchable%5D=true&columns%5B2%5D%5Borderable%5D=true&columns%5B2%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B2%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B3%5D%5Bdata%5D=comments_exists&columns%5B3%5D%5Bname%5D=&columns%5B3%5D%5Bsearchable%5D=false&columns%5B3%5D%5Borderable%5D=false&columns%5B3%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B3%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B4%5D%5Bdata%5D=created&columns%5B4%5D%5Bname%5D=&columns%5B4%5D%5Bsearchable%5D=true&columns%5B4%5D%5Borderable%5D=true&columns%5B4%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B4%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B5%5D%5Bdata%5D=due_date&columns%5B5%5D%5Bname%5D=due_date&columns%5B5%5D%5Bsearchable%5D=false&columns%5B5%5D%5Borderable%5D=true&columns%5B5%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B5%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B6%5D%5Bdata%5D=incident.priority&columns%5B6%5D%5Bname%5D=&columns%5B6%5D%5Bsearchable%5D=false&columns%5B6%5D%5Borderable%5D=true&columns%5B6%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B6%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B7%5D%5Bdata%5D=unfinished_reviews&columns%5B7%5D%5Bname%5D=&columns%5B7%5D%5Bsearchable%5D=true&columns%5B7%5D%5Borderable%5D=false&columns%5B7%5D%5Bsearch%5D%5Bvalue%5D=qam-manager&columns%5B7%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B8%5D%5Bdata%5D=packages&columns%5B8%5D%5Bname%5D=packages&columns%5B8%5D%5Bsearchable%5D=true&columns%5B8%5D%5Borderable%5D=false&columns%5B8%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B8%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B9%5D%5Bdata%5D=incident.references&columns%5B9%5D%5Bname%5D=incident.patchinfo.references.name&columns%5B9%5D%5Bsearchable%5D=true&columns%5B9%5D%5Borderable%5D=false&columns%5B9%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B9%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B10%5D%5Bdata%5D=channellist&columns%5B10%5D%5Bname%5D=channels.name&columns%5B10%5D%5Bsearchable%5D=true&columns%5B10%5D%5Borderable%5D=false&columns%5B10%5D%5Bsearch%5D%5Bvalue%5D=sp4&columns%5B10%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B11%5D%5Bdata%5D=created_by.username&columns%5B11%5D%5Bname%5D=created_by.username&columns%5B11%5D%5Bsearchable%5D=true&columns%5B11%5D%5Borderable%5D=true&columns%5B11%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B11%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B12%5D%5Bdata%5D=url&columns%5B12%5D%5Bname%5D=&columns%5B12%5D%5Bsearchable%5D=false&columns%5B12%5D%5Borderable%5D=false&columns%5B12%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B12%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B13%5D%5Bdata%5D=kind&columns%5B13%5D%5Bname%5D=&columns%5B13%5D%5Bsearchable%5D=false&columns%5B13%5D%5Borderable%5D=false&columns%5B13%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B13%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B14%5D%5Bdata%5D=rating&columns%5B14%5D%5Bname%5D=&columns%5B14%5D%5Bsearchable%5D=false&columns%5B14%5D%5Borderable%5D=false&columns%5B14%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B14%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B15%5D%5Bdata%5D=qa_comments_exist&columns%5B15%5D%5Bname%5D=&columns%5B15%5D%5Bsearchable%5D=false&columns%5B15%5D%5Borderable%5D=false&columns%5B15%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B15%5D%5Bsearch%5D%5Bregex%5D=false&order%5B0%5D%5Bcolumn%5D=6&order%5B0%5D%5Bdir%5D=desc&start=0&length=250&search%5Bvalue%5D=&search%5Bregex%5D=false&_=1726339618939'
+    headers = {
+        'Accept': 'application/json, text/javascript, */*; q=0.01',
+        'Accept-Language': 'en-US,en;q=0.6',
+        'Connection': 'keep-alive',
+        'Referer': 'https://smelt.suse.de/overview/',
+        'Sec-Fetch-Dest': 'empty',
+        'Sec-Fetch-Mode': 'cors',
+        'Sec-Fetch-Site': 'same-origin',
+        'Sec-GPC': '1',
+        'X-Requested-With': 'XMLHttpRequest'
+    }
+    maintenance_incidents_on_qa = requests.get(smelt_testing_site, headers=headers, verify=False, timeout=10).content
+    producer = confluent_kafka.Producer({'bootstrap.servers': kafka_broker})
+    producer.produce(kafka_topic, maintenance_incidents_on_qa)
+    producer.flush()
+