Skip to content

Commit

Permalink
Connect host audit model to jq logic via new task
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanCoding committed Jan 29, 2025
1 parent 0a59028 commit 4392587
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 11 deletions.
3 changes: 2 additions & 1 deletion awx/main/dispatch/worker/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from awx.main.constants import ACTIVE_STATES
from awx.main.models.events import emit_event_detail
from awx.main.utils.profiling import AWXProfiler
from awx.main.tasks.system import events_processed_hook
import awx.main.analytics.subsystem_metrics as s_metrics
from .base import BaseWorker

Expand All @@ -46,7 +47,7 @@ def job_stats_wrapup(job_identifier, event=None):
# If the status was a finished state before this update was made, send notifications
# If not, we will send notifications when the status changes
if uj.status not in ACTIVE_STATES:
uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed')
events_processed_hook(uj)

except Exception:
logger.exception('Worker failed to save stats or emit notifications: Job {}'.format(job_identifier))
Expand Down
43 changes: 38 additions & 5 deletions awx/main/tasks/host_indirect.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
from types import SimpleNamespace
import logging

import yaml

import jq

from awx.main.dispatch.publish import task
from awx.main.dispatch import get_task_queuename
from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit
from awx.main.models.event_query import EventQuery
from awx.main.models import Job

logger = logging.getLogger(__name__)


def build_indirect_host_data(job, event_query: dict):
def build_indirect_host_data(job, job_event_queries: dict[str, str]) -> list[IndirectManagedNodeAudit]:
results = []
compiled_jq_expressions = {} # Cache for compiled jq expressions
facts_missing_logged = False
for event in job.job_events.filter(task__in=event_query.keys()):
for event in job.job_events.filter(task__in=job_event_queries.keys()).iterator():
if 'res' not in event.event_data:
continue
jq_str_for_event = event_query[event.task]
jq_str_for_event = job_event_queries[event.task]
if jq_str_for_event not in compiled_jq_expressions:
compiled_jq_expressions[event.task] = jq.compile(jq_str_for_event)
compiled_jq = compiled_jq_expressions[event.task]
Expand All @@ -24,5 +31,31 @@ def build_indirect_host_data(job, event_query: dict):
continue
canonical_facts = data['canonical_facts']
facts = data.get('facts')
results.append(SimpleNamespace(canonical_facts=canonical_facts, facts=facts))
results.append(IndirectManagedNodeAudit(canonical_facts=canonical_facts, facts=facts, job=job, organization=job.organization))
return results


def fetch_job_event_query(job) -> dict[str, str]:
"""Returns the following data structure
{
"demo.query.example": "{canonical_facts: {host_name: .direct_host_name}}"
}
The keys are fully-qualified Ansible module names, and the values are strings which are jq expressions.
This contains all event query expressions that pertain to the given job
"""
net_job_data = {}
for fqcn, collection_data in job.installed_collections.items():
event_query = EventQuery.objects.filter(fqcn=fqcn, collection_version=collection_data['version']).first()
if event_query:
collection_data = yaml.safe_load(event_query.event_query)
net_job_data.update(collection_data)
return net_job_data


@task(queue=get_task_queuename)
def save_indirect_host_entries(job_id):
job = Job.objects.get(id=job_id)
job_event_queries = fetch_job_event_query(job)
records = build_indirect_host_data(job, job_event_queries)
IndirectManagedNodeAudit.objects.bulk_create(records)
4 changes: 2 additions & 2 deletions awx/main/tasks/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
from awx.main.tasks.signals import with_signal_handling, signal_callback
from awx.main.tasks.receptor import AWXReceptorJob
from awx.main.tasks.facts import start_fact_cache, finish_fact_cache
from awx.main.tasks.system import update_smart_memberships_for_inventory, update_inventory_computed_fields, events_processed_hook
from awx.main.exceptions import AwxTaskError, PostRunError, ReceptorNodeNotFound
from awx.main.utils.ansible import read_ansible_config
from awx.main.utils.safe_yaml import safe_dump, sanitize_jinja
Expand All @@ -81,7 +82,6 @@
)
from awx.conf.license import get_license
from awx.main.utils.handlers import SpecialInventoryHandler
from awx.main.tasks.system import update_smart_memberships_for_inventory, update_inventory_computed_fields
from awx.main.utils.update_model import update_model
from rest_framework.exceptions import PermissionDenied
from django.utils.translation import gettext_lazy as _
Expand Down Expand Up @@ -641,7 +641,7 @@ def run(self, pk, **kwargs):
# Field host_status_counts is used as a metric to check if event processing is finished
# we send notifications if it is, if not, callback receiver will send them
if (self.instance.host_status_counts is not None) or (not self.runner_callback.wrapup_event_dispatched):
self.instance.send_notification_templates('succeeded' if status == 'successful' else 'failed')
events_processed_hook(self.instance)

