Skip to content

Commit

Permalink
Add metric for uploads per scheduled tasks
Browse files Browse the repository at this point in the history
The `Upload` task schedules multiple `UploadProcessor` tasks and another `UploadFinisher`.

This batching is rather accidental because of locking rather than intentional. It might be good to know how uploads are being grouped into a single processor/finisher chain.
  • Loading branch information
Swatinem committed Sep 18, 2024
1 parent 80952af commit a740c40
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 53 deletions.
7 changes: 2 additions & 5 deletions services/bundle_analysis/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
from typing import Any, Dict, Optional

import sentry_sdk
from shared.bundle_analysis import (
BundleAnalysisReport,
BundleAnalysisReportLoader,
)
from shared.bundle_analysis import BundleAnalysisReport, BundleAnalysisReportLoader
from shared.bundle_analysis.models import AssetType, MetadataKey
from shared.bundle_analysis.storage import get_bucket_name
from shared.django_apps.bundle_analysis.models import CacheConfig
Expand Down Expand Up @@ -99,7 +96,7 @@ def update_upload(self, carriedforward: Optional[bool] = False) -> None:

class BundleAnalysisReportService(BaseReportService):
def initialize_and_save_report(
self, commit: Commit, report_code: str = None
self, commit: Commit, report_code: str | None = None
) -> CommitReport:
db_session = commit.get_db_session()

Expand Down
12 changes: 4 additions & 8 deletions services/report/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def __init__(self, current_yaml: UserYaml):
self.current_yaml = current_yaml

def initialize_and_save_report(
self, commit: Commit, report_code: str = None
self, commit: Commit, report_code: str | None = None
) -> CommitReport:
raise NotImplementedError()

