Commit

Merge branch 'main' into gio/add-task-group-to-metrics

giovanni-guidini authored Apr 22, 2024
2 parents c58156e + 735b82a commit a8ffcc0
Showing 6 changed files with 128 additions and 60 deletions.
65 changes: 63 additions & 2 deletions helpers/parallel_upload_processing.py
@@ -5,15 +5,15 @@
from database.models.reports import Upload


# copied from shared/reports/resources.py Report.next_session_number()
# Copied from shared/reports/resources.py Report.next_session_number()
def next_session_number(session_dict):
start_number = len(session_dict)
while start_number in session_dict or str(start_number) in session_dict:
start_number += 1
return start_number


# copied and cut down from worker/services/report/raw_upload_processor.py
# Copied and cut down from worker/services/report/raw_upload_processor.py
# this version stripped out all the ATS label stuff
def _adjust_sessions(
original_sessions: dict,
@@ -62,3 +62,64 @@ def get_parallel_session_ids(
get_parallel_session_ids.append(next_session_id)

return get_parallel_session_ids


def save_incremental_report_results(
report_service, commit, report, parallel_idx, report_code
):
commitid = commit.commitid
archive_service = report_service.get_archive_service(commit.repository)

# Save incremental results to archive storage,
# upload_finisher will combine
chunks = report.to_archive().encode()
_, files_and_sessions = report.to_database()

chunks_url = archive_service.write_parallel_experiment_file(
commitid, chunks, report_code, f"incremental/chunk{parallel_idx}"
)
files_and_sessions_url = archive_service.write_parallel_experiment_file(
commitid,
files_and_sessions,
report_code,
f"incremental/files_and_sessions{parallel_idx}",
)

parallel_incremental_result = {
"parallel_idx": parallel_idx,
"chunks_path": chunks_url,
"files_and_sessions_path": files_and_sessions_url,
}
return parallel_incremental_result
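
# Illustrative sketch (not part of this commit): for parallel_idx=2 the helper
# above returns a dict shaped like the one below; the path values depend on the
# archive service's storage layout and are hypothetical here.
#
#   {
#       "parallel_idx": 2,
#       "chunks_path": ".../incremental/chunk2.txt",
#       "files_and_sessions_path": ".../incremental/files_and_sessions2.txt",
#   }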


# Saves the result of an entire serial processing flow to archive storage
# so that it can be compared for parallel experiment. Not necessarily the final report
# for the commit, if more uploads are still made.
def save_final_serial_report_results(
report_service, commit, report, report_code, arguments_list
):
commitid = commit.commitid
archive_service = report_service.get_archive_service(commit.repository)

# We identify the final result of an entire serial processing pipeline
# by the upload_pk of the very last upload received (ie the last element
# in arguments_list), and this is how each parallel verification task
# knows where to find the corresponding report to compare with for a given flow
latest_upload_pk = arguments_list[-1].get("upload_pk")

chunks = report.to_archive().encode()
_, files_and_sessions = report.to_database()

archive_service.write_parallel_experiment_file(
commitid,
chunks,
report_code,
f"serial/chunks<latest_upload_pk:{latest_upload_pk}>",
)
archive_service.write_parallel_experiment_file(
commitid,
files_and_sessions,
report_code,
f"serial/files_and_sessions<latest_upload_pk:{latest_upload_pk}>",
)
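
# Illustrative sketch (not part of this commit): each serial snapshot is keyed by
# the pk of the last upload in the flow, so for latest_upload_pk = 1234 the two
# writes above would land at paths ending in the names below (assuming the archive
# service appends a .txt suffix, as parallel_path_to_serial_path in
# tasks/parallel_verification.py suggests):
#
#   serial/chunks<latest_upload_pk:1234>.txt
#   serial/files_and_sessions<latest_upload_pk:1234>.txt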
66 changes: 37 additions & 29 deletions tasks/parallel_verification.py
@@ -22,6 +22,7 @@ def run_impl(
commit_yaml,
report_code,
parallel_paths,
processing_results,
**kwargs,
):
commits = db_session.query(Commit).filter(
@@ -56,37 +57,34 @@ def run_impl(
parallel_paths["chunks_path"]
).decode(errors="replace")

# Retrieve serial results using legacy method
l_files = commit.report_json["files"]
l_sessions = commit.report_json["sessions"]
l_files_and_sessions = sort_and_stringify_report_json(
{"files": l_files, "sessions": l_sessions}
# TODO: ensure the legacy report building method (`commit.report_json["files"]`) is accurate as well. There's
# no easy way to do this right now because the legacy method assumes the
# report to build lives in the database, but the report we want to compare
# for the verification experiment lives in archive storage.

# the pk of the last upload for the processing pipeline
last_upload_pk = processing_results["processings_so_far"][-1]["arguments"].get(
"upload_pk"
)

# Retrieve serial results
report = report_service.get_existing_report_for_commit(commit)
_, files_and_sessions = report.to_database()
files_and_sessions = sort_and_stringify_report_json(
json.loads(files_and_sessions)
json.loads(
archive_service.read_file(
parallel_path_to_serial_path(
parallel_paths["files_and_sessions_path"], last_upload_pk
)
)
)
)
chunks = archive_service.read_chunks(commitid, report_code)
chunks = archive_service.read_file(
parallel_path_to_serial_path(parallel_paths["chunks_path"], last_upload_pk)
).decode(errors="replace")

fas_legacy = parallel_files_and_sessions == l_files_and_sessions
fas_regular = parallel_files_and_sessions == files_and_sessions
chunks_regular = parallel_chunks == chunks
fas_comparison_result = parallel_files_and_sessions == files_and_sessions
chunks_comparison_result = parallel_chunks == chunks

if not fas_legacy:
log.info(
"Legacy files and sessions did not match parallel results",
extra=dict(
repoid=repoid,
commitid=commitid,
commit_yaml=commit_yaml,
report_code=report_code,
parallel_paths=parallel_paths,
),
)
if not fas_regular:
if not fas_comparison_result:
log.info(
"Files and sessions did not match parallel results",
extra=dict(
@@ -97,7 +95,7 @@ def run_impl(
parallel_paths=parallel_paths,
),
)
if not chunks_regular:
if not chunks_comparison_result:
log.info(
"chunks did not match parallel results",
extra=dict(
Expand All @@ -110,10 +108,8 @@ def run_impl(
)

verification_result = (
(1 if fas_legacy else 0)
+ (1 if fas_regular else 0)
+ (1 if chunks_regular else 0)
) / 3
(1 if fas_comparison_result else 0) + (1 if chunks_comparison_result else 0)
) / 2
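
# Illustrative note (not part of this commit): with two comparisons the possible
# values are 1.0 (both files_and_sessions and chunks match), 0.5 (exactly one
# matches), and 0.0 (neither matches).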

if verification_result == 1:
log.info(
@@ -141,6 +137,18 @@
return


def parallel_path_to_serial_path(parallel_path, last_upload_pk):
parallel_paths = parallel_path.split("/")
cur_file = parallel_paths.pop().removesuffix(
".txt"
) # either chunks.txt, <report_code>.txt, or files_and_sessions.txt
serial_path = (
"/".join(parallel_paths)
+ f"/serial/{cur_file}<latest_upload_pk:{str(last_upload_pk)}>.txt"
)
return serial_path
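
# Illustrative sketch (not part of this commit): assuming a hypothetical archive
# layout, the helper above maps a parallel result path to the matching serial
# snapshot written by save_final_serial_report_results, e.g.
#
#   parallel_path_to_serial_path("v4/repos/<hash>/commits/<sha>/chunks.txt", 1234)
#   == "v4/repos/<hash>/commits/<sha>/serial/chunks<latest_upload_pk:1234>.txt"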


# To filter out values not relevant for verifying report content correctness. We
# don't want these values to be the reason why there exists a diff, since it's
# not actually related to coverage report contents
5 changes: 5 additions & 0 deletions tasks/tests/unit/test_upload_task.py
@@ -173,6 +173,7 @@ def test_upload_task_call(
],
report_code=None,
in_parallel=False,
is_final=True,
),
)
kwargs = dict(
@@ -567,6 +568,7 @@ def test_upload_task_call_multiple_processors(
],
report_code=None,
in_parallel=False,
is_final=False,
),
)
t2 = upload_processor_task.signature(
@@ -582,6 +584,7 @@ def test_upload_task_call_multiple_processors(
],
report_code=None,
in_parallel=False,
is_final=False,
),
)
t3 = upload_processor_task.signature(
@@ -596,6 +599,7 @@ def test_upload_task_call_multiple_processors(
],
report_code=None,
in_parallel=False,
is_final=False,
),
)
kwargs = dict(
Expand Down Expand Up @@ -1086,6 +1090,7 @@ def test_schedule_task_with_one_task(self, dbsession, mocker):
arguments_list=argument_list,
report_code=None,
in_parallel=False,
is_final=True,
),
)
t2 = upload_finisher_task.signature(
5 changes: 5 additions & 0 deletions tasks/upload.py
@@ -3,6 +3,7 @@
import uuid
from datetime import datetime, timedelta
from json import loads
from math import ceil
from typing import Any, List, Mapping, Optional

from asgiref.sync import async_to_sync
@@ -666,6 +667,9 @@ def _schedule_coverage_processing_task(
arguments_list=chunk,
report_code=commit_report.code,
in_parallel=False,
is_final=True
if i == ceil(len(argument_list) / chunk_size) - 1
else False,
),
)
processing_tasks.append(sig)
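
# Illustrative note (not part of this commit): with chunk_size = 3 and
# len(argument_list) = 7 there are ceil(7 / 3) = 3 serial chunks (i = 0, 1, 2),
# and only the last one (i == 2) is scheduled with is_final=True, so the serial
# snapshot for the verification experiment is saved exactly once per flow.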
@@ -742,6 +746,7 @@ def _schedule_coverage_processing_task(
i
], # i + parallel_session_id,
in_parallel=True,
is_final=True if i == num_sessions - 1 else False,
),
)
parallel_processing_tasks.append(sig)
1 change: 1 addition & 0 deletions tasks/upload_finisher.py
@@ -154,6 +154,7 @@ def run_impl(
commit_yaml=commit_yaml,
report_code=report_code,
parallel_paths=parallel_paths,
processing_results=processing_results,
),
)
task.apply_async()
46 changes: 17 additions & 29 deletions tasks/upload_processor.py
@@ -19,6 +19,10 @@
from database.enums import CommitErrorTypes
from database.models import Commit, Upload
from helpers.metrics import metrics
from helpers.parallel_upload_processing import (
save_final_serial_report_results,
save_incremental_report_results,
)
from helpers.save_commit_error import save_commit_error
from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO
from services.bots import RepositoryWithoutValidBotError
@@ -83,6 +87,7 @@ def run_impl(
report_code=None,
parallel_idx=None,
in_parallel=False,
is_final=False,
**kwargs,
):
repoid = int(repoid)
@@ -151,6 +156,7 @@
parallel_idx=parallel_idx,
parent_task=self.request.parent_id,
in_parallel=in_parallel,
is_final=is_final,
**kwargs,
)
except LockError:
@@ -180,6 +186,7 @@ def process_impl_within_lock(
report_code,
parallel_idx=None,
in_parallel=False,
is_final=False,
**kwargs,
):
if (
@@ -315,7 +322,7 @@ def process_impl_within_lock(
with metrics.timer(
f"{self.metrics_prefix}.save_incremental_report_results"
):
parallel_incremental_result = self.save_incremental_report_results(
parallel_incremental_result = save_incremental_report_results(
report_service, commit, report, parallel_idx, report_code
)
parallel_incremental_result["upload_pk"] = arguments_list[0].get(
@@ -342,6 +349,15 @@ def process_impl_within_lock(
report_code,
)

# Save the final accumulated result from the serial flow for the
# ParallelVerification task to compare with later, for the parallel
# experiment. The report being saved is not necessarily the final
# report for the commit, as more uploads can still be made.
if is_final and (not in_parallel):
save_final_serial_report_results(
report_service, commit, report, report_code, arguments_list
)

for processed_individual_report in processings_so_far:
# We delete and rewrite the artifacts when the serial flow runs first. When
# the parallel flow runs second, it parses the human readable artifacts instead
@@ -593,34 +609,6 @@ def save_report_results(
db_session.commit()
return res

def save_incremental_report_results(
self, report_service, commit, report, parallel_idx, report_code
):
commitid = commit.commitid
archive_service = report_service.get_archive_service(commit.repository)

# save incremental results to archive storage,
# upload_finisher will combine
chunks = report.to_archive().encode()
_, files_and_sessions = report.to_database()

chunks_url = archive_service.write_parallel_experiment_file(
commitid, chunks, report_code, f"incremental/chunk{parallel_idx}"
)
files_and_sessions_url = archive_service.write_parallel_experiment_file(
commitid,
files_and_sessions,
report_code,
f"incremental/files_and_sessions{parallel_idx}",
)

parallel_incremental_result = {
"parallel_idx": parallel_idx,
"chunks_path": chunks_url,
"files_and_sessions_path": files_and_sessions_url,
}
return parallel_incremental_result


RegisteredUploadTask = celery_app.register_task(UploadProcessorTask())
upload_processor_task = celery_app.tasks[RegisteredUploadTask.name]
