diff --git a/awx/main/dispatch/worker/callback.py b/awx/main/dispatch/worker/callback.py
index 199302c76c5a..6df75595ba00 100644
--- a/awx/main/dispatch/worker/callback.py
+++ b/awx/main/dispatch/worker/callback.py
@@ -20,6 +20,7 @@
 from awx.main.constants import ACTIVE_STATES
 from awx.main.models.events import emit_event_detail
 from awx.main.utils.profiling import AWXProfiler
+from awx.main.tasks.system import events_processed_hook
 import awx.main.analytics.subsystem_metrics as s_metrics
 
 from .base import BaseWorker
@@ -46,7 +47,7 @@ def job_stats_wrapup(job_identifier, event=None):
         # If the status was a finished state before this update was made, send notifications
         # If not, we will send notifications when the status changes
         if uj.status not in ACTIVE_STATES:
-            uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed')
+            events_processed_hook(uj)
 
     except Exception:
         logger.exception('Worker failed to save stats or emit notifications: Job {}'.format(job_identifier))
diff --git a/awx/main/tasks/host_indirect.py b/awx/main/tasks/host_indirect.py
index 7f8e8071dc3b..c14fd4bc97da 100644
--- a/awx/main/tasks/host_indirect.py
+++ b/awx/main/tasks/host_indirect.py
@@ -1,19 +1,26 @@
-from types import SimpleNamespace
 import logging
 
+import yaml
+
 import jq
 
+from awx.main.dispatch.publish import task
+from awx.main.dispatch import get_task_queuename
+from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit
+from awx.main.models.event_query import EventQuery
+from awx.main.models import Job
+
 logger = logging.getLogger(__name__)
 
 
-def build_indirect_host_data(job, event_query: dict):
+def build_indirect_host_data(job, job_event_queries: dict[str, str]) -> list[IndirectManagedNodeAudit]:
     results = []
     compiled_jq_expressions = {}  # Cache for compiled jq expressions
     facts_missing_logged = False
-    for event in job.job_events.filter(task__in=event_query.keys()):
+    for event in job.job_events.filter(task__in=job_event_queries.keys()).iterator():
         if 'res' not in event.event_data:
             continue
-        jq_str_for_event = event_query[event.task]
+        jq_str_for_event = job_event_queries[event.task]
         if jq_str_for_event not in compiled_jq_expressions:
             compiled_jq_expressions[event.task] = jq.compile(jq_str_for_event)
         compiled_jq = compiled_jq_expressions[event.task]
@@ -24,5 +31,31 @@ def build_indirect_host_data(job, event_query: dict):
             continue
         canonical_facts = data['canonical_facts']
         facts = data.get('facts')
-        results.append(SimpleNamespace(canonical_facts=canonical_facts, facts=facts))
+        results.append(IndirectManagedNodeAudit(canonical_facts=canonical_facts, facts=facts, job=job, organization=job.organization))
     return results
+
+
+def fetch_job_event_query(job) -> dict[str, str]:
+    """Returns the following data structure
+    {
+        "demo.query.example": "{canonical_facts: {host_name: .direct_host_name}}"
+    }
+    The keys are fully-qualified Ansible module names, and the values are strings which are jq expressions.
+
+    This contains all event query expressions that pertain to the given job
+    """
+    net_job_data = {}
+    for fqcn, collection_data in job.installed_collections.items():
+        event_query = EventQuery.objects.filter(fqcn=fqcn, collection_version=collection_data['version']).first()
+        if event_query:
+            collection_data = yaml.safe_load(event_query.event_query)
+            net_job_data.update(collection_data)
+    return net_job_data
+
+
+@task(queue=get_task_queuename)
+def save_indirect_host_entries(job_id):
+    job = Job.objects.get(id=job_id)
+    job_event_queries = fetch_job_event_query(job)
+    records = build_indirect_host_data(job, job_event_queries)
+    IndirectManagedNodeAudit.objects.bulk_create(records)
diff --git a/awx/main/tasks/jobs.py b/awx/main/tasks/jobs.py
index d25ced2f347b..339dc87261e9 100644
--- a/awx/main/tasks/jobs.py
+++ b/awx/main/tasks/jobs.py
@@ -68,6 +68,7 @@
 from awx.main.tasks.signals import with_signal_handling, signal_callback
 from awx.main.tasks.receptor import AWXReceptorJob
 from awx.main.tasks.facts import start_fact_cache, finish_fact_cache
+from awx.main.tasks.system import update_smart_memberships_for_inventory, update_inventory_computed_fields, events_processed_hook
 from awx.main.exceptions import AwxTaskError, PostRunError, ReceptorNodeNotFound
 from awx.main.utils.ansible import read_ansible_config
 from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
@@ -81,7 +82,6 @@
 )
 from awx.conf.license import get_license
 from awx.main.utils.handlers import SpecialInventoryHandler
-from awx.main.tasks.system import update_smart_memberships_for_inventory, update_inventory_computed_fields
 from awx.main.utils.update_model import update_model
 from rest_framework.exceptions import PermissionDenied
 from django.utils.translation import gettext_lazy as _
@@ -641,7 +641,7 @@ def run(self, pk, **kwargs):
         # Field host_status_counts is used as a metric to check if event processing is finished
         # we send notifications if it is, if not, callback receiver will send them
         if (self.instance.host_status_counts is not None) or (not self.runner_callback.wrapup_event_dispatched):
-            self.instance.send_notification_templates('succeeded' if status == 'successful' else 'failed')
+            events_processed_hook(self.instance)
 
         try:
             self.final_run_hook(self.instance, status, private_data_dir)
diff --git a/awx/main/tasks/system.py b/awx/main/tasks/system.py
index e9facd55a894..31d890a6bcc1 100644
--- a/awx/main/tasks/system.py
+++ b/awx/main/tasks/system.py
@@ -30,6 +30,8 @@
 # Django-CRUM
 from crum import impersonate
 
+# Django flags
+from flags.state import flag_enabled
 
 # Runner
 import ansible_runner.cleanup
@@ -63,6 +65,7 @@
 from awx.main.utils.reload import stop_local_services
 from awx.main.tasks.helpers import is_run_threshold_reached
+from awx.main.tasks.host_indirect import save_indirect_host_entries
 from awx.main.tasks.receptor import get_receptor_ctl, worker_info, worker_cleanup, administrative_workunit_reaper, write_receptor_config
 from awx.main.consumers import emit_channel_notification
 from awx.main import analytics
 
@@ -365,6 +368,15 @@ def send_notifications(notification_list, job_id=None):
             logger.exception('Error saving notification {} result.'.format(notification.id))
 
 
+def events_processed_hook(unified_job):
+    """This method is intended to be called for every unified job
+    after the playbook_on_stats/EOF event is processed and final status is saved
+    Either one of these events could happen before the other, or there may be no events"""
+    unified_job.send_notification_templates('succeeded' if unified_job.status == 'successful' else 'failed')
+    if isinstance(unified_job, Job) and flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"):
+        save_indirect_host_entries.delay(unified_job.id)
+
+
 @task(queue=get_task_queuename)
 def gather_analytics():
     if is_run_threshold_reached(getattr(settings, 'AUTOMATION_ANALYTICS_LAST_GATHER', None), settings.AUTOMATION_ANALYTICS_GATHER_INTERVAL):
diff --git a/awx/main/tests/live/tests/test_indirect_host_counting.py b/awx/main/tests/live/tests/test_indirect_host_counting.py
index 9dd346e767d7..b4ea8097328b 100644
--- a/awx/main/tests/live/tests/test_indirect_host_counting.py
+++ b/awx/main/tests/live/tests/test_indirect_host_counting.py
@@ -1,8 +1,9 @@
 import yaml
+import time
 
 from awx.main.tests.live.tests.conftest import wait_for_events
-
 from awx.main.tasks.host_indirect import build_indirect_host_data
+from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit
 from awx.main.models import Job
 
 
@@ -21,9 +22,12 @@ def test_indirect_host_counting(live_tmp_folder, run_job_from_playbook):
     assert len(results) == 1
     host_audit_entry = results[0]
 
+    canonical_facts = {'host_name': 'foo_host_default'}
+    facts = {'device_type': 'Fake Host'}
+
     # Asserts on data that will match to the input jq string from above
-    assert host_audit_entry.canonical_facts == {'host_name': 'foo_host_default'}
-    assert host_audit_entry.facts == {'device_type': 'Fake Host'}
+    assert host_audit_entry.canonical_facts == canonical_facts
+    assert host_audit_entry.facts == facts
 
     # Test collection of data
     assert 'demo.query' in job.installed_collections
@@ -33,3 +37,17 @@ def test_indirect_host_counting(live_tmp_folder, run_job_from_playbook):
     assert hq_data == {'demo.query.example': module_jq_str}
 
     assert job.ansible_version
+
+    # This will poll, because it depends on the background task finishing
+    for _ in range(10):
+        if IndirectManagedNodeAudit.objects.filter(job=job).exists():
+            break
+        time.sleep(0.2)
+    else:
+        raise RuntimeError(f'No IndirectManagedNodeAudit records ever populated for job_id={job.id}')
+
+    assert IndirectManagedNodeAudit.objects.filter(job=job).count() == 1
+    host_audit = IndirectManagedNodeAudit.objects.filter(job=job).first()
+    assert host_audit.canonical_facts == canonical_facts
+    assert host_audit.facts == facts
+    assert host_audit.organization == job.organization
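
For reference, a minimal sketch of the jq extraction that build_indirect_host_data relies on, assuming the jq Python bindings' compile()/input()/first() interface; the expression and event payload below are hypothetical stand-ins, loosely modeled on the values asserted in the live test above, not the actual contents of any EventQuery record.

import jq

# Hypothetical jq expression, as a collection might register it in EventQuery.event_query
expression = '{canonical_facts: {host_name: .direct_host_name}, facts: {device_type: .device_type}}'

# Hypothetical 'res' payload from a job event produced by a matching module
event_res = {'direct_host_name': 'foo_host_default', 'device_type': 'Fake Host'}

data = jq.compile(expression).input(event_res).first()
# data == {'canonical_facts': {'host_name': 'foo_host_default'}, 'facts': {'device_type': 'Fake Host'}}
# build_indirect_host_data stores such results as IndirectManagedNodeAudit rows tied to the job and its organization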