Expand Down Expand Up @@ -150,18 +150,15 @@ def create_report_upload(
Upload
"""
db_session = commit_report.get_db_session()
name = normalized_arguments.get("name")
upload = Upload(
external_id=normalized_arguments.get("reportid"),
build_code=normalized_arguments.get("build"),
build_url=normalized_arguments.get("build_url"),
env=None,
report_id=commit_report.id_,
job_code=normalized_arguments.get("job"),
name=(
normalized_arguments.get("name")[:100]
if normalized_arguments.get("name")
else None
),
name=(name[:100] if name else None),
provider=normalized_arguments.get("service"),
state="started",
storage_path=normalized_arguments.get("url"),
Expand Down Expand Up @@ -295,8 +292,7 @@ def create_report_upload(
self, normalized_arguments: Mapping[str, str], commit_report: CommitReport
) -> Upload:
upload = super().create_report_upload(normalized_arguments, commit_report)
flags = normalized_arguments.get("flags")
flags = flags.split(",") if flags else []
flags = normalized_arguments.get("flags", "").split(",")
self._attach_flags_to_upload(upload, flags)

# Insert entry in user measurements table only
Expand Down
5 changes: 1 addition & 4 deletions services/report/report_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
)


@sentry_sdk.trace
def report_type_matching(
report: ParsedUploadedReportFile, first_line: str
) -> (
Expand Down Expand Up @@ -200,10 +201,6 @@ def process_report(
continue
processor_name = type(processor).__name__

sentry_sdk.metrics.incr(
"services.report.report_processor.parser",
tags={"type": processor_name},
)
RAW_REPORT_SIZE.labels(processor=processor_name).observe(report.size)
with RAW_REPORT_PROCESSOR_RUNTIME_SECONDS.labels(
processor=processor_name
Expand Down
17 changes: 4 additions & 13 deletions services/test_results.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,12 @@
from sqlalchemy import desc

from database.enums import ReportType
from database.models import (
Commit,
CommitReport,
RepositoryFlag,
TestInstance,
Upload,
)
from database.models import Commit, CommitReport, RepositoryFlag, TestInstance, Upload
from helpers.notifier import BaseNotifier
from rollouts import FLAKY_SHADOW_MODE, FLAKY_TEST_DETECTION
from services.license import requires_license
from services.report import BaseReportService
from services.repository import (
get_repo_provider_service,
)
from services.repository import get_repo_provider_service
from services.urls import get_members_url, get_test_analytics_url
from services.yaml import read_yaml_field

Expand All @@ -33,7 +25,7 @@ def __init__(self, current_yaml: UserYaml):
self.flag_dict = None

def initialize_and_save_report(
self, commit: Commit, report_code: str = None
self, commit: Commit, report_code: str | None = None
) -> CommitReport:
db_session = commit.get_db_session()
current_report_row = (
Expand Down Expand Up @@ -63,8 +55,7 @@ def create_report_upload(
self, normalized_arguments: Mapping[str, str], commit_report: CommitReport
) -> Upload:
upload = super().create_report_upload(normalized_arguments, commit_report)
flags = normalized_arguments.get("flags")
flags = flags or []
flags = normalized_arguments.get("flags", "").split(",")
self._attach_flags_to_upload(upload, flags)
return upload

Expand Down
5 changes: 0 additions & 5 deletions tasks/tests/unit/test_upload_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -799,7 +799,6 @@ def test_upload_task_no_bot(
assert commit.message == ""
assert commit.parent_commit_id is None
mocked_1.assert_called_with(
mocker.ANY,
commit,
UserYaml({"codecov": {"max_report_age": "764y ago"}}),
[
Expand Down Expand Up @@ -855,7 +854,6 @@ def test_upload_task_bot_no_permissions(
assert commit.message == ""
assert commit.parent_commit_id is None
mocked_1.assert_called_with(
mocker.ANY,
commit,
UserYaml({"codecov": {"max_report_age": "764y ago"}}),
[
Expand Down Expand Up @@ -932,7 +930,6 @@ def test_upload_task_bot_unauthorized(
.first()
)
mocked_schedule_task.assert_called_with(
mocker.ANY,
commit,
UserYaml({"codecov": {"max_report_age": "764y ago"}}),
[
Expand Down Expand Up @@ -1023,7 +1020,6 @@ def fail_if_try_to_create_upload(*args, **kwargs):
assert commit.report is not None
assert commit.report.details is not None
mocked_schedule_task.assert_called_with(
mocker.ANY,
commit,
UserYaml({"codecov": {"max_report_age": "764y ago"}}),
[
Expand Down Expand Up @@ -1157,7 +1153,6 @@ def test_schedule_task_with_one_task(self, dbsession, mocker):
)
mock_checkpoints = MagicMock(name="checkpoints")
result = UploadTask().schedule_task(
dbsession,
commit,
commit_yaml,
argument_list,
Expand Down
44 changes: 26 additions & 18 deletions tasks/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from shared.django_apps.codecov_metrics.service.codecov_metrics import (
UserOnboardingMetricsService,
)
from shared.metrics import Histogram
from shared.torngit.exceptions import TorngitClientError, TorngitRepoNotFoundError
from shared.yaml import UserYaml
from shared.yaml.user_yaml import OwnerContext
Expand All @@ -38,7 +39,11 @@
from services.archive import ArchiveService
from services.bundle_analysis.report import BundleAnalysisReportService
from services.redis import download_archive_from_redis, get_redis_connection
from services.report import NotReadyToBuildReportYetError, ReportService
from services.report import (
BaseReportService,
NotReadyToBuildReportYetError,
ReportService,
)
from services.repository import (
create_webhook_on_provider,
fetch_commit_yaml_and_possibly_store,
Expand All @@ -60,6 +65,14 @@
CHUNK_SIZE = 3


UPLOADS_PER_TASK_SCHEDULE = Histogram(
"worker_uploads_per_schedule",
"The number of individual uploads scheduled for processing",
["report_type"],
buckets=[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 12, 15, 20, 25, 30, 40, 50],
)


class UploadContext:
"""
Encapsulates the arguments passed to an upload task. This includes both the
Expand Down Expand Up @@ -370,7 +383,7 @@ def run_impl(
retry_countdown = 20 * 2**self.request.retries
log.warning(
"Retrying upload",
extra=upload_context.log_extra(countdown=int(retry_countdown)),
extra=upload_context.log_extra(countdown=retry_countdown),
)
self.retry(
max_retries=3,
Expand Down Expand Up @@ -408,6 +421,7 @@ def run_impl_within_lock(
repository = commit.repository
repository.updatestamp = datetime.now()
repository_service = None

was_updated, was_setup = False, False
try:
installation_name_to_use = get_installation_name_for_owner_for_task(
Expand Down Expand Up @@ -442,6 +456,7 @@ def run_impl_within_lock(
extra=upload_context.log_extra(),
exc_info=True,
)

if repository_service:
commit_yaml = fetch_commit_yaml_and_possibly_store(
commit, repository_service
Expand All @@ -459,6 +474,7 @@ def run_impl_within_lock(
owner_context=context,
)

report_service: BaseReportService
if report_type == ReportType.COVERAGE:
# TODO: consider renaming class to `CoverageReportService`
report_service = ReportService(
Expand All @@ -474,8 +490,7 @@ def run_impl_within_lock(
try:
log.info("Initializing and saving report", extra=upload_context.log_extra())
commit_report = report_service.initialize_and_save_report(
commit,
upload_context.report_code,
commit, upload_context.report_code
)
except NotReadyToBuildReportYetError:
log.warning(
Expand Down Expand Up @@ -503,8 +518,11 @@ def run_impl_within_lock(

if argument_list:
db_session.commit()

UPLOADS_PER_TASK_SCHEDULE.labels(report_type=report_type.value).observe(
len(argument_list)
)
scheduled_tasks = self.schedule_task(
db_session,
commit,
commit_yaml,
argument_list,
Expand All @@ -530,11 +548,11 @@ def run_impl_within_lock(
"Not scheduling task because there were no arguments found on redis",
extra=upload_context.log_extra(),
)

return {"was_setup": was_setup, "was_updated": was_updated}

def schedule_task(
self,
db_session: Session,
commit: Commit,
commit_yaml: UserYaml,
argument_list: list[dict],
Expand All @@ -557,20 +575,12 @@ def schedule_task(
)
assert checkpoints
return self._schedule_coverage_processing_task(
db_session,
commit,
commit_yaml,
argument_list,
commit_report,
upload_context,
checkpoints,
commit, commit_yaml, argument_list, commit_report, checkpoints
)
elif upload_context.report_type == ReportType.BUNDLE_ANALYSIS:
assert commit_report.report_type == ReportType.BUNDLE_ANALYSIS.value
return self._schedule_bundle_analysis_processing_task(
commit,
commit_yaml,
argument_list,
commit, commit_yaml, argument_list
)
elif upload_context.report_type == ReportType.TEST_RESULTS:
assert commit_report.report_type == ReportType.TEST_RESULTS.value
Expand All @@ -581,12 +591,10 @@ def schedule_task(

def _schedule_coverage_processing_task(
self,
db_session: Session,
commit: Commit,
commit_yaml: dict,
argument_list: list[dict],
commit_report: CommitReport,
upload_context: UploadContext,
checkpoints: CheckpointLogger,
):
checkpoints.log(UploadFlow.INITIAL_PROCESSING_COMPLETE)
Expand Down

0 comments on commit a740c40

Please sign in to comment.