From 735b82a703240741fa58c36256c9bc380e098f72 Mon Sep 17 00:00:00 2001
From: daniel-codecov <159859649+daniel-codecov@users.noreply.github.com>
Date: Fri, 19 Apr 2024 13:18:55 -0400
Subject: [PATCH] Save serial reports explicitly for parallel experiment (#390)

* save serial reports to parallel folder

* tweak comments for clarity

* fix tests

* empty commit

* remove junit tests
---
 helpers/parallel_upload_processing.py | 65 +++++++++++++++++++++++++-
 tasks/parallel_verification.py        | 66 +++++++++++++++------------
 tasks/tests/unit/test_upload_task.py  |  5 ++
 tasks/upload.py                       |  5 ++
 tasks/upload_finisher.py              |  1 +
 tasks/upload_processor.py             | 46 +++++++------------
 6 files changed, 128 insertions(+), 60 deletions(-)

diff --git a/helpers/parallel_upload_processing.py b/helpers/parallel_upload_processing.py
index 23a3646fc..355b43e70 100644
--- a/helpers/parallel_upload_processing.py
+++ b/helpers/parallel_upload_processing.py
@@ -5,7 +5,7 @@ from database.models.reports import Upload
-# copied from shared/reports/resources.py Report.next_session_number()
+# Copied from shared/reports/resources.py Report.next_session_number()
 def next_session_number(session_dict):
     start_number = len(session_dict)
     while start_number in session_dict or str(start_number) in session_dict:
@@ -13,7 +13,7 @@ def next_session_number(session_dict):
     return start_number
-# copied and cut down from worker/services/report/raw_upload_processor.py
+# Copied and cut down from worker/services/report/raw_upload_processor.py
 # this version stripped out all the ATS label stuff
 def _adjust_sessions(
     original_sessions: dict,
@@ -62,3 +62,64 @@ def get_parallel_session_ids(
         get_parallel_session_ids.append(next_session_id)
 
     return get_parallel_session_ids
+
+
+def save_incremental_report_results(
+    report_service, commit, report, parallel_idx, report_code
+):
+    commitid = commit.commitid
+    archive_service = report_service.get_archive_service(commit.repository)
+
+    # Save incremental results to archive storage,
+    # upload_finisher will combine
+    chunks = report.to_archive().encode()
+    _, files_and_sessions = report.to_database()
+
+    chunks_url = archive_service.write_parallel_experiment_file(
+        commitid, chunks, report_code, f"incremental/chunk{parallel_idx}"
+    )
+    files_and_sessions_url = archive_service.write_parallel_experiment_file(
+        commitid,
+        files_and_sessions,
+        report_code,
+        f"incremental/files_and_sessions{parallel_idx}",
+    )
+
+    parallel_incremental_result = {
+        "parallel_idx": parallel_idx,
+        "chunks_path": chunks_url,
+        "files_and_sessions_path": files_and_sessions_url,
+    }
+    return parallel_incremental_result
+
+
+# Saves the result of an entire serial processing flow to archive storage
+# so that it can be compared for the parallel experiment. Not necessarily the
+# final report for the commit, if more uploads are still made.
+def save_final_serial_report_results(
+    report_service, commit, report, report_code, arguments_list
+):
+    commitid = commit.commitid
+    archive_service = report_service.get_archive_service(commit.repository)
+
+    # We identify the final result of an entire serial processing pipeline
+    # by the upload_pk of the very last upload received (i.e. the last element
+    # in arguments_list), and this is how each parallel verification task
+    # knows where to find the corresponding report to compare with for a given flow
+    latest_upload_pk = arguments_list[-1].get("upload_pk")
+
+    chunks = report.to_archive().encode()
+    _, files_and_sessions = report.to_database()
+
+    archive_service.write_parallel_experiment_file(
+        commitid,
+        chunks,
+        report_code,
+        f"serial/chunks<latest_upload_pk:{latest_upload_pk}>",
+    )
+    archive_service.write_parallel_experiment_file(
+        commitid,
+        files_and_sessions,
+        report_code,
+        f"serial/files_and_sessions<latest_upload_pk:{latest_upload_pk}>",
+    )
diff --git a/tasks/parallel_verification.py b/tasks/parallel_verification.py
index 6d8ce1d23..ad8303fb4 100644
--- a/tasks/parallel_verification.py
+++ b/tasks/parallel_verification.py
@@ -22,6 +22,7 @@ def run_impl(
         commit_yaml,
         report_code,
         parallel_paths,
+        processing_results,
         **kwargs,
     ):
         commits = db_session.query(Commit).filter(
@@ -56,37 +57,34 @@ def run_impl(
             parallel_paths["chunks_path"]
         ).decode(errors="replace")
-        # Retrieve serial results using legacy method
-        l_files = commit.report_json["files"]
-        l_sessions = commit.report_json["sessions"]
-        l_files_and_sessions = sort_and_stringify_report_json(
-            {"files": l_files, "sessions": l_sessions}
+        # TODO: ensure the legacy report building method (`commit.report_json["files"]`) is accurate as well. There's
+        # no easy way to do this right now because the legacy method assumes the
+        # report to build lives in the database, but the report we want to compare
+        # for the verification experiment lives in archive storage.
+
+        # the pk of the last upload for the processing pipeline
+        last_upload_pk = processing_results["processings_so_far"][-1]["arguments"].get(
+            "upload_pk"
         )
 
         # Retrieve serial results
-        report = report_service.get_existing_report_for_commit(commit)
-        _, files_and_sessions = report.to_database()
         files_and_sessions = sort_and_stringify_report_json(
-            json.loads(files_and_sessions)
+            json.loads(
+                archive_service.read_file(
+                    parallel_path_to_serial_path(
+                        parallel_paths["files_and_sessions_path"], last_upload_pk
+                    )
+                )
+            )
         )
 
-        chunks = archive_service.read_chunks(commitid, report_code)
+        chunks = archive_service.read_file(
+            parallel_path_to_serial_path(parallel_paths["chunks_path"], last_upload_pk)
+        ).decode(errors="replace")
 
-        fas_legacy = parallel_files_and_sessions == l_files_and_sessions
-        fas_regular = parallel_files_and_sessions == files_and_sessions
-        chunks_regular = parallel_chunks == chunks
+        fas_comparison_result = parallel_files_and_sessions == files_and_sessions
+        chunks_comparison_result = parallel_chunks == chunks
 
-        if not fas_legacy:
-            log.info(
-                "Legacy files and sessions did not match parallel results",
-                extra=dict(
-                    repoid=repoid,
-                    commitid=commitid,
-                    commit_yaml=commit_yaml,
-                    report_code=report_code,
-                    parallel_paths=parallel_paths,
-                ),
-            )
-        if not fas_regular:
+        if not fas_comparison_result:
             log.info(
                 "Files and sessions did not match parallel results",
                 extra=dict(
@@ -97,7 +95,7 @@ def run_impl(
                     parallel_paths=parallel_paths,
                 ),
             )
-        if not chunks_regular:
+        if not chunks_comparison_result:
             log.info(
                 "chunks did not match parallel results",
                 extra=dict(
@@ -110,10 +108,8 @@ def run_impl(
             )
 
         verification_result = (
-            (1 if fas_legacy else 0)
-            + (1 if fas_regular else 0)
-            + (1 if chunks_regular else 0)
-        ) / 3
+            (1 if fas_comparison_result else 0) + (1 if chunks_comparison_result else 0)
+        ) / 2
 
         if verification_result == 1:
             log.info(
@@ -141,6 +137,18 @@ def run_impl(
         return
 
 
+def parallel_path_to_serial_path(parallel_path, last_upload_pk):
+    parallel_paths = parallel_path.split("/")
+    cur_file = parallel_paths.pop().removesuffix(
+        ".txt"
+    )  # either chunks.txt or files_and_sessions.txt
+    serial_path = (
+        "/".join(parallel_paths)
+        + f"/serial/{cur_file}<latest_upload_pk:{last_upload_pk}>.txt"
+    )
+    return serial_path
+
+
 # To filter out values not relevant for verifying report content correctness. We
 # don't want these values to be the reason why there exists a diff, since it's
 # not actually related to coverage report contents
diff --git a/tasks/tests/unit/test_upload_task.py b/tasks/tests/unit/test_upload_task.py
index fed9dbdcd..86cd6a0ab 100644
--- a/tasks/tests/unit/test_upload_task.py
+++ b/tasks/tests/unit/test_upload_task.py
@@ -173,6 +173,7 @@ def test_upload_task_call(
                 ],
                 report_code=None,
                 in_parallel=False,
+                is_final=True,
             ),
         )
         kwargs = dict(
@@ -567,6 +568,7 @@ def test_upload_task_call_multiple_processors(
                 ],
                 report_code=None,
                 in_parallel=False,
+                is_final=False,
             ),
         )
         t2 = upload_processor_task.signature(
@@ -582,6 +584,7 @@ def test_upload_task_call_multiple_processors(
                 ],
                 report_code=None,
                 in_parallel=False,
+                is_final=False,
             ),
         )
         t3 = upload_processor_task.signature(
@@ -596,6 +599,7 @@ def test_upload_task_call_multiple_processors(
                 ],
                 report_code=None,
                 in_parallel=False,
+                is_final=False,
             ),
         )
         kwargs = dict(
@@ -1086,6 +1090,7 @@ def test_schedule_task_with_one_task(self, dbsession, mocker):
                 arguments_list=argument_list,
                 report_code=None,
                 in_parallel=False,
+                is_final=True,
             ),
         )
         t2 = upload_finisher_task.signature(
diff --git a/tasks/upload.py b/tasks/upload.py
index 5548383be..84ce43167 100644
--- a/tasks/upload.py
+++ b/tasks/upload.py
@@ -3,6 +3,7 @@
 import uuid
 from datetime import datetime, timedelta
 from json import loads
+from math import ceil
 from typing import Any, List, Mapping, Optional
 
 from asgiref.sync import async_to_sync
@@ -666,6 +667,9 @@ def _schedule_coverage_processing_task(
                     arguments_list=chunk,
                     report_code=commit_report.code,
                     in_parallel=False,
+                    is_final=True
+                    if i == ceil(len(argument_list) / chunk_size) - 1
+                    else False,
                 ),
             )
             processing_tasks.append(sig)
@@ -742,6 +746,7 @@ def _schedule_coverage_processing_task(
                         i
                     ],  # i + parallel_session_id,
                     in_parallel=True,
+                    is_final=True if i == num_sessions - 1 else False,
                 ),
             )
             parallel_processing_tasks.append(sig)
diff --git a/tasks/upload_finisher.py b/tasks/upload_finisher.py
index 72c8cfce9..04bfc34fe 100644
--- a/tasks/upload_finisher.py
+++ b/tasks/upload_finisher.py
@@ -154,6 +154,7 @@ def run_impl(
                     commit_yaml=commit_yaml,
                     report_code=report_code,
                     parallel_paths=parallel_paths,
+                    processing_results=processing_results,
                 ),
             )
             task.apply_async()
diff --git a/tasks/upload_processor.py b/tasks/upload_processor.py
index e788f4d41..eb84f6dfe 100644
--- a/tasks/upload_processor.py
+++ b/tasks/upload_processor.py
@@ -19,6 +19,10 @@
 from database.enums import CommitErrorTypes
 from database.models import Commit, Upload
 from helpers.metrics import metrics
+from helpers.parallel_upload_processing import (
+    save_final_serial_report_results,
+    save_incremental_report_results,
+)
 from helpers.save_commit_error import save_commit_error
 from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO
 from services.bots import RepositoryWithoutValidBotError
@@ -83,6 +87,7 @@ def run_impl(
         report_code=None,
         parallel_idx=None,
         in_parallel=False,
+        is_final=False,
         **kwargs,
     ):
         repoid = int(repoid)
@@ -151,6 +156,7 @@ def run_impl(
                 parallel_idx=parallel_idx,
                 parent_task=self.request.parent_id,
                 in_parallel=in_parallel,
+                is_final=is_final,
                 **kwargs,
             )
         except LockError:
@@ -180,6 +186,7 @@ def process_impl_within_lock(
         report_code,
         parallel_idx=None,
         in_parallel=False,
+        is_final=False,
         **kwargs,
     ):
         if (
@@ -315,7 +322,7 @@ def process_impl_within_lock(
             with metrics.timer(
                 f"{self.metrics_prefix}.save_incremental_report_results"
             ):
-                parallel_incremental_result = self.save_incremental_report_results(
+                parallel_incremental_result = save_incremental_report_results(
                     report_service, commit, report, parallel_idx, report_code
                 )
                 parallel_incremental_result["upload_pk"] = arguments_list[0].get(
@@ -342,6 +349,15 @@ def process_impl_within_lock(
                     report_code,
                 )
 
+            # Save the final accumulated result from the serial flow for the
+            # ParallelVerification task to compare with later, for the parallel
+            # experiment. The report being saved is not necessarily the final
+            # report for the commit, as more uploads can still be made.
+            if is_final and (not in_parallel):
+                save_final_serial_report_results(
+                    report_service, commit, report, report_code, arguments_list
+                )
+
             for processed_individual_report in processings_so_far:
                 # We delete and rewrite the artifacts when the serial flow runs first. When
                 # the parallel flow runs second, it parses the human readable artifacts instead
@@ -593,34 +609,6 @@ def save_report_results(
         db_session.commit()
         return res
 
-    def save_incremental_report_results(
-        self, report_service, commit, report, parallel_idx, report_code
-    ):
-        commitid = commit.commitid
-        archive_service = report_service.get_archive_service(commit.repository)
-
-        # save incremental results to archive storage,
-        # upload_finisher will combine
-        chunks = report.to_archive().encode()
-        _, files_and_sessions = report.to_database()
-
-        chunks_url = archive_service.write_parallel_experiment_file(
-            commitid, chunks, report_code, f"incremental/chunk{parallel_idx}"
-        )
-        files_and_sessions_url = archive_service.write_parallel_experiment_file(
-            commitid,
-            files_and_sessions,
-            report_code,
-            f"incremental/files_and_sessions{parallel_idx}",
-        )
-
-        parallel_incremental_result = {
-            "parallel_idx": parallel_idx,
-            "chunks_path": chunks_url,
-            "files_and_sessions_path": files_and_sessions_url,
-        }
-        return parallel_incremental_result
-
 
 RegisteredUploadTask = celery_app.register_task(UploadProcessorTask())
 upload_processor_task = celery_app.tasks[RegisteredUploadTask.name]
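
Reviewer note: the standalone sketch below illustrates the parallel-to-serial path mapping the verification task relies on. It is not code from this patch; the example storage prefix and the "<latest_upload_pk:...>" suffix format are illustrative assumptions about how the serial report files are named.

# Sketch of the path mapping mirrored from parallel_path_to_serial_path.
# The storage prefix used in the example is hypothetical.


def to_serial_path(parallel_path: str, last_upload_pk: int) -> str:
    # Drop the final component (e.g. "chunks.txt"), then rebuild it under a
    # sibling "serial/" directory tagged with the pk of the last upload, which
    # is how the verification task would locate the serial report for a flow.
    parts = parallel_path.split("/")
    cur_file = parts.pop().removesuffix(".txt")
    return "/".join(parts) + f"/serial/{cur_file}<latest_upload_pk:{last_upload_pk}>.txt"


if __name__ == "__main__":
    example = "v4/repos/abc123/commits/deadbeef/parallel_experiment/chunks.txt"  # hypothetical path
    print(to_serial_path(example, 42))
    # -> v4/repos/abc123/commits/deadbeef/parallel_experiment/serial/chunks<latest_upload_pk:42>.txt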