diff --git a/awx/main/migrations/0201_eventquery.py b/awx/main/migrations/0201_eventquery.py new file mode 100644 index 000000000000..09fd4e371b21 --- /dev/null +++ b/awx/main/migrations/0201_eventquery.py @@ -0,0 +1,25 @@ +# Generated by Django 4.2.16 on 2025-01-27 12:19 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('main', '0200_delete_token_cleanup_job'), + ] + + operations = [ + migrations.CreateModel( + name='EventQuery', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('fqcn', models.CharField(max_length=255)), + ('collection_version', models.CharField(max_length=32)), + ('event_query', models.JSONField(default=dict)), + ], + options={ + 'unique_together': {('fqcn', 'collection_version')}, + }, + ), + ] diff --git a/awx/main/models/event_query.py b/awx/main/models/event_query.py new file mode 100644 index 000000000000..49110f08fc67 --- /dev/null +++ b/awx/main/models/event_query.py @@ -0,0 +1,27 @@ +from django.core.exceptions import ValidationError +from django.db import models + +from awx.main.models import BaseModel + + +class EventQuery(BaseModel): + """ + Event queries are jq present in some collections and used to filter job events + for indirectly created resources. + """ + + class Meta: + app_label = 'main' + unique_together = ['fqcn', 'collection_version'] + + fqcn = models.CharField(max_length=255) + collection_version = models.CharField(max_length=32) + event_query = models.JSONField(default=dict) + + def validate_unique(self, exclude=None): + try: + EventQuery.objects.get(fqcn=self.fqcn, collection_version=self.collection_version) + except EventQuery.DoesNotExist: + return + + raise ValidationError(f'an event query for collection {self.fqcn}, version {self.collection_version} already exists') diff --git a/awx/main/tasks/callback.py b/awx/main/tasks/callback.py index 069bc408c97d..91d369e0e9fc 100644 --- a/awx/main/tasks/callback.py +++ b/awx/main/tasks/callback.py @@ -1,10 +1,15 @@ import json +import os.path import time import logging from collections import deque +from awx.main.models.event_query import EventQuery + # Django from django.conf import settings +from django.contrib.messages.api import error +from django.core.exceptions import ValidationError from django_guid import get_guid from django.utils.functional import cached_property from django.db import connections @@ -15,11 +20,64 @@ from awx.main.utils.update_model import update_model from awx.main.queue import CallbackQueueDispatcher +from flags.state import flag_enabled + logger = logging.getLogger('awx.main.tasks.callback') +def collect_queries(query_file_contents) -> dict: + """ + collect_queries extracts host queries from the contents of + ansible_data.json + """ + result = {} + + try: + installed_collections = query_file_contents['installed_collections'] + except KeyError: + logger.error("installed_collections missing in callback response") + return result + + for key, value in installed_collections.items(): + if 'host_query' in value and 'version' in value: + result[key] = value + + return result + + +def try_load_query_file(artifact_dir) -> (bool, dict): + """ + try_load_query_file checks the artifact directory after job completion and + returns the contents of ansible_data.json if present + """ + + if not flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"): + return False, None + + queries_path = os.path.join(artifact_dir, "ansible_data.json") + if not os.path.isfile(queries_path): + logger.info(f"no query file found: {queries_path}") + return False, None + + try: + f = open(queries_path, "r") + except OSError as e: + logger.error(f"error opening query file {queries_path}: {e}") + return False, None + + with f: + try: + queries = json.load(f) + except ValueError as e: + logger.error(f"error parsing query file {queries_path}: {e}") + return False, None + + return True, queries + + class RunnerCallback: def __init__(self, model=None): + self.instance = None self.parent_workflow_job_id = None self.host_map = {} self.guid = get_guid() @@ -214,6 +272,21 @@ def status_handler(self, status_data, runner_config): self.delay_update(**{field_name: field_value}) def artifacts_handler(self, artifact_dir): + success, query_file_contents = try_load_query_file(artifact_dir) + if success: + collections_info = collect_queries(query_file_contents) + for collection, data in collections_info.items(): + version = data['version'] + event_query = data['host_query'] + instance = EventQuery(fqcn=collection, collection_version=version, event_query=event_query) + try: + instance.validate_unique() + instance.save() + + logger.info(f"eventy query for collection {collection}, version {version} created") + except ValidationError as e: + logger.info(e) + self.artifacts_processed = True