Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT][DO NOT MERGE]: Feature indirect host counting #15791

Closed
wants to merge 8 commits into from
3 changes: 2 additions & 1 deletion awx/main/dispatch/worker/callback.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from awx.main.constants import ACTIVE_STATES
from awx.main.models.events import emit_event_detail
from awx.main.utils.profiling import AWXProfiler
from awx.main.tasks.system import events_processed_hook
import awx.main.analytics.subsystem_metrics as s_metrics
from .base import BaseWorker

Expand All @@ -46,7 +47,7 @@
# If the status was a finished state before this update was made, send notifications
# If not, we will send notifications when the status changes
if uj.status not in ACTIVE_STATES:
uj.send_notification_templates('succeeded' if uj.status == 'successful' else 'failed')
events_processed_hook(uj)

Check warning on line 50 in awx/main/dispatch/worker/callback.py

View check run for this annotation

Codecov / codecov/patch

awx/main/dispatch/worker/callback.py#L50

Added line #L50 was not covered by tests

except Exception:
logger.exception('Worker failed to save stats or emit notifications: Job {}'.format(job_identifier))
Expand Down
25 changes: 25 additions & 0 deletions awx/main/migrations/0201_eventquery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Generated by Django 4.2.16 on 2025-01-27 12:19

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('main', '0200_delete_token_cleanup_job'),
]

operations = [
migrations.CreateModel(
name='EventQuery',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('fqcn', models.CharField(max_length=255)),
('collection_version', models.CharField(max_length=32)),
('event_query', models.JSONField(default=dict)),
],
options={
'unique_together': {('fqcn', 'collection_version')},
},
),
]
47 changes: 47 additions & 0 deletions awx/main/migrations/0202_indirectmanagednodeaudit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Generated by Django 4.2.16 on 2025-01-29 12:23

from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

dependencies = [
('main', '0201_eventquery'),
]

operations = [
migrations.CreateModel(
name='IndirectManagedNodeAudit',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created', models.DateTimeField(auto_now_add=True)),
('name', models.CharField(max_length=255)),
('canonical_facts', models.JSONField(default=dict)),
('facts', models.JSONField(default=dict)),
('events', models.JSONField(default=list)),
('count', models.PositiveIntegerField(default=0)),
(
'host',
models.ForeignKey(null=True, on_delete=django.db.models.deletion.DO_NOTHING, related_name='host_indirect_host_audits', to='main.host'),
),
(
'inventory',
models.ForeignKey(
null=True, on_delete=django.db.models.deletion.DO_NOTHING, related_name='inventory_indirect_host_audits', to='main.inventory'
),
),
(
'job',
models.ForeignKey(editable=False, on_delete=django.db.models.deletion.DO_NOTHING, related_name='job_indirect_host_audits', to='main.job'),
),
(
'organization',
models.ForeignKey(on_delete=django.db.models.deletion.DO_NOTHING, related_name='organization_indirect_host_audits', to='main.organization'),
),
],
options={
'unique_together': {('name', 'job')},
},
),
]
18 changes: 18 additions & 0 deletions awx/main/migrations/0203_job_event_queries_processed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Generated by Django 4.2.16 on 2025-01-29 12:26

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
('main', '0202_indirectmanagednodeaudit'),
]

operations = [
migrations.AddField(
model_name='job',
name='event_queries_processed',
field=models.BooleanField(default=False, help_text='Events of this job have been queried for indirect host information'),
),
]
27 changes: 27 additions & 0 deletions awx/main/models/event_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from django.core.exceptions import ValidationError
from django.db import models

from awx.main.models import BaseModel


class EventQuery(BaseModel):
"""
Event queries are jq present in some collections and used to filter job events
for indirectly created resources.
"""

class Meta:
app_label = 'main'
unique_together = ['fqcn', 'collection_version']

fqcn = models.CharField(max_length=255)
collection_version = models.CharField(max_length=32)
event_query = models.JSONField(default=dict)

def validate_unique(self, exclude=None):
try:
EventQuery.objects.get(fqcn=self.fqcn, collection_version=self.collection_version)
except EventQuery.DoesNotExist:
return

Check warning on line 25 in awx/main/models/event_query.py

View check run for this annotation

Codecov / codecov/patch

awx/main/models/event_query.py#L22-L25

Added lines #L22 - L25 were not covered by tests

raise ValidationError(f'an event query for collection {self.fqcn}, version {self.collection_version} already exists')

Check warning on line 27 in awx/main/models/event_query.py

View check run for this annotation

Codecov / codecov/patch

awx/main/models/event_query.py#L27

Added line #L27 was not covered by tests
54 changes: 54 additions & 0 deletions awx/main/models/indirect_managed_node_audit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from django.db.models.deletion import DO_NOTHING
from django.db.models.fields import DateTimeField, CharField, PositiveIntegerField
from django.db.models.fields.json import JSONField
from django.db.models.fields.related import ForeignKey
from awx.main.models import BaseModel


class IndirectManagedNodeAudit(BaseModel):
"""
IndirectManagedNodeAudit stores information about indirectly created or managed hosts
"""

