Kafka initial implementation #1416

Open · wants to merge 6 commits into base: master
23 changes: 23 additions & 0 deletions jenkins_pipelines/scripts/kafka/Dockerfile
@@ -0,0 +1,23 @@
FROM apache/kafka:3.7.1

USER root

RUN apk update && \
apk add git=2.43.5-r0 python3=3.11.10-r0 python3-dev=3.11.10-r0 py3-pip=23.3.1-r0 gcc=13.2.1_git20231014-r0 g++=13.2.1_git20231014-r0 librdkafka-dev=2.3.0-r1

USER appuser

COPY --chown=appuser "consumer.py" "producer.py" "/home/appuser/"

WORKDIR "/home/appuser"

RUN python3 -m venv "/home/appuser/venv" && \
. "/home/appuser/venv/bin/activate" && \
pip install confluent_kafka==2.3.0 GitPython==3.1.43 requests==2.32.3 && \
git clone "https://github.com/SUSE/susemanager-ci"

ENTRYPOINT ["/bin/bash", "-c", "/etc/kafka/docker/run & \
/opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic sle_mu_43 --bootstrap-server localhost:9092 && \
. /home/appuser/venv/bin/activate && \
python3 consumer.py" \
]
63 changes: 63 additions & 0 deletions jenkins_pipelines/scripts/kafka/README.md
@@ -0,0 +1,63 @@
# Kafka automation concept

Messaging system to automate manual tasks with SUSE internal services via API.

## Requirements

### Host

Tested on the SLE 15 SP6 host deployed in a fully trusted environment with the following packages installed:
* `docker` package from [Virtualization repository](https://download.opensuse.org/repositories/Virtualization:/containers/15.6/).

### Variables

The following environment variables need to be exported on the container's host:
* [`JENKINS_API_TOKEN`](https://ci.suse.de/user/manager/configure).
* [`SLACK_API_URL_APPENDIX`](https://app.slack.com/client/T02863RC2AC/platform) (_in the form of `"T02863RC2AC/<alphanumeric data>/<alphanumeric data>"`_).
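
For example, the variables can be exported like this (the values below are illustrative placeholders only; substitute real credentials):

```bash
# Placeholder values for illustration -- replace with your own secrets.
export JENKINS_API_TOKEN="replace-with-your-jenkins-api-token"
export SLACK_API_URL_APPENDIX="T02863RC2AC/replace-me/replace-me"
```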

### Networking

The following websites need to be resolvable from within the docker container network:
* `https://smelt.suse.de`
* `https://hooks.slack.com`
* `https://ci.suse.de`
* `https://github.com`
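
A quick reachability check can be sketched from the host before starting the container (the `curl` probe is commented out so the loop only lists the hosts; uncomment it to actually test connectivity):

```bash
# Hosts the container must be able to reach over HTTPS.
for host in smelt.suse.de hooks.slack.com ci.suse.de github.com; do
    echo "would check: https://${host}"
    # curl --silent --head "https://${host}" >/dev/null || echo "unreachable: ${host}"
done
```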

## Usage

### Building and Running

From the `susemanager-ci/jenkins_pipelines/scripts/kafka` directory, build the `kafka` container:

```bash
docker build . --tag "kafka"
```

With `JENKINS_API_TOKEN` and `SLACK_API_URL_APPENDIX` exported, run the `kafka` container:

```bash
docker run --name "kafka" --env JENKINS_API_TOKEN=${JENKINS_API_TOKEN} --env SLACK_API_URL_APPENDIX=${SLACK_API_URL_APPENDIX} --network "host" kafka
```

### Topics Available

* `sle_mu_43`:
  1. Pulls the latest [MU requests](https://smelt.suse.de/overview/) to be accepted and generates a JSON file based on the latest [susemanager-ci](https://github.com/SUSE/susemanager-ci/tree/master) scripts.
  2. Starts a [new manager-4.3-qe-sle-update pipeline](https://ci.suse.de/view/Manager/view/Manager-4.3/job/manager-4.3-qe-sle-update-NUE/) and monitors its status until completion.
  3. Sends a message to the dedicated Slack channel [andy-test](https://app.slack.com/client/T02863RC2AC/C033KJKDF9V) informing about the status.

⚠️ _The producing script should eventually be integrated into the https://smelt.suse.de site; at the moment it sends requests from the container_.
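
The steps above follow a decode → trigger → notify flow. A minimal sketch of that flow, with hypothetical `run_pipeline` and `notify` callables standing in for the Jenkins and Slack calls (this is not the actual implementation in `consumer.py`):

```python
import json


def handle(message_bytes, run_pipeline, notify):
    """Sketch of the sle_mu_43 flow: decode the SMELT incidents payload,
    trigger the pipeline when there is work, and report the outcome."""
    incidents = json.loads(message_bytes.decode("utf-8"))
    if incidents.get("recordsFiltered", 0) == 0:
        return None                           # nothing pending on SMELT
    build_number = run_pipeline(incidents)    # e.g. Jenkins buildWithParameters
    notify(build_number)                      # e.g. Slack webhook message
    return build_number
```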

### Debugging

Alongside Kafka's own logging, the built-in logger captures API requests to external services together with their return codes and response messages:

```bash
docker logs "kafka"
```

## Additional resources

* [SLE MU pipeline automation concept](https://github.com/SUSE/spacewalk/issues/24966).
* [SLE Maintenance updates document](https://confluence.suse.com/display/SUSEMANAGER/QE+SLE+Maintenance+Updates).

210 changes: 210 additions & 0 deletions jenkins_pipelines/scripts/kafka/consumer.py
@@ -0,0 +1,210 @@

import functools
import json
import logging
import os
import subprocess
import time
import dataclasses

import confluent_kafka
import git
import requests

import producer


logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] %(levelname)s [KAFKA CONSUMER]: %(message)s'
)


class APIClients:
authorization_parameters: tuple = ('manager', os.getenv('JENKINS_API_TOKEN'))

@staticmethod
def log_http_requests(request):
@functools.wraps(request)
def wrapper(self, *args, **kwargs):
response = request(self, *args, **kwargs)
logging.info(f"{request.__name__.upper()} {args[0]}, STATUS: {response.status_code}")
if response.status_code not in (200, 201):
logging.error(f"{response.content}")
return response
return wrapper

@log_http_requests
def get(self, endpoint: str):
return requests.get(endpoint, auth=self.authorization_parameters, verify=False, timeout=10)

@log_http_requests
def post(self, endpoint: str, params=None, data=None):
if endpoint.startswith('https://hooks.slack.com'):
return requests.post(endpoint, headers={'Content-Type': 'application/json'}, data=json.dumps(data), timeout=10)
return requests.post(endpoint, auth=self.authorization_parameters, params=params, verify=False, timeout=10)


@dataclasses.dataclass
class KafkaConsumer:
consumer = confluent_kafka.Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'jenkins_pipelines',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False,
'max.poll.interval.ms': 86400000
})
kafka_topic = 'sle_mu_43'
api_clients = APIClients()

def __post_init__(self) -> None:
self.consumer.subscribe([self.kafka_topic])

@staticmethod
def pull_latest_susemanager_ci():
susemanager_ci_repository = git.Repo('/home/appuser/susemanager-ci')
try:
susemanager_ci_repository.remotes.origin.pull()
except git.exc.GitError as stderr:
logging.warning(f"Error during git pull on susemanager-ci repository: {stderr}")

@staticmethod
def generate_custom_repositories(incidents: dict):
incident_numbers = ','.join(
str(incident['incident']['incident_id'])
for incident in incidents['data']
)
try:
subprocess.run(
[
"python3",
"susemanager-ci/jenkins_pipelines/scripts/json_generator/maintenance_json_generator.py",
"-i", incident_numbers
],
check=True
)
except subprocess.CalledProcessError:
susemanager_ci_latest_commit = git.Repo('/home/appuser/susemanager-ci').head.commit
logging.error(f"Cannot generate JSON file on the {susemanager_ci_latest_commit} commit of susemanager-ci repository with {incident_numbers} MI IDs")


def run_jenkins_pipeline(self) -> int | None:
instances_involved = ['server', 'proxy', 'sle15sp4_client', 'sle15sp4_minion']
with open('custom_repositories.json', 'r', encoding='utf-8') as custom_repositories:
custom_repositories_formatted = json.dumps(
{
key: value for key, value in json.load(custom_repositories).items()
if key in instances_involved
},
indent=4
)
build_parameters = {
'cucumber_gitrepo': 'https://github.com/SUSE/spacewalk.git',
'cucumber_ref': 'Manager-4.3',
'tf_file': 'susemanager-ci/terracumber_config/tf_files/SUSEManager-4.3-SLE-update-NUE.tf',
'sumaform_gitrepo': 'https://github.com/uyuni-project/sumaform.git',
'sumaform_ref': 'master',
'sumaform_backend': 'libvirt',
'terraform_bin': '/usr/bin/terraform',
'terraform_bin_plugins': '/usr/bin',
'terraform_parallelism': '',
'terracumber_gitrepo': 'https://github.com/uyuni-project/terracumber.git',
'terracumber_ref': 'master',
'minions_to_run': 'sles15sp4_minion',
'use_previous_terraform_state': 'false',
'must_deploy': 'true',
'must_run_core': 'true',
'must_sync': 'true',
'enable_proxy_stages': 'true',
'enable_client_stages': 'true',
'must_add_MU_repositories': 'true',
'must_add_non_MU_repositories': 'true',
'must_add_keys': 'true',
'must_create_bootstrap_repos': 'true',
'must_boot_node': 'true',
'must_run_tests': 'true',
'must_run_containerization_tests': 'false',
'confirm_before_continue': 'false',
'custom_repositories': custom_repositories_formatted
}

request = self.api_clients.post(
'https://ci.suse.de/job/manager-4.3-qe-sle-update-NUE/buildWithParameters',
params=build_parameters
)

if request.status_code == 201:
time.sleep(10) # to avoid "In the quiet period. Expires in <10 sec"
request = self.api_clients.get(f"{request.headers['Location']}/api/json")
response = request.json()
try:
build_number = response['executable']['number']
os.rename('custom_repositories.json', f'custom_repositories_{build_number}.json')
return build_number
            except KeyError:
                logging.error(f"Build number was not found in the queued item, latest output: {response.get('why')}. Please check if someone else is working on the pipeline.")
                return None
        if request.status_code == 431:
            logging.error(f'Request is too big; perhaps too many RRs to be accepted generated a big JSON. Run the pipeline manually using the generated custom_repositories.json in {os.getcwd()}')
        return None

    def pipeline_enabled(self) -> bool:
        return self.api_clients.get('https://ci.suse.de/job/manager-4.3-qe-sle-update-NUE/api/json').json()['color'] != 'disabled'

def build_status(self, build_number: int) -> str:
request = self.api_clients.get(f'https://ci.suse.de/job/manager-4.3-qe-sle-update-NUE/{build_number}/api/json')
response = request.json()
if response['inProgress']:
return 'INPROGRESS'
return response['result']

def send_message_slack(self, incidents: dict, build_number: int, status: str) -> None:
mu_requests = [
f"https://build.suse.de/request/show/{incident['request_id']}"
for incident in incidents['data']
]
message = {
'message': f'SLE MU pipeline https://ci.suse.de/job/manager-4.3-qe-sle-update-NUE/{build_number} has status: {status} with the following requests: {mu_requests}'
}
self.api_clients.post(
f"https://hooks.slack.com/triggers/{os.getenv('SLACK_API_URL_APPENDIX')}",
data=message
)

def listen(self) -> None:
build_number: None | int = None
try:
while True:
time.sleep(300)
if self.pipeline_enabled() or build_number:
if build_number:
status = self.build_status(build_number)
logging.info(f'Pipeline build {build_number} STATUS: {status}')
if status != 'INPROGRESS':
self.send_message_slack(incidents, build_number, status)
build_number = None
else:
message = self.consumer.poll(timeout=1.0)
if message is None:
producer.produce()
else:
try:
incidents = json.loads(message.value().decode('utf-8'))
except json.decoder.JSONDecodeError:
logging.error(f'Could not decode kafka message: {message.value()}')
raise
if incidents["recordsFiltered"] > 0:
self.pull_latest_susemanager_ci()
self.generate_custom_repositories(incidents)
build_number = self.run_jenkins_pipeline()
self.consumer.commit(message)
else:
logging.info('Pipeline disabled')
        except BaseException:
            # Close the consumer cleanly on any failure (including KeyboardInterrupt) before re-raising.
            self.consumer.close()
            raise


if __name__ == '__main__':
KafkaConsumer().listen()

24 changes: 24 additions & 0 deletions jenkins_pipelines/scripts/kafka/producer.py
@@ -0,0 +1,24 @@
import confluent_kafka
import requests


def produce():
kafka_broker = 'localhost:9092'
kafka_topic = 'sle_mu_43'
    # DataTables query string copied verbatim from the SMELT testing overview page (filters: qam-manager reviews, sp4 channels).
    smelt_testing_site = 'https://smelt.suse.de/api/v1/overview/testing/?format=datatables&draw=7&columns%5B0%5D%5Bdata%5D=category&columns%5B0%5D%5Bname%5D=category.id&columns%5B0%5D%5Bsearchable%5D=true&columns%5B0%5D%5Borderable%5D=true&columns%5B0%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B0%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B1%5D%5Bdata%5D=request_id&columns%5B1%5D%5Bname%5D=&columns%5B1%5D%5Bsearchable%5D=false&columns%5B1%5D%5Borderable%5D=false&columns%5B1%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B1%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B2%5D%5Bdata%5D=request_id&columns%5B2%5D%5Bname%5D=request_id%2C%20incident.incident_id&columns%5B2%5D%5Bsearchable%5D=true&columns%5B2%5D%5Borderable%5D=true&columns%5B2%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B2%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B3%5D%5Bdata%5D=comments_exists&columns%5B3%5D%5Bname%5D=&columns%5B3%5D%5Bsearchable%5D=false&columns%5B3%5D%5Borderable%5D=false&columns%5B3%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B3%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B4%5D%5Bdata%5D=created&columns%5B4%5D%5Bname%5D=&columns%5B4%5D%5Bsearchable%5D=true&columns%5B4%5D%5Borderable%5D=true&columns%5B4%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B4%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B5%5D%5Bdata%5D=due_date&columns%5B5%5D%5Bname%5D=due_date&columns%5B5%5D%5Bsearchable%5D=false&columns%5B5%5D%5Borderable%5D=true&columns%5B5%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B5%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B6%5D%5Bdata%5D=incident.priority&columns%5B6%5D%5Bname%5D=&columns%5B6%5D%5Bsearchable%5D=false&columns%5B6%5D%5Borderable%5D=true&columns%5B6%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B6%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B7%5D%5Bdata%5D=unfinished_reviews&columns%5B7%5D%5Bname%5D=&columns%5B7%5D%5Bsearchable%5D=true&columns%5B7%5D%5Borderable%5D=false&columns%5B7%5D%5Bsearch%5D%5Bvalue%5D=qam-manager&columns%5B7%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B8%5D%5Bdata%5D=packages&columns%5B8%5D%5Bname%5D=packages&columns%5B8%5D%5Bsearchable%5D=true&columns%5B8%5D%5Borderable%5D=false&columns%5B8%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B8%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B9%5D%5Bdata%5D=incident.references&columns%5B9%5D%5Bname%5D=incident.patchinfo.references.name&columns%5B9%5D%5Bsearchable%5D=true&columns%5B9%5D%5Borderable%5D=false&columns%5B9%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B9%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B10%5D%5Bdata%5D=channellist&columns%5B10%5D%5Bname%5D=channels.name&columns%5B10%5D%5Bsearchable%5D=true&columns%5B10%5D%5Borderable%5D=false&columns%5B10%5D%5Bsearch%5D%5Bvalue%5D=sp4&columns%5B10%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B11%5D%5Bdata%5D=created_by.username&columns%5B11%5D%5Bname%5D=created_by.username&columns%5B11%5D%5Bsearchable%5D=true&columns%5B11%5D%5Borderable%5D=true&columns%5B11%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B11%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B12%5D%5Bdata%5D=url&columns%5B12%5D%5Bname%5D=&columns%5B12%5D%5Bsearchable%5D=false&columns%5B12%5D%5Borderable%5D=false&columns%5B12%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B12%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B13%5D%5Bdata%5D=kind&columns%5B13%5D%5Bname%5D=&columns%5B13%5D%5Bsearchable%5D=false&columns%5B13%5D%5Borderable%5D=false&columns%5B13%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B13%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B14%5D%5Bdata%5D=rating&columns%5B14%5D%5Bname%5D=&columns%5B14%5D%5Bsearchable%5D=false&columns%5B14%5D%5Borderable%5D=false&columns%5B14%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B14%5D%5Bsearch%5D%5Bregex%5D=false&columns%5B15%5D%5Bdata%5D=qa_comments_exist&columns%5B15%5D%5Bname%5D=&columns%5B15%5D%5Bsearchable%5D=false&columns%5B15%5D%5Borderable%5D=false&columns%5B15%5D%5Bsearch%5D%5Bvalue%5D=&columns%5B15%5D%5Bsearch%5D%5Bregex%5D=false&order%5B0%5D%5Bcolumn%5D=6&order%5B0%5D%5Bdir%5D=desc&start=0&length=250&search%5Bvalue%5D=&search%5Bregex%5D=false&_=1726339618939'
headers = {
'Accept': 'application/json, text/javascript, */*; q=0.01',
'Accept-Language': 'en-US,en;q=0.6',
'Connection': 'keep-alive',
'Referer': 'https://smelt.suse.de/overview/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
'Sec-GPC': '1',
'X-Requested-With': 'XMLHttpRequest'
}
maintenance_incidents_on_qa = requests.get(smelt_testing_site, headers=headers, verify=False, timeout=10).content
producer = confluent_kafka.Producer({'bootstrap.servers': kafka_broker})
producer.produce(kafka_topic, maintenance_incidents_on_qa)
producer.flush()