diff --git a/helpers/parallel_upload_processing.py b/helpers/parallel_upload_processing.py index c8faaf797..75a94ca4e 100644 --- a/helpers/parallel_upload_processing.py +++ b/helpers/parallel_upload_processing.py @@ -1,4 +1,68 @@ +import copy + import sentry_sdk +from shared.utils.sessions import SessionType + +from database.models.reports import Upload + + +# Copied from shared/reports/resources.py Report.next_session_number() +def next_session_number(session_dict): + start_number = len(session_dict) + while start_number in session_dict or str(start_number) in session_dict: + start_number += 1 + return start_number + + +# Copied and cut down from worker/services/report/raw_upload_processor.py +# this version stripped out all the ATS label stuff +def _adjust_sessions( + original_sessions: dict, + to_merge_flags, + current_yaml, +): + session_ids_to_fully_delete = [] + flags_under_carryforward_rules = [ + f for f in to_merge_flags if current_yaml.flag_has_carryfoward(f) + ] + if flags_under_carryforward_rules: + for sess_id, curr_sess in original_sessions.items(): + if curr_sess.session_type == SessionType.carriedforward: + if curr_sess.flags: + if any( + f in flags_under_carryforward_rules for f in curr_sess.flags + ): + session_ids_to_fully_delete.append(sess_id) + if session_ids_to_fully_delete: + # delete sessions from dict + for id in session_ids_to_fully_delete: + original_sessions.pop(id, None) + return + + +def get_parallel_session_ids( + sessions, argument_list, db_session, report_service, commit_yaml +): + num_sessions = len(argument_list) + + mock_sessions = copy.deepcopy(sessions) # the sessions already in the report + get_parallel_session_ids = [] + + # iterate over all uploads, get the next session id, and adjust sessions (remove CFF logic) + for i in range(num_sessions): + next_session_id = next_session_number(mock_sessions) + + upload_pk = argument_list[i]["upload_pk"] + upload = db_session.query(Upload).filter_by(id_=upload_pk).first() + to_merge_session = report_service.build_session(upload) + flags = upload.flag_names + + mock_sessions[next_session_id] = to_merge_session + _adjust_sessions(mock_sessions, flags, commit_yaml) + + get_parallel_session_ids.append(next_session_id) + + return get_parallel_session_ids @sentry_sdk.trace diff --git a/services/report/__init__.py b/services/report/__init__.py index 7e9b7e5f4..951d4eeb3 100644 --- a/services/report/__init__.py +++ b/services/report/__init__.py @@ -202,7 +202,7 @@ def has_initialized_report(self, commit: Commit) -> bool: @sentry_sdk.trace def initialize_and_save_report( - self, commit: Commit, report_code: str | None = None + self, commit: Commit, report_code: str = None ) -> CommitReport: """ Initializes the commit report @@ -412,7 +412,7 @@ def build_sessions(self, commit: Commit) -> dict[int, Session]: Does not include CF sessions if there is also an upload session with the same flag name. 
""" - sessions: dict[int, Session] = {} + sessions = {} carryforward_sessions = {} uploaded_flags = set() @@ -873,6 +873,7 @@ def build_report_from_raw_content( report: Report, raw_report_info: RawReportInfo, upload: Upload, + parallel_idx=None, ) -> ProcessingResult: """ Processes an upload on top of an existing report `master` and returns @@ -892,7 +893,6 @@ def build_report_from_raw_content( reportid = upload.external_id session = Session( - id=upload.id, provider=service, build=build, job=job, @@ -919,6 +919,7 @@ def build_report_from_raw_content( reportid=reportid, commit_yaml=self.current_yaml.to_dict(), archive_url=archive_url, + in_parallel=parallel_idx is not None, ), ) result.error = ProcessingError( @@ -954,11 +955,13 @@ def build_report_from_raw_content( raw_report, flags, session, - upload, + upload=upload, + parallel_idx=parallel_idx, ) result.report = process_result.report log.info( - "Successfully processed report", + "Successfully processed report" + + (" (in parallel)" if parallel_idx is not None else ""), extra=dict( session=session.id, ci=f"{session.provider}:{session.build}:{session.job}", diff --git a/services/report/raw_upload_processor.py b/services/report/raw_upload_processor.py index 6a4dcd7be..d4eb36fdd 100644 --- a/services/report/raw_upload_processor.py +++ b/services/report/raw_upload_processor.py @@ -22,9 +22,23 @@ log = logging.getLogger(__name__) -DEFAULT_LABEL_INDEX = { - SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_index: SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_label -} +GLOBAL_LEVEL_LABEL = ( + SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_label +) + + +# This is a lambda function to return different objects +def DEFAULT_LABEL_INDEX(): + return { + SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_index: SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_label + } + + +def invert_pattern(string: str) -> str: + if string.startswith("!"): + return string[1:] + else: + return "!%s" % string @dataclass @@ -46,7 +60,8 @@ def process_raw_upload( raw_reports: ParsedRawReport, flags, session: Session, - upload: Upload | None = None, + upload: Upload = None, + parallel_idx=None, ) -> UploadProcessingResult: toc, env = None, None @@ -70,6 +85,17 @@ def process_raw_upload( else: ignored_file_lines = None + # Get a sessionid to merge into + # anything merged into the original_report + # will take on this sessionid + # But we don't actually merge yet in case the report is empty. + # This is done to avoid garbage sessions to build up in the report + # How can you be sure this will be the sessionid used when you actually merge it? 
Remember that this piece of code runs inside a lock u.u + if parallel_idx is not None: + sessionid = parallel_idx + else: + sessionid = report.next_session_number() + session.id = sessionid if env: session.env = dict([e.split("=", 1) for e in env.split("\n") if "=" in e]) @@ -77,12 +103,6 @@ def process_raw_upload( session.flags = flags skip_files = set() - # [javascript] check for both coverage.json and coverage/coverage.lcov - for report_file in raw_reports.get_uploaded_files(): - if report_file.filename == "coverage/coverage.json": - skip_files.add("coverage/coverage.lcov") - - temporary_report = Report() should_use_encoded_labels = ( upload @@ -90,11 +110,15 @@ def process_raw_upload( identifier=upload.report.commit.repository.repoid, default=False ) ) + # [javascript] check for both coverage.json and coverage/coverage.lcov + for report_file in raw_reports.get_uploaded_files(): + if report_file.filename == "coverage/coverage.json": + skip_files.add("coverage/coverage.lcov") + temporary_report = Report() if should_use_encoded_labels: # We initialize the labels_index (which defaults to {}) to force the special label # to always be index 0 - temporary_report.labels_index = dict(DEFAULT_LABEL_INDEX) - + temporary_report.labels_index = DEFAULT_LABEL_INDEX() joined = True for flag in flags or []: if read_yaml_field(commit_yaml, ("flags", flag, "joined")) is False: @@ -102,7 +126,6 @@ def process_raw_upload( "Customer is using joined=False feature", extra=dict(flag_used=flag) ) joined = False # TODO: ensure this works for parallel - # --------------- # Process reports # --------------- @@ -116,10 +139,9 @@ def process_raw_upload( path_fixer_to_use = path_fixer.get_relative_path_aware_pathfixer( current_filename ) - report_builder_to_use = ReportBuilder( commit_yaml, - session.id, + sessionid, ignored_lines, path_fixer_to_use, should_use_encoded_labels, @@ -130,10 +152,7 @@ def process_raw_upload( ) except ReportExpiredException as r: r.filename = current_filename - # FIXME: this will raise/abort processing *all* the files within an upload, - # even though maybe just one of those files is expired. raise - if report_from_file: if should_use_encoded_labels: # Copies the labels from report into temporary_report @@ -142,23 +161,36 @@ def process_raw_upload( temporary_report.merge(report_from_file, joined=True) path_fixer_to_use.log_abnormalities() - _possibly_log_pathfixer_unusual_results(path_fixer, session.id) - + actual_path_fixes = { + after: before + for (after, before) in path_fixer.calculated_paths.items() + if after is not None + } + if len(actual_path_fixes) > 0: + log.info( + "Example path fixes for this raw upload", + extra={ + "fixes": list(itertools.islice(actual_path_fixes.items(), 10)), + "disable_default_pathfixes": path_fixer.should_disable_default_pathfixes, + }, + ) + _possibly_log_pathfixer_unusual_results(path_fixer, sessionid) if not temporary_report: raise ReportEmptyError("No files found in report.") if ( should_use_encoded_labels - and temporary_report.labels_index == DEFAULT_LABEL_INDEX + and temporary_report.labels_index == DEFAULT_LABEL_INDEX() ): # This means that, even though this report _could_ use encoded labels, # none of the reports processed contributed any new labels to it. 
# So we assume there are no labels and just reset the _labels_index of temporary_report temporary_report.labels_index = None - # Now we actually add the session to the original_report # Because we know that the processing was successful - _sessionid, session = report.add_session(session, use_id_from_session=True) + sessionid, session = report.add_session( + session, use_id_from_session=parallel_idx is not None + ) # Adjust sessions removed carryforward sessions that are being replaced session_adjustment = _adjust_sessions( report, @@ -167,14 +199,13 @@ def process_raw_upload( current_yaml=commit_yaml, upload=upload, ) - report.merge(temporary_report, joined=joined) session.totals = temporary_report.totals return UploadProcessingResult(report=report, session_adjustment=session_adjustment) @sentry_sdk.trace -def make_sure_orginal_report_is_using_label_ids(original_report: Report): +def make_sure_orginal_report_is_using_label_ids(original_report: Report) -> bool: """Makes sure that the original_report (that was pulled from DB) has CoverageDatapoints that encode label_ids and not actual labels. """ @@ -184,13 +215,12 @@ def make_sure_orginal_report_is_using_label_ids(original_report: Report): } if original_report.labels_index is None: original_report.labels_index = {} - labels_index = original_report.labels_index if ( SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_index - not in labels_index + not in original_report.labels_index ): - labels_index[ + original_report.labels_index[ SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_index ] = SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_label @@ -200,17 +230,17 @@ def possibly_translate_label(label_or_id: typing.Union[str, int]) -> int: if label_or_id in reverse_index_cache: return reverse_index_cache[label_or_id] # Search for label in the report index - for idx, label in labels_index.items(): + for idx, label in original_report.labels_index.items(): if label == label_or_id: reverse_index_cache[label] = idx return idx # Label is not present. Add to index. # Notice that this never picks index 0, that is reserved for the special label - new_index = max(labels_index.keys()) + 1 + new_index = max(original_report.labels_index.keys()) + 1 reverse_index_cache[label_or_id] = new_index # It's OK to update this here because it's inside the # UploadProcessing lock, so it's exclusive access - labels_index[new_index] = label_or_id + original_report.labels_index[new_index] = label_or_id return new_index for report_file in original_report: @@ -232,7 +262,7 @@ def make_sure_label_indexes_match( Uses the original_report as reference, and fixes the to_merge_report as needed it also extendes the original_report.labels_index with new labels as needed. 
""" - if to_merge_report.labels_index is None or original_report.labels_index is None: + if to_merge_report.labels_index is None: # The new report doesn't have labels to fix return @@ -305,7 +335,6 @@ def _adjust_sessions( commit_id = upload.report.commit_id if upload is None and to_partially_overwrite_flags: log.warning("Upload is None, but there are partial_overwrite_flags present") - if ( upload and USE_LABEL_INDEX_IN_REPORT_PROCESSING_BY_REPO_ID.check_value( @@ -316,7 +345,6 @@ def _adjust_sessions( # Make sure that the labels in the reports are in a good state to merge them make_sure_orginal_report_is_using_label_ids(original_report) make_sure_label_indexes_match(original_report, to_merge_report) - if to_fully_overwrite_flags or to_partially_overwrite_flags: for sess_id, curr_sess in original_report.sessions.items(): if curr_sess.session_type == SessionType.carriedforward: @@ -325,7 +353,6 @@ def _adjust_sessions( session_ids_to_fully_delete.append(sess_id) if any(f in to_partially_overwrite_flags for f in curr_sess.flags): session_ids_to_partially_delete.append(sess_id) - actually_fully_deleted_sessions = set() if session_ids_to_fully_delete: extra = dict( @@ -339,7 +366,6 @@ def _adjust_sessions( ) original_report.delete_multiple_sessions(session_ids_to_fully_delete) actually_fully_deleted_sessions.update(session_ids_to_fully_delete) - if session_ids_to_partially_delete: extra = dict( deleted_sessions=session_ids_to_partially_delete, @@ -361,7 +387,6 @@ def _adjust_sessions( ) actually_fully_deleted_sessions.add(s) original_report.delete_session(s) - return SessionAdjustmentResult( sorted(actually_fully_deleted_sessions), sorted(set(session_ids_to_partially_delete) - actually_fully_deleted_sessions), @@ -369,20 +394,6 @@ def _adjust_sessions( def _possibly_log_pathfixer_unusual_results(path_fixer: PathFixer, sessionid: int): - actual_path_fixes = { - after: before - for (after, before) in path_fixer.calculated_paths.items() - if after is not None - } - if len(actual_path_fixes) > 0: - log.info( - "Example path fixes for this raw upload", - extra={ - "fixes": list(itertools.islice(actual_path_fixes.items(), 10)), - "disable_default_pathfixes": path_fixer.should_disable_default_pathfixes, - }, - ) - if path_fixer.calculated_paths.get(None): ignored_files = sorted(path_fixer.calculated_paths.pop(None)) log.info( diff --git a/services/report/tests/unit/test_process.py b/services/report/tests/unit/test_process.py index 693358d11..172cf8931 100644 --- a/services/report/tests/unit/test_process.py +++ b/services/report/tests/unit/test_process.py @@ -76,7 +76,7 @@ def test_process_raw_upload(self, keys): report=master, raw_reports=parsed_report, flags=[], - session=Session(id=3 if "M" in keys else 0), + session=Session(), ) master = result.report @@ -203,7 +203,7 @@ def test_process_raw_upload_empty_report(self): raw_reports=LegacyReportParser().parse_raw_report_from_bytes( "\n".join(report_data).encode() ), - session=Session(id=1, flags=["fruits"]), + session=Session(flags=["fruits"]), flags=[], ) assert len(original_report.sessions) == 1 @@ -282,7 +282,7 @@ def test_none(self): Report(), LegacyReportParser().parse_raw_report_from_bytes(b""), [], - Session(id=0), + Session(), ) @@ -307,7 +307,7 @@ def test_fixes(self): reports.encode() ), flags=[], - session=Session(id=0), + session=Session(), ) report = result.report assert 2 not in report["file.go"], "2 never existed" @@ -347,7 +347,7 @@ def test_not_joined(self, mocker, flag, joined): b"a<<<<<< EOF" ), flags=[flag], - 
session=Session(id=1), + session=Session(), ) merge.assert_called_with(mocker.ANY, joined=joined) call_args, call_kwargs = merge.call_args @@ -363,7 +363,7 @@ def test_flags(self, flag): result = process.process_raw_upload( commit_yaml=UserYaml({"flags": {"docker": flag}}), report=Report(), - session=Session(id=0), + session=Session(), raw_reports=LegacyReportParser().parse_raw_report_from_bytes( b'{"coverage": {"tests/test.py": [null, 0], "folder/file.py": [null, 1]}}' ), @@ -380,7 +380,7 @@ def test_sessions(self): process.process_raw_upload( commit_yaml={}, report=report, - session=Session(id=0), + session=Session(), raw_reports=LegacyReportParser().parse_raw_report_from_bytes( b'{"coverage": {"tests/test.py": [null, 0], "folder/file.py": [null, 1]}}' ), @@ -389,7 +389,7 @@ def test_sessions(self): process.process_raw_upload( commit_yaml={}, report=report, - session=Session(id=1), + session=Session(), raw_reports=LegacyReportParser().parse_raw_report_from_bytes( b'{"coverage": {"tests/test.py": [null, 0], "folder/file.py": [null, 1]}}' ), @@ -914,7 +914,7 @@ def test_process_raw_upload_multiple_raw_reports(self, mocker): third_raw_report_result, ], ) - session = Session(id=1) + session = Session() result = process.process_raw_upload( UserYaml({}), original_report, @@ -1092,7 +1092,7 @@ def test_process_raw_upload_with_carryforwarded_flags(self): ), ], ) - session = Session(id=2, flags=upload_flags) + session = Session(flags=upload_flags) result = process.process_raw_upload( UserYaml( { diff --git a/tasks/tests/unit/test_upload_processing_task.py b/tasks/tests/unit/test_upload_processing_task.py index a8ff283e5..727133f91 100644 --- a/tasks/tests/unit/test_upload_processing_task.py +++ b/tasks/tests/unit/test_upload_processing_task.py @@ -20,7 +20,11 @@ ) from rollouts import USE_LABEL_INDEX_IN_REPORT_PROCESSING_BY_REPO_ID from services.archive import ArchiveService -from services.report import ProcessingError, RawReportInfo, ReportService +from services.report import ( + ProcessingError, + RawReportInfo, + ReportService, +) from services.report.parser.legacy import LegacyReportParser from services.report.raw_upload_processor import ( SessionAdjustmentResult, @@ -129,7 +133,7 @@ def test_upload_processor_task_call( ], }, "sessions": { - str(upload.id_): { + "0": { "N": None, "a": url, "c": None, @@ -140,12 +144,16 @@ def test_upload_processor_task_call( "p": None, "t": [3, 24, 19, 5, 0, "79.16667", 0, 0, 0, 0, 0, 0, 0], "u": None, - "d": commit.report_json["sessions"][str(upload.id_)]["d"], + "d": commit.report_json["sessions"]["0"]["d"], "st": "uploaded", "se": {}, } }, } + assert ( + commit.report_json["sessions"]["0"] + == expected_generated_report["sessions"]["0"] + ) assert commit.report_json == expected_generated_report mocked_1.assert_called_with(commit.commitid, None) # mocked_3.send_task.assert_called_with( @@ -257,7 +265,7 @@ def test_upload_processor_task_call_should_delete( ], }, "sessions": { - str(upload.id_): { + "0": { "N": None, "a": url, "c": None, @@ -268,12 +276,16 @@ def test_upload_processor_task_call_should_delete( "p": None, "t": [3, 24, 19, 5, 0, "79.16667", 0, 0, 0, 0, 0, 0, 0], "u": None, - "d": commit.report_json["sessions"][str(upload.id_)]["d"], + "d": commit.report_json["sessions"]["0"]["d"], "st": "uploaded", "se": {}, } }, } + assert ( + commit.report_json["sessions"]["0"] + == expected_generated_report["sessions"]["0"] + ) assert commit.report_json == expected_generated_report mocked_1.assert_called_with(commit.commitid, None) # 
mocked_3.send_task.assert_called_with( @@ -373,7 +385,7 @@ def test_upload_processor_call_with_upload_obj( ], }, "sessions": { - str(upload.id_): { + "0": { "N": None, "a": url, "c": None, @@ -384,12 +396,18 @@ def test_upload_processor_call_with_upload_obj( "p": None, "t": [3, 24, 19, 5, 0, "79.16667", 0, 0, 0, 0, 0, 0, 0], "u": None, - "d": commit.report_json["sessions"][str(upload.id_)]["d"], + "d": commit.report_json["sessions"]["0"]["d"], "st": "uploaded", "se": {}, } }, } + assert ( + commit.report_json["files"]["awesome/__init__.py"] + == expected_generated_report["files"]["awesome/__init__.py"] + ) + assert commit.report_json["files"] == expected_generated_report["files"] + assert commit.report_json["sessions"] == expected_generated_report["sessions"] assert commit.report_json == expected_generated_report mocked_1.assert_called_with(commit.commitid, None) diff --git a/tasks/tests/unit/test_upload_task.py b/tasks/tests/unit/test_upload_task.py index eda92ad40..2c062ebe7 100644 --- a/tasks/tests/unit/test_upload_task.py +++ b/tasks/tests/unit/test_upload_task.py @@ -13,6 +13,7 @@ from shared.torngit.exceptions import TorngitClientError, TorngitRepoNotFoundError from shared.torngit.gitlab import Gitlab from shared.utils.sessions import SessionType +from shared.yaml import UserYaml from database.enums import ReportType from database.models import Upload @@ -800,7 +801,7 @@ def test_upload_task_no_bot( mocked_1.assert_called_with( mocker.ANY, commit, - {"codecov": {"max_report_age": "764y ago"}}, + UserYaml({"codecov": {"max_report_age": "764y ago"}}), [ {"build": "part1", "url": "url1", "upload_pk": mocker.ANY}, {"build": "part2", "url": "url2", "upload_pk": mocker.ANY}, @@ -856,7 +857,7 @@ def test_upload_task_bot_no_permissions( mocked_1.assert_called_with( mocker.ANY, commit, - {"codecov": {"max_report_age": "764y ago"}}, + UserYaml({"codecov": {"max_report_age": "764y ago"}}), [ {"build": "part1", "url": "url1", "upload_pk": mocker.ANY}, {"build": "part2", "url": "url2", "upload_pk": mocker.ANY}, @@ -933,7 +934,7 @@ def test_upload_task_bot_unauthorized( mocked_schedule_task.assert_called_with( mocker.ANY, commit, - {"codecov": {"max_report_age": "764y ago"}}, + UserYaml({"codecov": {"max_report_age": "764y ago"}}), [ {"build": "part1", "url": "url1", "upload_pk": first_session.id}, {"build": "part2", "url": "url2", "upload_pk": second_session.id}, @@ -1024,7 +1025,7 @@ def fail_if_try_to_create_upload(*args, **kwargs): mocked_schedule_task.assert_called_with( mocker.ANY, commit, - {"codecov": {"max_report_age": "764y ago"}}, + UserYaml({"codecov": {"max_report_age": "764y ago"}}), [ { "build": "part1", @@ -1144,7 +1145,7 @@ def test_normalize_upload_arguments( def test_schedule_task_with_one_task(self, dbsession, mocker): mocked_chain = mocker.patch("tasks.upload.chain") commit = CommitFactory.create() - commit_yaml = {"codecov": {"max_report_age": "100y ago"}} + commit_yaml = UserYaml({"codecov": {"max_report_age": "100y ago"}}) argument_dict = {"argument_dict": 1} argument_list = [argument_dict] dbsession.add(commit) @@ -1169,7 +1170,7 @@ def test_schedule_task_with_one_task(self, dbsession, mocker): {}, repoid=commit.repoid, commitid=commit.commitid, - commit_yaml=commit_yaml, + commit_yaml=commit_yaml.to_dict(), arguments_list=argument_list, report_code=None, in_parallel=False, @@ -1179,7 +1180,7 @@ def test_schedule_task_with_one_task(self, dbsession, mocker): kwargs={ "repoid": commit.repoid, "commitid": commit.commitid, - "commit_yaml": commit_yaml, + "commit_yaml": 
commit_yaml.to_dict(), "report_code": None, "in_parallel": False, _kwargs_key(UploadFlow): mocker.ANY, diff --git a/tasks/upload.py b/tasks/upload.py index abeb9c751..3b7c99392 100644 --- a/tasks/upload.py +++ b/tasks/upload.py @@ -31,6 +31,7 @@ from helpers.checkpoint_logger.flows import TestResultsFlow, UploadFlow from helpers.exceptions import RepositoryWithoutValidBotError from helpers.github_installation import get_installation_name_for_owner_for_task +from helpers.parallel_upload_processing import get_parallel_session_ids from helpers.reports import delete_archive_setting from helpers.save_commit_error import save_commit_error from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO @@ -505,7 +506,7 @@ def run_impl_within_lock( scheduled_tasks = self.schedule_task( db_session, commit, - commit_yaml.to_dict(), + commit_yaml, argument_list, commit_report, upload_context, @@ -535,12 +536,14 @@ def schedule_task( self, db_session: Session, commit: Commit, - commit_yaml: dict, + commit_yaml: UserYaml, argument_list: list[dict], commit_report: CommitReport, upload_context: UploadContext, checkpoints: CheckpointLogger | None, ): + commit_yaml = commit_yaml.to_dict() + # Carryforward the parent BA report for the current commit's BA report when handling uploads # that's not bundle analysis type. self.possibly_carryforward_bundle_report( @@ -626,6 +629,26 @@ def _schedule_coverage_processing_task( if not do_parallel_processing: return serial_tasks.apply_async() + report_service = ReportService(commit_yaml) + sessions = report_service.build_sessions(commit=commit) + + original_session_ids = list(sessions.keys()) + parallel_session_ids = get_parallel_session_ids( + sessions, + argument_list, + db_session, + report_service, + UserYaml(commit_yaml), + ) + + log.info( + "Allocated the following session ids for parallel upload processing: " + + " ".join(str(id) for id in parallel_session_ids), + extra=upload_context.log_extra( + original_session_ids=original_session_ids, + ), + ) + parallel_processing_tasks = [ upload_processor_task.s( repoid=commit.repoid, @@ -633,11 +656,13 @@ def _schedule_coverage_processing_task( commit_yaml=commit_yaml, arguments_list=[arguments], report_code=commit_report.code, - parallel_idx=arguments["upload_pk"], + parallel_idx=parallel_session_id, in_parallel=True, is_final=False, ) - for arguments in argument_list + for arguments, parallel_session_id in zip( + argument_list, parallel_session_ids + ) ] finish_parallel_sig = upload_finisher_task.signature( @@ -721,7 +746,7 @@ def possibly_carryforward_bundle_report( self, commit: Commit, commit_report: CommitReport, - commit_yaml: dict, + commit_yaml: UserYaml, argument_list: List[Dict], ): """ diff --git a/tasks/upload_finisher.py b/tasks/upload_finisher.py index a639237b8..2c9372334 100644 --- a/tasks/upload_finisher.py +++ b/tasks/upload_finisher.py @@ -21,11 +21,10 @@ from app import celery_app from celery_config import notify_error_task_name from database.models import Commit, Pull -from database.models.core import Repository from helpers.checkpoint_logger import _kwargs_key from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs from helpers.checkpoint_logger.flows import UploadFlow -from helpers.metrics import KiB, MiB +from helpers.metrics import KiB, MiB, metrics from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO from services.archive import ArchiveService, MinioEndpoints from services.comparison import get_or_create_comparison @@ -132,14 +131,23 @@ def run_impl( 
PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(identifier=repository.repoid) and in_parallel ): - processing_results = { - "processings_so_far": [ - task["processings_so_far"][0] for task in processing_results - ], - "parallel_incremental_result": [ - task["parallel_incremental_result"] for task in processing_results - ], + actual_processing_results = { + "processings_so_far": [], + "parallel_incremental_result": [], } + pr = None + + # need to transform processing_results produced by chord to get it into the + # same format as the processing_results produced from chain + for task in processing_results: + pr = task["processings_so_far"][0].get("pr") or pr + actual_processing_results["processings_so_far"].append( + task["processings_so_far"][0] + ) + actual_processing_results["parallel_incremental_result"].append( + task["parallel_incremental_result"] + ) + processing_results = actual_processing_results report_service = ReportService(commit_yaml) report = self.merge_incremental_reports( @@ -160,9 +168,10 @@ def run_impl( ), ) - parallel_paths = report_service.save_parallel_report_to_archive( - commit, report, report_code - ) + with metrics.timer(f"{self.metrics_prefix}.save_parallel_report_results"): + parallel_paths = report_service.save_parallel_report_to_archive( + commit, report, report_code + ) # now that we've built the report and stored it to GCS, we have what we need to # compare the results with the current upload pipeline. We end execution of the # finisher task here so that we don't cause any additional side-effects @@ -468,10 +477,10 @@ def invalidate_caches(self, redis_connection, commit: Commit): def merge_incremental_reports( self, commit_yaml: dict, - repository: Repository, + repository, commit: Commit, report_service: ReportService, - processing_results: dict, + processing_results, ): archive_service = report_service.get_archive_service(repository) repoid = repository.repoid @@ -538,12 +547,12 @@ def download_and_build_incremental_report(partial_report): "upload_pk": partial_report["upload_pk"], } - def merge_report(cumulative_report: Report, obj): - incremental_report: Report = obj["report"] + def merge_report(cumulative_report, obj): + incremental_report = obj["report"] parallel_idx = obj["parallel_idx"] if len(incremental_report.sessions) != 1: - log.warning( + log.warn( f"Incremental report does not have 1 session, it has {len(incremental_report.sessions)}", extra=dict( repoid=repoid, @@ -553,11 +562,13 @@ def merge_report(cumulative_report: Report, obj): ), ) - session = incremental_report.sessions[parallel_idx] - session.id = parallel_idx - _session_id, session = cumulative_report.add_session( - session, use_id_from_session=True + sessionid = next(iter(incremental_report.sessions)) + incremental_report.sessions[sessionid].id = sessionid + + session_id, session = cumulative_report.add_session( + incremental_report.sessions[parallel_idx], use_id_from_session=True ) + session.id = session_id _adjust_sessions( cumulative_report, incremental_report, session, UserYaml(commit_yaml) diff --git a/tasks/upload_processor.py b/tasks/upload_processor.py index 8a2d87b8d..00560ef43 100644 --- a/tasks/upload_processor.py +++ b/tasks/upload_processor.py @@ -1,5 +1,6 @@ import logging import random +from copy import deepcopy import sentry_sdk from asgiref.sync import async_to_sync @@ -137,23 +138,14 @@ def run_impl( timeout=max(60 * 5, self.hard_time_limit_task), blocking_timeout=5, ): - log.info( - "Obtained upload processing lock, starting", - extra=dict( - repoid=repoid, - 
commit=commitid, - parent_task=self.request.parent_id, - report_code=report_code, - ), - ) - + actual_arguments_list = deepcopy(arguments_list) return self.process_impl_within_lock( db_session=db_session, previous_results=previous_results, repoid=repoid, commitid=commitid, commit_yaml=commit_yaml, - arguments_list=arguments_list, + arguments_list=actual_arguments_list, report_code=report_code, parallel_idx=parallel_idx, in_parallel=in_parallel, @@ -188,7 +180,18 @@ def process_impl_within_lock( in_parallel=False, is_final=False, ): - processings_so_far: list[dict] = previous_results.get("processings_so_far", []) + if in_parallel: + log.info( + "Obtained upload processing lock, starting", + extra=dict( + repoid=repoid, + commit=commitid, + parent_task=self.request.parent_id, + report_code=report_code, + ), + ) + + processings_so_far = previous_results.get("processings_so_far", []) n_processed = 0 n_failed = 0 @@ -242,8 +245,11 @@ def process_impl_within_lock( in_parallel=in_parallel, ), ) - individual_info = {"arguments": arguments} + individual_info = {"arguments": arguments.copy()} try: + arguments_commitid = arguments.pop("commit", None) + if arguments_commitid: + assert arguments_commitid == commit.commitid with metrics.timer( f"{self.metrics_prefix}.process_individual_report" ): @@ -254,6 +260,7 @@ def process_impl_within_lock( report, upload_obj, raw_report_info, + parallel_idx=parallel_idx, in_parallel=in_parallel, ) # NOTE: this is only used because test mocking messes with the return value here. @@ -349,15 +356,15 @@ def process_impl_within_lock( ), ) - processing_results: dict = { + processing_result = { "processings_so_far": processings_so_far, } if in_parallel: - processing_results["parallel_incremental_result"] = ( + processing_result["parallel_incremental_result"] = ( parallel_incremental_result ) - return processing_results + return processing_result except CeleryError: raise except Exception: @@ -376,10 +383,11 @@ def process_individual_report( report: Report, upload: Upload, raw_report_info: RawReportInfo, + parallel_idx=None, in_parallel=False, ) -> ProcessingResult: processing_result = report_service.build_report_from_raw_content( - report, raw_report_info, upload=upload + report, raw_report_info, upload=upload, parallel_idx=parallel_idx ) if ( processing_result.error is not None
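
Note for reviewers: the core idea in `helpers/parallel_upload_processing.py` is to pre-allocate session ids for the parallel processor tasks by replaying the serial merge order against a deep copy of the sessions already in the report. Below is a minimal standalone sketch of that allocation, assuming a simplified dict-of-dicts session shape; the function name `allocate_parallel_session_ids` and the `uploads_flags` / `carryforward_flags` parameters are illustrative only — the real helper reads flags from the `Upload` rows, builds sessions via `report_service.build_session`, and consults `UserYaml.flag_has_carryfoward`.

import copy


def next_session_number(session_dict: dict) -> int:
    # Mirrors Report.next_session_number(): the first integer not already
    # used as a key (int or str) in the sessions dict.
    start_number = len(session_dict)
    while start_number in session_dict or str(start_number) in session_dict:
        start_number += 1
    return start_number


def allocate_parallel_session_ids(
    existing_sessions: dict,
    uploads_flags: list[list[str]],
    carryforward_flags: set[str],
) -> list[int]:
    # Replay the serial merge order against a copy of the sessions already in
    # the report, recording the id each upload would have received.
    mock_sessions = copy.deepcopy(existing_sessions)
    allocated = []
    for flags in uploads_flags:
        sid = next_session_number(mock_sessions)
        mock_sessions[sid] = {"flags": flags, "type": "uploaded"}
        # Like the cut-down _adjust_sessions(): drop carried-forward sessions
        # that this upload replaces, so their ids can be reused later.
        cff_flags_in_upload = [f for f in flags if f in carryforward_flags]
        if cff_flags_in_upload:
            replaced = [
                old_sid
                for old_sid, sess in mock_sessions.items()
                if sess["type"] == "carriedforward"
                and any(f in cff_flags_in_upload for f in sess["flags"])
            ]
            for old_sid in replaced:
                mock_sessions.pop(old_sid, None)
        allocated.append(sid)
    return allocated


if __name__ == "__main__":
    existing = {
        0: {"flags": ["unit"], "type": "carriedforward"},
        1: {"flags": ["integration"], "type": "uploaded"},
    }
    # Two incoming uploads; "unit" is a carryforward-enabled flag.
    print(allocate_parallel_session_ids(existing, [["unit"], []], {"unit"}))
    # -> [2, 3]: id 2 replaces the carried-forward "unit" session, id 3 is fresh.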
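
A second sketch, for the `upload_finisher` change: a Celery chord hands the finisher a list of per-task result dicts, while the serial chain hands it a single dict, so `run_impl` reshapes the chord output before merging. The helper name `reshape_chord_results` and the sample payloads are illustrative; the real code additionally threads the latest non-null `pr` value through `processings_so_far`.

def reshape_chord_results(processing_results: list[dict]) -> dict:
    # Flatten the list of per-task dicts (chord output) into the single
    # dict-of-lists shape produced by the serial chain.
    reshaped = {"processings_so_far": [], "parallel_incremental_result": []}
    for task_result in processing_results:
        reshaped["processings_so_far"].append(task_result["processings_so_far"][0])
        reshaped["parallel_incremental_result"].append(
            task_result["parallel_incremental_result"]
        )
    return reshaped


if __name__ == "__main__":
    chord_output = [
        {
            "processings_so_far": [{"successful": True}],
            "parallel_incremental_result": {"upload_pk": 1},
        },
        {
            "processings_so_far": [{"successful": False}],
            "parallel_incremental_result": {"upload_pk": 2},
        },
    ]
    print(reshape_chord_results(chord_output))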