try:
self.final_run_hook(self.instance, status, private_data_dir)
Expand Down
12 changes: 12 additions & 0 deletions awx/main/tasks/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
# Django-CRUM
from crum import impersonate

# Django flags
from flags.state import flag_enabled

# Runner
import ansible_runner.cleanup
Expand Down Expand Up @@ -63,6 +65,7 @@

from awx.main.utils.reload import stop_local_services
from awx.main.tasks.helpers import is_run_threshold_reached
from awx.main.tasks.host_indirect import save_indirect_host_entries
from awx.main.tasks.receptor import get_receptor_ctl, worker_info, worker_cleanup, administrative_workunit_reaper, write_receptor_config
from awx.main.consumers import emit_channel_notification
from awx.main import analytics
Expand Down Expand Up @@ -365,6 +368,15 @@ def send_notifications(notification_list, job_id=None):
logger.exception('Error saving notification {} result.'.format(notification.id))


def events_processed_hook(unified_job):
"""This method is intended to be called for every unified job
after the playbook_on_stats/EOF event is processed and final status is saved
Either one of these events could happen before the other, or there may be no events"""
unified_job.send_notification_templates('succeeded' if unified_job.status == 'successful' else 'failed')
if isinstance(unified_job, Job) and flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"):
save_indirect_host_entries.delay(unified_job.id)


@task(queue=get_task_queuename)
def gather_analytics():
if is_run_threshold_reached(getattr(settings, 'AUTOMATION_ANALYTICS_LAST_GATHER', None), settings.AUTOMATION_ANALYTICS_GATHER_INTERVAL):
Expand Down
24 changes: 21 additions & 3 deletions awx/main/tests/live/tests/test_indirect_host_counting.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import yaml
import time

from awx.main.tests.live.tests.conftest import wait_for_events

from awx.main.tasks.host_indirect import build_indirect_host_data
from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit
from awx.main.models import Job


Expand All @@ -21,9 +22,12 @@ def test_indirect_host_counting(live_tmp_folder, run_job_from_playbook):
assert len(results) == 1
host_audit_entry = results[0]

canonical_facts = {'host_name': 'foo_host_default'}
facts = {'device_type': 'Fake Host'}

# Asserts on data that will match to the input jq string from above
assert host_audit_entry.canonical_facts == {'host_name': 'foo_host_default'}
assert host_audit_entry.facts == {'device_type': 'Fake Host'}
assert host_audit_entry.canonical_facts == canonical_facts
assert host_audit_entry.facts == facts

# Test collection of data
assert 'demo.query' in job.installed_collections
Expand All @@ -33,3 +37,17 @@ def test_indirect_host_counting(live_tmp_folder, run_job_from_playbook):
assert hq_data == {'demo.query.example': module_jq_str}

assert job.ansible_version

# This will poll, because it depends on the background task finishing
for _ in range(10):
if IndirectManagedNodeAudit.objects.filter(job=job).exists():
break
time.sleep(0.2)
else:
raise RuntimeError(f'No IndirectManagedNodeAudit records ever populated for job_id={job.id}')

assert IndirectManagedNodeAudit.objects.filter(job=job).count() == 1
host_audit = IndirectManagedNodeAudit.objects.filter(job=job).first()
assert host_audit.canonical_facts == canonical_facts
assert host_audit.facts == facts
assert host_audit.organization == job.organization

0 comments on commit 4392587

Please sign in to comment.