Skip to content

Commit

Permalink
Remove polling loop for job finishing event processing
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanCoding committed Feb 4, 2025
1 parent b2d887b commit 2ff6cb7
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 14 deletions.
13 changes: 4 additions & 9 deletions awx/main/tasks/host_indirect.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,16 +137,11 @@ def save_indirect_host_entries(job_id: int, wait_for_events: bool = True) -> Non

if wait_for_events:
# Gate running this task on the job having all events processed, not just EOF or playbook_on_stats
current_events = 0
for _ in range(10):
current_events = job.job_events.count()
if current_events >= job.emitted_events:
break
logger.debug(f'Waiting for job_id={job_id} to finish processing events, currently {current_events} < {job.emitted_events}')
time.sleep(0.2)
else:
logger.warning(f'Event count {current_events} < {job.emitted_events} for job_id={job_id}, delaying processing of indirect host tracking')
current_events = job.job_events.count()
if current_events < job.emitted_events:
logger.info(f'Event count {current_events} < {job.emitted_events} for job_id={job_id}, delaying processing of indirect host tracking')
return
job.log_lifecycle(f'finished processing {current_events} events, running save_indirect_host_entries')

with transaction.atomic():
"""
Expand Down
2 changes: 1 addition & 1 deletion awx/main/tasks/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ def events_processed_hook(unified_job):
after the playbook_on_stats/EOF event is processed and final status is saved
Either one of these events could happen before the other, or there may be no events"""
unified_job.send_notification_templates('succeeded' if unified_job.status == 'successful' else 'failed')
if isinstance(unified_job, Job) and flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED"):
if isinstance(unified_job, Job) and flag_enabled("FEATURE_INDIRECT_NODE_COUNTING_ENABLED") and (unified_job.event_queries_processed is False):
save_indirect_host_entries.delay(unified_job.id)


Expand Down
20 changes: 16 additions & 4 deletions awx/main/tests/live/tests/test_indirect_host_counting.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import time

from awx.main.tests.live.tests.conftest import wait_for_events
from awx.main.tasks.host_indirect import build_indirect_host_data
from awx.main.tasks.host_indirect import build_indirect_host_data, save_indirect_host_entries
from awx.main.models.indirect_managed_node_audit import IndirectManagedNodeAudit
from awx.main.models import Job

Expand Down Expand Up @@ -38,13 +38,25 @@ def test_indirect_host_counting(live_tmp_folder, run_job_from_playbook):

assert job.ansible_version

# This will poll, because it depends on the background task finishing
# Poll for events finishing processing, because background task requires this
for _ in range(10):
if IndirectManagedNodeAudit.objects.filter(job=job).exists():
if job.job_events.count() >= job.emitted_events:
break
time.sleep(0.2)
else:
raise RuntimeError(f'No IndirectManagedNodeAudit records ever populated for job_id={job.id}')
raise RuntimeError(f'job id={job.id} never processed events')

# Task might not run due to race condition, so make it run here
job.refresh_from_db()
if job.event_queries_processed is False:
save_indirect_host_entries.delay(job.id, wait_for_events=False)
# This will poll for the background task to finish
for _ in range(10):
if IndirectManagedNodeAudit.objects.filter(job=job).exists():
break
time.sleep(0.2)
else:
raise RuntimeError(f'No IndirectManagedNodeAudit records ever populated for job_id={job.id}')

assert IndirectManagedNodeAudit.objects.filter(job=job).count() == 1
host_audit = IndirectManagedNodeAudit.objects.filter(job=job).first()
Expand Down

0 comments on commit 2ff6cb7

Please sign in to comment.