Avoid copying raw upload for parallel verification
Instead of creating a copy of the raw upload for the parallel verification task to consume, only schedule parallel verification when the uploads are not deleted after processing.

That way, the uploads are guaranteed to still be there for verification, so no copy is needed.
Swatinem committed Sep 12, 2024
1 parent 7ebdf9f commit 18009c5
Showing 6 changed files with 26 additions and 66 deletions.
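
In short, the scheduling change amounts to the following gate (excerpted from the tasks/upload.py diff below):

    # Parallel verification is only scheduled when raw uploads will still
    # exist after processing, i.e. when the archive is not set to be deleted.
    do_parallel_processing = PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
        identifier=commit.repository.repoid
    ) and not delete_archive_setting(commit_yaml)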
13 changes: 13 additions & 0 deletions helpers/reports.py
@@ -1,7 +1,20 @@
 from collections import namedtuple
 
+from shared.config import get_config
+from shared.yaml import UserYaml
+
+from services.yaml.reader import read_yaml_field
+
 null = namedtuple("_", ["totals"])(None)
 
 
 def get_totals_from_file_in_reports(report, path):
     return report.get(path, null).totals
+
+
+def delete_archive_setting(commit_yaml: UserYaml | dict) -> bool:
+    if get_config("services", "minio", "expire_raw_after_n_days"):
+        return True
+    return not read_yaml_field(
+        commit_yaml, ("codecov", "archive", "uploads"), _else=True
+    )
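
For context, a minimal sketch of how the new helper resolves, assuming no "expire_raw_after_n_days" override is present in the services config; the dict arguments are hypothetical commit YAMLs, which plain-mapping form relies on the widened read_yaml_field signature also in this commit:

    from helpers.reports import delete_archive_setting

    # codecov.archive.uploads defaults to True, so raw uploads are kept:
    assert delete_archive_setting({}) is False

    # Explicitly disabling upload archiving deletes raw uploads after
    # processing, which now also disables parallel verification:
    assert delete_archive_setting({"codecov": {"archive": {"uploads": False}}}) is True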
47 changes: 4 additions & 43 deletions services/report/__init__.py
@@ -61,9 +61,7 @@
     RAW_UPLOAD_RAW_REPORT_COUNT,
     RAW_UPLOAD_SIZE,
 )
-from services.report.raw_upload_processor import (
-    process_raw_upload,
-)
+from services.report.raw_upload_processor import process_raw_upload
 from services.repository import get_repo_provider_service
 from services.yaml.reader import get_paths_from_flags, read_yaml_field

@@ -840,7 +838,7 @@ def create_new_report_for_commit(self, commit: Commit) -> Report:
 
     @sentry_sdk.trace
     def parse_raw_report_from_storage(
-        self, repo: Repository, upload: Upload, is_parallel=False
+        self, repo: Repository, upload: Upload
     ) -> ParsedRawReport:
         """Pulls the raw uploaded report from storage and parses it so it's
         easier to access different parts of the raw upload.

@@ -851,51 +849,16 @@ def parse_raw_report_from_storage(
         archive_service = self.get_archive_service(repo)
         archive_url = upload.storage_path
 
-        # TODO: For the parallel experiment, can remove once finished
-        log.info(
-            "Parsing the raw report from storage",
-            extra=dict(
-                commit=upload.report.commit_id,
-                repoid=repo.repoid,
-                archive_url=archive_url,
-                is_parallel=is_parallel,
-            ),
-        )
-
-        # For the parallel upload verification experiment, we need to make a copy of the raw uploaded reports
-        # so that the parallel pipeline can use those to parse. The serial pipeline rewrites the raw uploaded
-        # reports to a human readable version that doesn't include file fixes, so that's why copying is necessary.
-        if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
-            identifier=repo.repoid, default=False
-        ):
-            parallel_url = archive_url.removesuffix(".txt") + "_PARALLEL.txt"
-            log.info(
-                "In the parallel experiment for parsing raw report in storage",
-                extra=dict(
-                    commit=upload.report.commit_id,
-                    repoid=repo.repoid,
-                    parallel_url=parallel_url,
-                    archive_url=archive_url,
-                ),
-            )
-            if not is_parallel:
-                archive_file = archive_service.read_file(archive_url)
-                archive_service.write_file(parallel_url, archive_file)
-                log.info(
-                    "Copied raw report file for parallel experiment to: "
-                    + str(parallel_url),
-                    extra=dict(commit=upload.report.commit_id, repoid=repo.repoid),
-                )
-            else:
-                archive_url = parallel_url
-                archive_file = archive_service.read_file(archive_url)
-                log.info(
-                    "Read raw report file for parallel experiment from: "
-                    + str(archive_url),
-                    extra=dict(commit=upload.report.commit_id, repoid=repo.repoid),
-                )
-        else:
-            archive_file = archive_service.read_file(archive_url)
+        archive_file = archive_service.read_file(archive_url)
 
         parser = get_proper_parser(upload, archive_file)
         upload_version = (

@@ -964,9 +927,7 @@ def build_report_from_raw_content(
         raw_report_info.upload = upload.external_id
 
         try:
-            raw_report = self.parse_raw_report_from_storage(
-                commit.repository, upload, is_parallel=parallel_idx is not None
-            )
+            raw_report = self.parse_raw_report_from_storage(commit.repository, upload)
             raw_report_info.raw_report = raw_report
         except FileNotInStorageError:
             log.info(
2 changes: 1 addition & 1 deletion services/yaml/reader.py
@@ -15,7 +15,7 @@
     """
 
 
-def read_yaml_field(yaml_dict: UserYaml, keys, _else=None) -> Any:
+def read_yaml_field(yaml_dict: UserYaml | Mapping[str, Any], keys, _else=None) -> Any:
     log.debug("Field %s requested", keys)
     try:
         for key in keys:
2 changes: 1 addition & 1 deletion tasks/tests/unit/test_upload_processing_task.py
@@ -545,7 +545,7 @@ def test_upload_task_call_exception_within_individual_upload(
         assert upload.state_id == UploadState.ERROR.db_id
         assert upload.state == "error"
         assert not mocked_3.called
-        mocked_4.assert_called_with(commit.repository, upload, is_parallel=False)
+        mocked_4.assert_called_with(commit.repository, upload)
         mocked_5.assert_called()
 
     @pytest.mark.django_db(databases={"default"})
17 changes: 5 additions & 12 deletions tasks/upload.py
@@ -17,10 +17,7 @@
 from shared.django_apps.codecov_metrics.service.codecov_metrics import (
     UserOnboardingMetricsService,
 )
-from shared.torngit.exceptions import (
-    TorngitClientError,
-    TorngitRepoNotFoundError,
-)
+from shared.torngit.exceptions import TorngitClientError, TorngitRepoNotFoundError
 from shared.yaml import UserYaml
 from shared.yaml.user_yaml import OwnerContext
 from sqlalchemy.orm import Session

@@ -29,17 +26,13 @@
 from database.enums import CommitErrorTypes, ReportType
 from database.models import Commit, CommitReport
 from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME
-from helpers.checkpoint_logger import (
-    CheckpointLogger,
-    _kwargs_key,
-)
-from helpers.checkpoint_logger import (
-    from_kwargs as checkpoints_from_kwargs,
-)
+from helpers.checkpoint_logger import CheckpointLogger, _kwargs_key
+from helpers.checkpoint_logger import from_kwargs as checkpoints_from_kwargs
 from helpers.checkpoint_logger.flows import TestResultsFlow, UploadFlow
 from helpers.exceptions import RepositoryWithoutValidBotError
 from helpers.github_installation import get_installation_name_for_owner_for_task
 from helpers.parallel_upload_processing import get_parallel_session_ids
+from helpers.reports import delete_archive_setting
 from helpers.save_commit_error import save_commit_error
 from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO
 from services.archive import ArchiveService

@@ -635,7 +628,7 @@ def _schedule_coverage_processing_task(
 
         do_parallel_processing = PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
             identifier=commit.repository.repoid
-        )
+        ) and not delete_archive_setting(commit_yaml)
 
         if not do_parallel_processing:
             return serial_tasks.apply_async()
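
With this gate in place, whenever the parallel pipeline runs, postprocess_raw_reports is guaranteed not to delete the raw upload, so parse_raw_report_from_storage can always read the original storage_path and the _PARALLEL.txt copy removed above is no longer needed.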
11 changes: 2 additions & 9 deletions tasks/upload_processor.py
@@ -24,13 +24,13 @@
     save_final_serial_report_results,
     save_incremental_report_results,
 )
+from helpers.reports import delete_archive_setting
 from helpers.save_commit_error import save_commit_error
 from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO
 from services.archive import ArchiveService
 from services.redis import get_redis_connection
 from services.report import ProcessingResult, RawReportInfo, Report, ReportService
 from services.repository import get_repo_provider_service
-from services.yaml import read_yaml_field
 from tasks.base import BaseCodecovTask
 
 log = logging.getLogger(__name__)

@@ -438,21 +438,14 @@ def process_individual_report(
 
         return processing_result
 
-    def should_delete_archive(self, commit_yaml: UserYaml) -> bool:
-        if get_config("services", "minio", "expire_raw_after_n_days"):
-            return True
-        return not read_yaml_field(
-            commit_yaml, ("codecov", "archive", "uploads"), _else=True
-        )
-
     @sentry_sdk.trace
     def postprocess_raw_reports(
         self,
         report_service: ReportService,
         commit: Commit,
         reports: list[RawReportInfo],
     ):
-        should_delete_archive = self.should_delete_archive(report_service.current_yaml)
+        should_delete_archive = delete_archive_setting(report_service.current_yaml)
         archive_service = report_service.get_archive_service(commit.repository)
 
         if should_delete_archive:
