Agent job refactor #371

Draft · wants to merge 14 commits into main
241 changes: 39 additions & 202 deletions agent/testflinger_agent/agent.py
@@ -12,85 +12,22 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>

import json
import logging
import os
from pathlib import Path
import shutil
import signal
import sys
import tempfile

from testflinger_agent.job import TestflingerJob
from testflinger_agent.errors import TFServerError
from testflinger_agent.config import ATTACHMENTS_DIR
from testflinger_agent.event_emitter import EventEmitter
from testflinger_common.enums import JobState, TestPhase, TestEvent


try:
# attempt importing a tarfile filter, to check if filtering is supported
from tarfile import data_filter

del data_filter
except ImportError:
# import a patched version of `tarfile` that supports filtering;
# this conditional import can be removed when all agents run
# versions of Python that support filtering, i.e. at least:
# 3.8.17, 3.9.17, 3.10.12, 3.11.4, 3.12
from . import tarfile_patch as tarfile
else:
import tarfile


logger = logging.getLogger(__name__)


def secure_filter(member, path):
"""Combine the `data` filter with custom attachment filtering

Makes sure that the starting folder for all attachments coincides
with one of the supported phases, i.e. that the attachment archive
has been created properly and no attachment will be extracted to an
unexpected location.
"""
try:
resolved = Path(member.name).resolve().relative_to(Path.cwd())
except ValueError as error:
# essentially trying to extract higher than the attachments folder
raise tarfile.OutsideDestinationError(member, path) from error
if not str(resolved).startswith(
("provision/", "firmware_update/", "test/")
):
# trying to extract in an invalid folder, under the attachments folder
raise tarfile.OutsideDestinationError(member, path)
return tarfile.data_filter(member, path)


def parse_error_logs(error_log_path: str, phase: str):
with open(error_log_path, "r") as error_file:
error_file_contents = error_file.read()
try:
exception_info = json.loads(error_file_contents)[
f"{phase}_exception_info"
]
if exception_info["exception_cause"] is None:
detail = "%s: %s" % (
exception_info["exception_name"],
exception_info["exception_message"],
)
else:
detail = "%s: %s caused by %s" % (
exception_info["exception_name"],
exception_info["exception_message"],
exception_info["exception_cause"],
)
return detail
except (json.JSONDecodeError, KeyError):
return ""


class TestflingerAgent:

def __init__(self, client):
self.client = client
signal.signal(signal.SIGUSR1, self.restart_signal_handler)
@@ -179,168 +116,68 @@ def mark_device_offline(self):
# Create the offline file, this should work even if it exists
open(self.get_offline_files()[0], "w").close()

def unpack_attachments(self, job_data: dict, cwd: Path):
"""Download and unpack the attachments associated with a job"""
job_id = job_data["job_id"]

with tempfile.NamedTemporaryFile(suffix="tar.gz") as archive_tmp:
archive_path = Path(archive_tmp.name)
# download attachment archive
logger.info(f"Downloading attachments for {job_id}")
self.client.get_attachments(job_id, path=archive_path)
# extract archive into the attachments folder
logger.info(f"Unpacking attachments for {job_id}")
with tarfile.open(archive_path, "r:gz") as tar:
tar.extractall(cwd / ATTACHMENTS_DIR, filter=secure_filter)

# side effect: remove all attachment data from `job_data`
# (so there is no interference with existing processes, especially
# provisioning or firmware update, which are triggered when these
# sections are not empty)
for phase in (
TestPhase.PROVISION,
TestPhase.FIRMWARE_UPDATE,
TestPhase.TEST,
):
phase_str = f"{phase}_data"
try:
phase_data = job_data[phase_str]
except KeyError:
pass
else:
# delete attachments, if they exist
phase_data.pop("attachments", None)
# it may be the case that attachments were the only data
# included for this phase, so the phase can now be removed
if not phase_data:
del job_data[phase_str]

def process_jobs(self):
"""Coordinate checking for new jobs and handling them if they exists"""

TEST_PHASES = [
TestPhase.SETUP,
TestPhase.PROVISION,
TestPhase.FIRMWARE_UPDATE,
TestPhase.TEST,
TestPhase.ALLOCATE,
TestPhase.RESERVE,
]

# First, see if we have any old results that we couldn't send last time
self.retry_old_results()

self.check_restart()
job_data = self.client.check_jobs()
while job_data:
job_id = job_data["job_id"]
try:
# Create the job
job = TestflingerJob(job_data, self.client)
event_emitter = EventEmitter(
job_data.get("job_queue"),
job_data.get("job_status_webhook"),
self.client,
job.job_id,
)
job_end_reason = TestEvent.NORMAL_EXIT

logger.info("Starting job %s", job.job_id)
event_emitter.emit_event(
TestEvent.JOB_START,
f"{self.client.server}/jobs/{job.job_id}",
)
rundir = os.path.join(
self.client.config.get("execution_basedir"), job.job_id
)
os.makedirs(rundir)

self.client.post_agent_data({"job_id": job.job_id})

# Dump the job data to testflinger.json in our execution dir
with open(os.path.join(rundir, "testflinger.json"), "w") as f:
json.dump(job_data, f)
# Create json outcome file where phases will store their output
with open(
os.path.join(rundir, "testflinger-outcome.json"), "w"
) as f:
json.dump({}, f)

# Handle job attachments, if any.
#
# *Always* place this after creating "testflinger.json":
# - If there is an unpacking error, the file is required
# for reporting
# - The `unpack_attachments` method has a side effect on
# `job_data`: it removes attachment data. However, the
# file will still contain all the data received and
# pass it on to the device container
if job_data.get("attachments_status") == "complete":
self.unpack_attachments(job_data, cwd=Path(rundir))

error_log_path = os.path.join(
rundir, "device-connector-error.json"
)
# Clear error log before starting
open(error_log_path, "w").close()

for phase in TEST_PHASES:

# Let the server know the agent has picked up the job
self.client.post_agent_data({"job_id": job_id})
job.start()

# Go through the job phases
for phase in job.phase_sequence:

# First make sure the job hasn't been cancelled
if (
self.client.check_job_state(job.job_id)
== JobState.CANCELLED
):
logger.info("Job cancellation was requested, exiting.")
event_emitter.emit_event(TestEvent.CANCELLED)
if job.check_cancel():
job.cancel()
break

self.client.post_job_state(job.job_id, phase)
self.set_agent_state(phase)
event_emitter.emit_event(TestEvent(phase + "_start"))
exit_code, exit_event, exit_reason = job.run_test_phase(
phase, rundir
)
self.client.post_influx(phase, exit_code)
event_emitter.emit_event(exit_event, exit_reason)
detail = ""
if exit_code:
# exit code 46 is our indication that recovery failed!
# In this case, we need to mark the device offline
if exit_code == 46:
self.mark_device_offline()
exit_event = TestEvent.RECOVERY_FAIL
else:
exit_event = TestEvent(phase + "_fail")
detail = parse_error_logs(error_log_path, phase)
else:
exit_event = TestEvent(phase + "_success")
event_emitter.emit_event(exit_event, detail)
if phase == TestPhase.PROVISION:
self.client.post_provision_log(
job.job_id, exit_code, exit_event
)
if exit_code and phase != TestPhase.TEST:
logger.debug("Phase %s failed, aborting job" % phase)
job_end_reason = exit_event
break
except Exception as e:
logger.exception(e)
# Run the phase or skip it
if job.go(phase):
self.set_agent_state(phase)
job.run(phase)
if job.check_end():
break
except Exception as error:
logger.exception(f"{type(error).__name__}: {error}")
finally:
phase = TestPhase.CLEANUP
# Always run the cleanup, even if the job was cancelled
event_emitter.emit_event(TestEvent.CLEANUP_START)
job.run_test_phase(TestPhase.CLEANUP, rundir)
event_emitter.emit_event(TestEvent.CLEANUP_SUCCESS)
event_emitter.emit_event(TestEvent.JOB_END, job_end_reason)
# clear job id
self.client.post_agent_data({"job_id": ""})
if job.go(phase):
self.set_agent_state(phase)
job.run(phase)

# let the server know the agent is available (clear job id)
job.end()
self.client.post_agent_data({"job_id": ""})

provision_result = job.phases[TestPhase.PROVISION].result
if (
provision_result is not None
and provision_result.event == TestEvent.RECOVERY_FAIL
):
self.mark_device_offline()

try:
self.client.transmit_job_outcome(rundir)
self.client.transmit_job_outcome(str(job.params.rundir))
Collaborator commented:

So I forgot that I had the edge revision of the server from Varun's branch deployed when I was testing this today, and predictably hit a 422 error when it tried to transmit the outcome of a job containing attachments. But then I realized the job was never marked complete, because that information is conveyed through transmit_job_outcome. The result is that the CLI is just left in a "stuck" state, waiting for the job to complete.

Of course, this isn't really from your patch, but it gave me an "aha" moment for this pathological case, which could absolutely occur. I wonder if, instead of just failing here, we could at least do a client.post_job_state() with the state we intend to transmit (cancelled or complete) when we fall into the exception block here? That would help with things like this in the future.
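Something like this minimal sketch of the except block is what I have in mind. It assumes JobState has a suitable terminal member (COMPLETED here is an assumption; the real change would pick cancelled vs. complete based on how the job actually ended) and that a failure of the fallback itself should only be logged:

```python
try:
    self.client.transmit_job_outcome(str(job.params.rundir))
except Exception as error:
    # TFServerError or plain connection problems can land us here; the
    # results get saved for a later retry, but the server never hears
    # that the job reached a terminal state, so a waiting CLI looks stuck.
    logger.exception(error)
    try:
        # Best effort: report the state we intended to transmit so the
        # job is not left open on the server side.
        self.client.post_job_state(job.job_id, JobState.COMPLETED)
    except Exception:
        logger.exception("Unable to post fallback job state")
    results_basedir = self.client.config.get("results_basedir")
    shutil.move(str(job.params.rundir), results_basedir)
```

Not blocking for this PR, just capturing the idea while it's fresh.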

except Exception as e:
# TFServerError will happen if we get other-than-good status
# Other errors can happen too for things like connection
# problems
logger.exception(e)
results_basedir = self.client.config.get("results_basedir")
shutil.move(rundir, results_basedir)
# [NOTE] In Python 3.8 `shutil.move` still expects strings
shutil.move(str(job.params.rundir), results_basedir)
self.set_agent_state(JobState.WAITING)

self.check_restart()