class Meta:
app_label = 'main'
unique_together = [('name', 'job')]

created = DateTimeField(auto_now_add=True)

job = ForeignKey(
'Job',
related_name='job_indirect_host_audits',
on_delete=DO_NOTHING,
editable=False,
)

organization = ForeignKey(
'Organization',
related_name='organization_indirect_host_audits',
on_delete=DO_NOTHING,
)

inventory = ForeignKey(
'Inventory',
related_name='inventory_indirect_host_audits',
null=True,
on_delete=DO_NOTHING,
)

host = ForeignKey(
'Host',
related_name='host_indirect_host_audits',
null=True,
on_delete=DO_NOTHING,
)

name = CharField(max_length=255)

canonical_facts = JSONField(default=dict)

facts = JSONField(default=dict)

events = JSONField(default=list)

count = PositiveIntegerField(default=0)
4 changes: 4 additions & 0 deletions awx/main/models/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,10 @@ class Meta:
default=1,
help_text=_("If ran as part of sliced jobs, the total number of slices. If 1, job is not part of a sliced job."),
)
event_queries_processed = models.BooleanField(
default=False,
help_text=_("Events of this job have been queried for indirect host information"),
)

def _get_parent_field_name(self):
return 'job_template'
Expand Down
85 changes: 85 additions & 0 deletions awx/main/tasks/callback.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import json
import os.path
import time
import logging
from collections import deque

from awx.main.models.event_query import EventQuery

# Django
from django.conf import settings
from django.core.exceptions import ValidationError
from django_guid import get_guid
from django.utils.functional import cached_property
from django.db import connections
Expand All @@ -15,11 +19,67 @@
from awx.main.utils.update_model import update_model
from awx.main.queue import CallbackQueueDispatcher

from flags.state import flag_enabled

logger = logging.getLogger('awx.main.tasks.callback')


def collect_queries(query_file_contents) -> dict:
"""
collect_queries extracts host queries from the contents of
ansible_data.json
"""
result = {}

Check warning on line 32 in awx/main/tasks/callback.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/callback.py#L32

Added line #L32 was not covered by tests

try:
installed_collections = query_file_contents['installed_collections']
except KeyError:
logger.error("installed_collections missing in callback response")
return result

Check warning on line 38 in awx/main/tasks/callback.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/callback.py#L34-L38

Added lines #L34 - L38 were not covered by tests

for key, value in installed_collections.items():
if 'host_query' in value and 'version' in value:
result[key] = value

Check warning on line 42 in awx/main/tasks/callback.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/callback.py#L42

Added line #L42 was not covered by tests

return result

Check warning on line 44 in awx/main/tasks/callback.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/callback.py#L44

Added line #L44 was not covered by tests


COLLECTION_FILENAME = "ansible_data.json"


def try_load_query_file(artifact_dir) -> (bool, dict):
"""
try_load_query_file checks the artifact directory after job completion and
returns the contents of ansible_data.json if present
"""

if not flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"):
return False, None

Check warning on line 57 in awx/main/tasks/callback.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/callback.py#L57

Added line #L57 was not covered by tests

queries_path = os.path.join(artifact_dir, COLLECTION_FILENAME)

Check warning on line 59 in awx/main/tasks/callback.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/callback.py#L59

Added line #L59 was not covered by tests
if not os.path.isfile(queries_path):
logger.info(f"no query file found: {queries_path}")
return False, None

Check warning on line 62 in awx/main/tasks/callback.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/callback.py#L61-L62

Added lines #L61 - L62 were not covered by tests

try:
f = open(queries_path, "r")
except OSError as e:
logger.error(f"error opening query file {queries_path}: {e}")
return False, None

Check warning on line 68 in awx/main/tasks/callback.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/callback.py#L64-L68

Added lines #L64 - L68 were not covered by tests

with f:
try:
queries = json.load(f)
except ValueError as e:
logger.error(f"error parsing query file {queries_path}: {e}")
return False, None

Check warning on line 75 in awx/main/tasks/callback.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/callback.py#L70-L75

Added lines #L70 - L75 were not covered by tests

return True, queries

Check warning on line 77 in awx/main/tasks/callback.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/callback.py#L77

Added line #L77 was not covered by tests


class RunnerCallback:
def __init__(self, model=None):
self.instance = None

Check warning on line 82 in awx/main/tasks/callback.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/callback.py#L82

Added line #L82 was not covered by tests
self.parent_workflow_job_id = None
self.host_map = {}
self.guid = get_guid()
Expand Down Expand Up @@ -214,6 +274,31 @@
self.delay_update(**{field_name: field_value})

def artifacts_handler(self, artifact_dir):
success, query_file_contents = try_load_query_file(artifact_dir)

Check warning on line 277 in awx/main/tasks/callback.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/callback.py#L277

Added line #L277 was not covered by tests
if success:
collections_info = collect_queries(query_file_contents)

Check warning on line 279 in awx/main/tasks/callback.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/callback.py#L279

