Make ReportService.initialize_and_save_report sync
It looks like historically there has been a mix of sync and async code.
The consensus(?) seems to favor sync code, as Celery is fundamentally sync.
Async code also does not play as well with profiling: it does not show up directly in a flamegraph; instead, the profile just shows a `wait` for the async code running in a different thread.
Swatinem committed Aug 29, 2024
1 parent 1a36862 commit d8b1b1d
Showing 6 changed files with 36 additions and 42 deletions.
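
The recurring pattern in the diff below is to keep the service methods synchronous and to drive the few remaining async git-provider calls to completion with `asgiref.sync.async_to_sync` (newly imported in `services/report/__init__.py`). A minimal sketch of that pattern, assuming a hypothetical `get_compare` coroutine as a stand-in for a torngit provider call (not code from this repository):

```python
import asyncio

from asgiref.sync import async_to_sync


async def get_compare(base: str, head: str) -> dict:
    # Hypothetical stand-in for an async torngit provider call.
    await asyncio.sleep(0)
    return {"diff": {"files": {}}}


def possibly_shift_carryforward_report(base_commitid: str, head_commitid: str) -> dict:
    # Sync caller: no `await` needed; async_to_sync runs the coroutine to
    # completion on an event loop it manages and returns its result.
    return async_to_sync(get_compare)(base=base_commitid, head=head_commitid)["diff"]


if __name__ == "__main__":
    print(possibly_shift_carryforward_report("1a36862", "d8b1b1d"))
```

The same wrapping is applied to `get_commit_diff` in `save_parallel_report_to_archive`, while the feature-flag and metric helpers switch from their `*_async` variants to the plain sync calls.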
2 changes: 1 addition & 1 deletion services/bundle_analysis/report.py
@@ -93,7 +93,7 @@ def update_upload(self):


class BundleAnalysisReportService(BaseReportService):
async def initialize_and_save_report(
def initialize_and_save_report(
self, commit: Commit, report_code: str = None
) -> CommitReport:
db_session = commit.get_db_session()
38 changes: 20 additions & 18 deletions services/report/__init__.py
@@ -10,6 +10,7 @@
from typing import Any, Dict, Mapping, Optional, Sequence

import sentry_sdk
from asgiref.sync import async_to_sync
from celery.exceptions import SoftTimeLimitExceeded
from shared.config import get_config
from shared.django_apps.reports.models import ReportType
@@ -20,6 +21,7 @@
from shared.reports.resources import Report
from shared.reports.types import ReportFileSummary, ReportTotals
from shared.storage.exceptions import FileNotInStorageError
from shared.torngit.base import TorngitBaseAdapter
from shared.torngit.exceptions import TorngitError
from shared.upload.utils import UploaderType, insert_coverage_measurement
from shared.utils.sessions import Session, SessionType
@@ -125,7 +127,7 @@ def __init__(self, current_yaml: UserYaml):
current_yaml = UserYaml(current_yaml)
self.current_yaml = current_yaml

async def initialize_and_save_report(
def initialize_and_save_report(
self, commit: Commit, report_code: str = None
) -> CommitReport:
raise NotImplementedError()
@@ -217,7 +219,8 @@ def has_initialized_report(self, commit: Commit) -> bool:
or commit._report_json_storage_path is not None
)

async def initialize_and_save_report(
@sentry_sdk.trace
def initialize_and_save_report(
self, commit: Commit, report_code: str = None
) -> CommitReport:
"""
@@ -293,20 +296,18 @@ async def initialize_and_save_report(
db_session.add(report_details)
db_session.flush()
if not self.has_initialized_report(commit):
report = await self.create_new_report_for_commit(commit)
report = self.create_new_report_for_commit(commit)
if not report.is_empty():
# This means there is a report to carryforward
self.save_full_report(commit, report, report_code)

# Behind parallel processing flag, save the CFF report to GCS so the parallel variant of
# finisher can build off of it later. Makes the assumption that the CFFs occupy the first
# j to i session ids where i is the max id of the CFFs and j is some integer less than i.
if await PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value_async(
if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
identifier=commit.repository.repoid
):
await self.save_parallel_report_to_archive(
commit, report, report_code
)
self.save_parallel_report_to_archive(commit, report, report_code)

highest_session_id = max(
report.sessions.keys()
) # the largest id among the CFFs
@@ -705,19 +706,19 @@ def get_appropriate_commit_to_carryforward_from(
return None
return parent_commit

async def _possibly_shift_carryforward_report(
def _possibly_shift_carryforward_report(
self, carryforward_report: Report, base_commit: Commit, head_commit: Commit
) -> Report:
with metrics.timer(
"services.report.ReportService.possibly_shift_carryforward_report"
):
try:
provider_service = get_repo_provider_service(
provider_service: TorngitBaseAdapter = get_repo_provider_service(
repository=head_commit.repository,
installation_name_to_use=self.gh_app_installation_name,
)
diff = (
await provider_service.get_compare(
async_to_sync(provider_service.get_compare)(
base=base_commit.commitid, head=head_commit.commitid
)
)["diff"]
@@ -754,7 +755,7 @@ async def _possibly_shift_carryforward_report(
)
return carryforward_report

async def create_new_report_for_commit(self, commit: Commit) -> Report:
def create_new_report_for_commit(self, commit: Commit) -> Report:
with metrics.timer(
"services.report.ReportService.create_new_report_for_commit"
):
@@ -780,7 +781,7 @@ async def create_new_report_for_commit(self, commit: Commit) -> Report:
# a knob to turn for support requests about carryforward flags, and
# maybe we'll revisit a general rollout at a later time.
max_parenthood_deepness = (
await CARRYFORWARD_BASE_SEARCH_RANGE_BY_OWNER.check_value_async(
CARRYFORWARD_BASE_SEARCH_RANGE_BY_OWNER.check_value(
identifier=repo.ownerid, default=10
)
)
@@ -793,7 +794,7 @@ async def create_new_report_for_commit(self, commit: Commit) -> Report:
"Could not find parent for possible carryforward",
extra=dict(commit=commit.commitid, repoid=commit.repoid),
)
await metric_context.log_simple_metric_async(
metric_context.log_simple_metric(
"worker_service_report_carryforward_base_not_found", 1
)
return Report()
@@ -846,10 +847,10 @@ async def create_new_report_for_commit(self, commit: Commit) -> Report:
# parent_commit and commit should belong to the same repository
carryforward_report.header = copy.deepcopy(parent_report.header)

await self._possibly_shift_carryforward_report(
self._possibly_shift_carryforward_report(
carryforward_report, parent_commit, commit
)
await metric_context.log_simple_metric_async(
metric_context.log_simple_metric(
"worker_service_report_carryforward_success", 1
)
return carryforward_report
@@ -1258,7 +1259,7 @@ def save_full_report(self, commit: Commit, report: Report, report_code=None):
return res

@sentry_sdk.trace
async def save_parallel_report_to_archive(
def save_parallel_report_to_archive(
self, commit: Commit, report: Report, report_code=None
):
commitid = commit.commitid
@@ -1267,11 +1268,12 @@ def save_parallel_report_to_archive(

# Attempt to calculate diff of report (which uses commit info from the git provider), but if it fails to do so, it just moves on without such diff
try:
repository_service = get_repo_provider_service(
repository_service: TorngitBaseAdapter = get_repo_provider_service(
repository,
installation_name_to_use=self.gh_app_installation_name,
)
report.apply_diff(await repository_service.get_commit_diff(commitid))
diff = async_to_sync(repository_service.get_commit_diff)(commitid)
report.apply_diff(diff)
except TorngitError:
# When this happens, we have that commit.totals["diff"] is not available.
# Since there is no way to calculate such diff without the git commit,
2 changes: 1 addition & 1 deletion services/test_results.py
@@ -32,7 +32,7 @@ def __init__(self, current_yaml: UserYaml):
super().__init__(current_yaml)
self.flag_dict = None

async def initialize_and_save_report(
def initialize_and_save_report(
self, commit: Commit, report_code: str = None
) -> CommitReport:
db_session = commit.get_db_session()
30 changes: 12 additions & 18 deletions services/tests/test_report.py
@@ -4084,20 +4084,18 @@ def test_save_report_file_needing_repack(
)
assert mock_storage.storage["archive"][res["url"]].decode() == expected_content

@pytest.mark.asyncio
async def test_initialize_and_save_report_brand_new(self, dbsession, mock_storage):
def test_initialize_and_save_report_brand_new(self, dbsession, mock_storage):
commit = CommitFactory.create()
dbsession.add(commit)
dbsession.flush()
report_service = ReportService({})
r = await report_service.initialize_and_save_report(commit)
r = report_service.initialize_and_save_report(commit)
assert r is not None
assert r.details is not None
assert r.details.files_array == []
assert len(mock_storage.storage["archive"]) == 0

@pytest.mark.asyncio
async def test_initialize_and_save_report_report_but_no_details(
def test_initialize_and_save_report_report_but_no_details(
self, dbsession, mock_storage
):
commit = CommitFactory.create()
@@ -4107,16 +4105,15 @@ async def test_initialize_and_save_report_report_but_no_details(
dbsession.add(report_row)
dbsession.flush()
report_service = ReportService({})
r = await report_service.initialize_and_save_report(commit)
r = report_service.initialize_and_save_report(commit)
dbsession.refresh(report_row)
assert r is not None
assert r.details is not None
assert r.details.files_array == []
assert len(mock_storage.storage["archive"]) == 0

@pytest.mark.asyncio
@pytest.mark.django_db(databases={"default"})
async def test_initialize_and_save_report_carryforward_needed(
def test_initialize_and_save_report_carryforward_needed(
self, dbsession, sample_commit_with_report_big, mocker, mock_storage
):
parent_commit = sample_commit_with_report_big
@@ -4129,7 +4126,7 @@ async def test_initialize_and_save_report_carryforward_needed(
dbsession.flush()
yaml_dict = {"flags": {"enterprise": {"carryforward": True}}}
report_service = ReportService(UserYaml(yaml_dict))
r = await report_service.initialize_and_save_report(commit)
r = report_service.initialize_and_save_report(commit)
assert len(r.uploads) == 2
first_upload = dbsession.query(Upload).filter_by(
report_id=r.id_, order_number=2
@@ -4194,9 +4191,8 @@ async def test_initialize_and_save_report_carryforward_needed(
"file_14.py",
]

@pytest.mark.asyncio
@pytest.mark.django_db(databases={"default"})
async def test_initialize_and_save_report_report_but_no_details_carryforward_needed(
def test_initialize_and_save_report_report_but_no_details_carryforward_needed(
self, dbsession, sample_commit_with_report_big, mock_storage
):
parent_commit = sample_commit_with_report_big
@@ -4212,7 +4208,7 @@ async def test_initialize_and_save_report_report_but_no_details_carryforward_needed(
dbsession.flush()
yaml_dict = {"flags": {"enterprise": {"carryforward": True}}}
report_service = ReportService(UserYaml(yaml_dict))
r = await report_service.initialize_and_save_report(commit)
r = report_service.initialize_and_save_report(commit)
assert len(r.uploads) == 2
first_upload = dbsession.query(Upload).filter_by(
report_id=r.id_, order_number=2
@@ -4277,16 +4273,15 @@ async def test_initialize_and_save_report_report_but_no_details_carryforward_needed(
"file_14.py",
]

@pytest.mark.asyncio
async def test_initialize_and_save_report_needs_backporting(
def test_initialize_and_save_report_needs_backporting(
self, dbsession, sample_commit_with_report_big, mock_storage, mocker
):
commit = sample_commit_with_report_big
report_service = ReportService({})
mocker.patch.object(
ReportDetails, "_should_write_to_storage", return_value=True
)
r = await report_service.initialize_and_save_report(commit)
r = report_service.initialize_and_save_report(commit)
assert r is not None
assert r.details is not None
assert len(r.uploads) == 4
@@ -4402,8 +4397,7 @@ async def test_initialize_and_save_report_needs_backporting(
storage_keys = mock_storage.storage["archive"].keys()
assert any(map(lambda key: key.endswith("chunks.txt"), storage_keys))

@pytest.mark.asyncio
async def test_initialize_and_save_report_existing_report(
def test_initialize_and_save_report_existing_report(
self, mock_storage, sample_report, dbsession, mocker
):
mocker_save_full_report = mocker.patch.object(ReportService, "save_full_report")
Expand All @@ -4418,7 +4412,7 @@ async def test_initialize_and_save_report_existing_report(
dbsession.flush()
report_service = ReportService({})
report_service.save_report(commit, sample_report)
res = await report_service.initialize_and_save_report(commit)
res = report_service.initialize_and_save_report(commit)
assert res == current_report_row
assert not mocker_save_full_report.called

4 changes: 1 addition & 3 deletions tasks/preprocess_upload.py
@@ -114,9 +114,7 @@ def process_impl_within_lock(
commit_yaml, gh_app_installation_name=installation_name_to_use
)
# For parallel upload processing experiment, saving the report to GCS happens here
commit_report = async_to_sync(report_service.initialize_and_save_report)(
commit, report_code
)
commit_report = report_service.initialize_and_save_report(commit, report_code)
# Persist changes from within the lock
db_session.flush()
return {
2 changes: 1 addition & 1 deletion tasks/upload.py
@@ -482,7 +482,7 @@ def run_impl_within_lock(

try:
log.info("Initializing and saving report", extra=upload_context.log_extra())
commit_report = async_to_sync(report_service.initialize_and_save_report)(
commit_report = report_service.initialize_and_save_report(
commit,
upload_context.report_code,
)