Added line #L279 was not covered by tests
for collection, data in collections_info.items():
version = data['version']
event_query = data['host_query']
instance = EventQuery(fqcn=collection, collection_version=version, event_query=event_query)
try:
instance.validate_unique()
instance.save()

Check warning on line 286 in awx/main/tasks/callback.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/callback.py#L281-L286

Added lines #L281 - L286 were not covered by tests

logger.info(f"eventy query for collection {collection}, version {version} created")
except ValidationError as e:
logger.info(e)

Check warning on line 290 in awx/main/tasks/callback.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/callback.py#L288-L290

Added lines #L288 - L290 were not covered by tests

if 'installed_collections' in query_file_contents:
self.delay_update(installed_collections=query_file_contents['installed_collections'])

Check warning on line 293 in awx/main/tasks/callback.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/callback.py#L293

Added line #L293 was not covered by tests
else:
logger.warning(f'The file {COLLECTION_FILENAME} unexpectedly did not contain installed_collections')

Check warning on line 295 in awx/main/tasks/callback.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/callback.py#L295

Added line #L295 was not covered by tests

if 'ansible_version' in query_file_contents:
self.delay_update(ansible_version=query_file_contents['ansible_version'])

Check warning on line 298 in awx/main/tasks/callback.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/callback.py#L298

Added line #L298 was not covered by tests
else:
logger.warning(f'The file {COLLECTION_FILENAME} unexpectedly did not contain ansible_version')

Check warning on line 300 in awx/main/tasks/callback.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/callback.py#L300

Added line #L300 was not covered by tests

self.artifacts_processed = True


Expand Down
61 changes: 61 additions & 0 deletions awx/main/tasks/host_indirect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import logging

import yaml

import jq

from awx.main.dispatch.publish import task
from awx.main.dispatch import get_task_queuename
from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit
from awx.main.models.event_query import EventQuery
from awx.main.models import Job

logger = logging.getLogger(__name__)


def build_indirect_host_data(job, job_event_queries: dict[str, str]) -> list[IndirectManagedNodeAudit]:
results = []
compiled_jq_expressions = {} # Cache for compiled jq expressions
facts_missing_logged = False

Check warning on line 19 in awx/main/tasks/host_indirect.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/host_indirect.py#L17-L19

Added lines #L17 - L19 were not covered by tests
for event in job.job_events.filter(task__in=job_event_queries.keys()).iterator():
if 'res' not in event.event_data:
continue
jq_str_for_event = job_event_queries[event.task]

Check warning on line 23 in awx/main/tasks/host_indirect.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/host_indirect.py#L22-L23

Added lines #L22 - L23 were not covered by tests
if jq_str_for_event not in compiled_jq_expressions:
compiled_jq_expressions[event.task] = jq.compile(jq_str_for_event)
compiled_jq = compiled_jq_expressions[event.task]

Check warning on line 26 in awx/main/tasks/host_indirect.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/host_indirect.py#L25-L26

Added lines #L25 - L26 were not covered by tests
for data in compiled_jq.input(event.event_data['res']).all():
if not data.get('canonical_facts'):
if not facts_missing_logged:
logger.error(f'jq output missing canonical_facts for module {event.task} on event {event.id} using jq:{jq_str_for_event}')
continue
canonical_facts = data['canonical_facts']
facts = data.get('facts')
results.append(IndirectManagedNodeAudit(canonical_facts=canonical_facts, facts=facts, job=job, organization=job.organization))
return results

Check warning on line 35 in awx/main/tasks/host_indirect.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/host_indirect.py#L30-L35

Added lines #L30 - L35 were not covered by tests


def fetch_job_event_query(job) -> dict[str, str]:
"""Returns the following data structure
{
"demo.query.example": "{canonical_facts: {host_name: .direct_host_name}}"
}
The keys are fully-qualified Ansible module names, and the values are strings which are jq expressions.

This contains all event query expressions that pertain to the given job
"""
net_job_data = {}

Check warning on line 47 in awx/main/tasks/host_indirect.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/host_indirect.py#L47

Added line #L47 was not covered by tests
for fqcn, collection_data in job.installed_collections.items():
event_query = EventQuery.objects.filter(fqcn=fqcn, collection_version=collection_data['version']).first()

Check warning on line 49 in awx/main/tasks/host_indirect.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/host_indirect.py#L49

Added line #L49 was not covered by tests
if event_query:
collection_data = yaml.safe_load(event_query.event_query)
net_job_data.update(collection_data)
return net_job_data

Check warning on line 53 in awx/main/tasks/host_indirect.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/host_indirect.py#L51-L53

Added lines #L51 - L53 were not covered by tests


@task(queue=get_task_queuename)
def save_indirect_host_entries(job_id):
job = Job.objects.get(id=job_id)
job_event_queries = fetch_job_event_query(job)
records = build_indirect_host_data(job, job_event_queries)
IndirectManagedNodeAudit.objects.bulk_create(records)

Check warning on line 61 in awx/main/tasks/host_indirect.py

View check run for this annotation

Codecov / codecov/patch

awx/main/tasks/host_indirect.py#L58-L61

Added lines #L58 - L61 were not covered by tests
Loading
Loading