From 45d51b3b01c93b982d7a46694db4943895b1bbc2 Mon Sep 17 00:00:00 2001
From: Arpad Borsos
Date: Fri, 16 Aug 2024 12:16:41 +0200
Subject: [PATCH] Refactor `Upload` task code a bit

This does a couple of things:
- Moves some logging out to the main task function
- Removes some task checks, as there is already a check for `argument_list` in the main task fn
- Also removes checks for `checkpoints`, as those should always exist, and adds assertions for that
---
 tasks/tests/unit/test_upload_task.py |  53 ++-
 tasks/upload.py                      | 464 +++++++++++----------
 2 files changed, 210 insertions(+), 307 deletions(-)

diff --git a/tasks/tests/unit/test_upload_task.py b/tasks/tests/unit/test_upload_task.py
index 13b9304d3..f49213971 100644
--- a/tasks/tests/unit/test_upload_task.py
+++ b/tasks/tests/unit/test_upload_task.py
@@ -724,6 +724,7 @@ def test_upload_task_no_bot(
         assert commit.message == ""
         assert commit.parent_commit_id is None
         mocked_1.assert_called_with(
+            mocker.ANY,
             commit,
             UserYaml({"codecov": {"max_report_age": "764y ago"}}),
             [
@@ -733,7 +734,6 @@ def test_upload_task_no_bot(
             commit.report,
             mocker.ANY,
             mocker.ANY,
-            mocker.ANY,
         )

         assert not mocked_fetch_yaml.called
@@ -780,6 +780,7 @@ def test_upload_task_bot_no_permissions(
         assert commit.message == ""
         assert commit.parent_commit_id is None
         mocked_1.assert_called_with(
+            mocker.ANY,
             commit,
             UserYaml({"codecov": {"max_report_age": "764y ago"}}),
             [
@@ -789,7 +790,6 @@ def test_upload_task_bot_no_permissions(
             commit.report,
             mocker.ANY,
             mocker.ANY,
-            mocker.ANY,
         )

         assert not mocked_fetch_yaml.called
@@ -835,9 +835,9 @@ def test_upload_task_bot_unauthorized(
             commitid=commit.commitid,
             redis_connection=mock_redis,
         )
+        mock_checkpoints = MagicMock(name="checkpoints")
         result = UploadTask().run_impl_within_lock(
-            dbsession,
-            upload_args,
+            dbsession, upload_args, checkpoints=mock_checkpoints, kwargs={}
         )
         assert {"was_setup": False, "was_updated": False} == result
         assert commit.message == ""
@@ -857,6 +857,7 @@ def test_upload_task_bot_unauthorized(
             .first()
         )
         mocked_schedule_task.assert_called_with(
+            mocker.ANY,
             commit,
             UserYaml({"codecov": {"max_report_age": "764y ago"}}),
             [
@@ -866,7 +867,6 @@ def test_upload_task_bot_unauthorized(
             commit.report,
             mocker.ANY,
             mocker.ANY,
-            mocker.ANY,
         )

     def test_upload_task_upload_already_created(
@@ -938,9 +938,9 @@ def fail_if_try_to_create_upload(*args, **kwargs):
             commitid=commit.commitid,
             redis_connection=mock_redis,
         )
+        mock_checkpoints = MagicMock(name="checkpoints")
         result = UploadTask().run_impl_within_lock(
-            dbsession,
-            upload_args,
+            dbsession, upload_args, checkpoints=mock_checkpoints, kwargs={}
         )
         assert {"was_setup": True, "was_updated": True} == result
         assert commit.message == ""
@@ -948,6 +948,7 @@ def fail_if_try_to_create_upload(*args, **kwargs):
         assert commit.report is not None
         assert commit.report.details is not None
         mocked_schedule_task.assert_called_with(
+            mocker.ANY,
             commit,
             UserYaml({"codecov": {"max_report_age": "764y ago"}}),
             [
@@ -961,7 +962,6 @@ def fail_if_try_to_create_upload(*args, **kwargs):
             report,
             mocker.ANY,
             mocker.ANY,
-            mocker.ANY,
         )


@@ -1067,37 +1067,29 @@ def test_normalize_upload_arguments(
         assert b"Some weird value" == content

     @pytest.mark.django_db(databases={"default"})
-    def test_schedule_task_with_no_tasks(self, dbsession, mocker):
+    def test_schedule_task_with_one_task(self, dbsession, mocker):
+        mocked_chain = mocker.patch("tasks.upload.chain")
         commit = CommitFactory.create()
-        commit_yaml = UserYaml({})
-        argument_list = []
+        commit_yaml = UserYaml({"codecov": {"max_report_age": "100y ago"}})
+        argument_dict = {"argument_dict": 1}
+        argument_list = [argument_dict]
         dbsession.add(commit)
         dbsession.flush()
+        upload_args = UploadContext(
+            repoid=commit.repoid,
+            commitid=commit.commitid,
+            redis_connection=mock_redis,
+        )
         mock_checkpoints = MagicMock(name="checkpoints")
         result = UploadTask().schedule_task(
+            dbsession,
             commit,
             commit_yaml,
             argument_list,
             ReportFactory.create(),
-            None,
-            dbsession,
+            upload_args,
             checkpoints=mock_checkpoints,
         )
-        assert result is None
-        mock_checkpoints.log.assert_called_with(UploadFlow.NO_REPORTS_FOUND)
-
-    @pytest.mark.django_db(databases={"default"})
-    def test_schedule_task_with_one_task(self, dbsession, mocker):
-        mocked_chain = mocker.patch("tasks.upload.chain")
-        commit = CommitFactory.create()
-        commit_yaml = UserYaml({"codecov": {"max_report_age": "100y ago"}})
-        argument_dict = {"argument_dict": 1}
-        argument_list = [argument_dict]
-        dbsession.add(commit)
-        dbsession.flush()
-        result = UploadTask().schedule_task(
-            commit, commit_yaml, argument_list, ReportFactory.create(), None, dbsession
-        )
         assert result == mocked_chain.return_value.apply_async.return_value
         t1 = upload_processor_task.s(
             {},
@@ -1492,5 +1484,8 @@ def test_upload_not_ready_to_build_report(
             commitid=commit.commitid,
             redis_connection=mock_redis,
         )
+        mock_checkpoints = MagicMock(name="checkpoints")
         with pytest.raises(Retry):
-            task.run_impl_within_lock(dbsession, upload_args)
+            task.run_impl_within_lock(
+                dbsession, upload_args, checkpoints=mock_checkpoints, kwargs={}
+            )
diff --git a/tasks/upload.py b/tasks/upload.py
index 2a861c787..8c2799fe5 100644
--- a/tasks/upload.py
+++ b/tasks/upload.py
@@ -22,12 +22,14 @@
 )
 from shared.yaml import UserYaml
 from shared.yaml.user_yaml import OwnerContext
+from sqlalchemy.orm import Session

 from app import celery_app
 from database.enums import CommitErrorTypes, ReportType
 from database.models import Commit, CommitReport
 from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME
 from helpers.checkpoint_logger import (
+    CheckpointLogger,
     _kwargs_key,
 )
 from helpers.checkpoint_logger import (
@@ -88,6 +90,29 @@ def __init__(
         self.report_code = report_code
         self.redis_connection = redis_connection or get_redis_connection()

+    def log_extra(self, **kwargs) -> dict:
+        return dict(
+            repoid=self.repoid,
+            commit=self.commitid,
+            report_type=self.report_type.value,
+            report_code=self.report_code,
+            **kwargs,
+        )
+
+    def get_checkpoints(self, kwargs: dict) -> CheckpointLogger | None:
+        # TODO: setup checkpoint flows for other coverage types
+        if self.report_type == ReportType.COVERAGE:
+            # If we're a retry, kwargs will already have our first checkpoint.
+            # If not, log it directly into kwargs so we can pass it onto other tasks
+            return checkpoints_from_kwargs(UploadFlow, kwargs).log(
+                UploadFlow.UPLOAD_TASK_BEGIN, kwargs=kwargs, ignore_repeat=True
+            )
+        elif self.report_type == ReportType.TEST_RESULTS:
+            return checkpoints_from_kwargs(TestResultsFlow, kwargs).log(
+                TestResultsFlow.TEST_RESULTS_BEGIN, kwargs=kwargs, ignore_repeat=True
+            )
+        return None
+
     def lock_name(self, lock_type: str):
         if self.report_type == ReportType.COVERAGE:
             # for backward compat this does not include the report type
@@ -126,14 +151,13 @@ def last_upload_timestamp(self):
         )
         return self.redis_connection.get(redis_key)

-    def prepare_kwargs_for_retry(self, kwargs: dict):
-        kwargs.update(
-            {
-                "repoid": self.repoid,
-                "commitid": self.commitid,
-                "report_type": self.report_type.value,
-                "report_code": self.report_code,
-            }
+    def kwargs_for_retry(self, kwargs: dict) -> dict:
+        return dict(
+            **kwargs,
+            repoid=self.repoid,
+            commitid=self.commitid,
+            report_type=self.report_type.value,
+            report_code=self.report_code,
         )

     def arguments_list(self):
@@ -255,42 +279,22 @@ class UploadTask(BaseCodecovTask, name=upload_task_name):

     def run_impl(
         self,
-        db_session,
-        repoid,
-        commitid,
+        db_session: Session,
+        repoid: int,
+        commitid: str,
         report_type="coverage",
         report_code=None,
         *args,
         **kwargs,
     ):
-        # TODO: setup checkpoint flows for other coverage types
-        if report_type == ReportType.COVERAGE.value:
-            # If we're a retry, kwargs will already have our first checkpoint.
-            # If not, log it directly into kwargs so we can pass it onto other tasks
-            checkpoints = checkpoints_from_kwargs(UploadFlow, kwargs).log(
-                UploadFlow.UPLOAD_TASK_BEGIN, kwargs=kwargs, ignore_repeat=True
-            )
-        elif report_type == ReportType.TEST_RESULTS.value:
-            checkpoints = checkpoints_from_kwargs(TestResultsFlow, kwargs).log(
-                TestResultsFlow.TEST_RESULTS_BEGIN, kwargs=kwargs, ignore_repeat=True
-            )
-
-        repoid = int(repoid)
-        log.info(
-            "Received upload task",
-            extra=dict(
-                repoid=repoid,
-                commit=commitid,
-                report_type=report_type,
-                report_code=report_code,
-            ),
-        )
         upload_context = UploadContext(
-            repoid=repoid,
+            repoid=int(repoid),
             commitid=commitid,
             report_type=ReportType(report_type),
             report_code=report_code,
         )
+        checkpoints = upload_context.get_checkpoints(kwargs)
+        log.info("Received upload task", extra=upload_context.log_extra())

         if not upload_context.has_pending_jobs():
             log.info("No pending jobs. Upload task is done.")
@@ -303,27 +307,21 @@ def run_impl(
         if upload_context.is_currently_processing() and self.request.retries == 0:
             log.info(
                 "Currently processing upload. Retrying in 60s.",
-                extra=dict(
-                    repoid=repoid,
-                    commit=commitid,
-                    report_type=report_type,
-                ),
+                extra=upload_context.log_extra(),
             )
-            upload_context.prepare_kwargs_for_retry(kwargs)
-            self.retry(countdown=60, kwargs=kwargs)
+            self.retry(countdown=60, kwargs=upload_context.kwargs_for_retry(kwargs))

         if retry_countdown := _should_debounce_processing(upload_context):
             log.info(
                 "Retrying due to very recent uploads.",
-                extra=dict(
-                    repoid=upload_context.repoid,
-                    commit=upload_context.commitid,
-                    report_type=upload_context.report_type.value,
+                extra=upload_context.log_extra(
                     countdown=retry_countdown,
                 ),
             )
-            upload_context.prepare_kwargs_for_retry(kwargs)
-            self.retry(countdown=retry_countdown, kwargs=kwargs)
+            self.retry(
+                countdown=retry_countdown,
+                kwargs=upload_context.kwargs_for_retry(kwargs),
+            )

         lock_name = upload_context.lock_name("upload")
         try:
@@ -344,8 +342,8 @@ def run_impl(
                 return self.run_impl_within_lock(
                     db_session,
                     upload_context,
-                    *args,
-                    **kwargs,
+                    checkpoints,
+                    kwargs,
                 )
         except LockError:
             log.warning(
@@ -356,7 +354,7 @@ def run_impl(
             if not upload_context.has_pending_jobs():
                 log.info(
                     "Not retrying since there are likely no jobs that need scheduling",
-                    extra=dict(commit=commitid, repoid=repoid, report_type=report_type),
+                    extra=upload_context.log_extra(),
                 )
                 if checkpoints:
                     checkpoints.log(UploadFlow.NO_PENDING_JOBS)
@@ -368,7 +366,7 @@ def run_impl(
             if self.request.retries > 1:
                 log.info(
                     "Not retrying since we already had too many retries",
-                    extra=dict(commit=commitid, repoid=repoid, report_type=report_type),
+                    extra=upload_context.log_extra(),
                 )
                 if checkpoints:
                     checkpoints.log(UploadFlow.TOO_MANY_RETRIES)
@@ -381,54 +379,40 @@ def run_impl(
             retry_countdown = 20 * 2**self.request.retries
             log.warning(
                 "Retrying upload",
-                extra=dict(
-                    commit=commitid,
-                    repoid=repoid,
-                    report_type=report_type,
-                    countdown=int(retry_countdown),
-                ),
+                extra=upload_context.log_extra(countdown=int(retry_countdown)),
+            )
+            self.retry(
+                max_retries=3,
+                countdown=retry_countdown,
+                kwargs=upload_context.kwargs_for_retry(kwargs),
             )
-            upload_context.prepare_kwargs_for_retry(kwargs)
-            self.retry(max_retries=3, countdown=retry_countdown, kwargs=kwargs)

     @sentry_sdk.trace
     def run_impl_within_lock(
         self,
-        db_session,
+        db_session: Session,
         upload_context: UploadContext,
-        *args,
-        **kwargs,
+        checkpoints: CheckpointLogger | None,
+        kwargs: dict,
     ):
-        log.info(
-            "Starting processing of report",
-            extra=dict(
-                repoid=upload_context.repoid,
-                commit=upload_context.commitid,
-                report_type=upload_context.report_type.value,
-                report_code=upload_context.report_code,
-            ),
-        )
+        log.info("Starting processing of report", extra=upload_context.log_extra())
         repoid = upload_context.repoid
-        commitid = upload_context.commitid
         report_type = upload_context.report_type
-        report_code = upload_context.report_code

-        checkpoints = None
         if report_type == ReportType.COVERAGE:
             try:
-                checkpoints = checkpoints_from_kwargs(UploadFlow, kwargs)
+                assert checkpoints
                 checkpoints.log(UploadFlow.PROCESSING_BEGIN)
             except ValueError as e:
                 log.warning(
                     "CheckpointLogger failed to log/submit", extra=dict(error=e)
                 )
-        elif report_type == ReportType.TEST_RESULTS:
-            checkpoints = checkpoints_from_kwargs(TestResultsFlow, kwargs)

-        commits = db_session.query(Commit).filter(
-            Commit.repoid == repoid, Commit.commitid == commitid
+        commit = (
+            db_session.query(Commit)
+            .filter(Commit.repoid == repoid, Commit.commitid == upload_context.commitid)
+            .first()
         )
-        commit = commits.first()
         assert commit, "Commit not found in database."
         repository = commit.repository
         repository.updatestamp = datetime.now()
@@ -454,17 +438,17 @@ def run_impl_within_lock(

             log.warning(
                 "Unable to reach git provider because repo doesn't have a valid bot",
-                extra=dict(repoid=repoid, commit=commitid),
+                extra=upload_context.log_extra(),
             )
         except TorngitRepoNotFoundError:
             log.warning(
                 "Unable to reach git provider because this specific bot/integration can't see that repository",
-                extra=dict(repoid=repoid, commit=commitid),
+                extra=upload_context.log_extra(),
             )
         except TorngitClientError:
             log.warning(
                 "Unable to reach git provider because there was a 4xx error",
-                extra=dict(repoid=repoid, commit=commitid),
+                extra=upload_context.log_extra(),
                 exc_info=True,
             )
         if repository_service:
@@ -497,33 +481,23 @@ def run_impl_within_lock(
             raise NotImplementedError(f"no report service for: {report_type.value}")

         try:
-            log.info(
-                "Initializing and saving report",
-                extra=dict(
-                    repoid=commit.repoid,
-                    commit=commit.commitid,
-                    report_type=report_type.value,
-                    report_code=report_code,
-                ),
-            )
+            log.info("Initializing and saving report", extra=upload_context.log_extra())
             commit_report = async_to_sync(report_service.initialize_and_save_report)(
                 commit,
-                report_code,
+                upload_context.report_code,
             )
         except NotReadyToBuildReportYetError:
             log.warning(
                 "Commit not yet ready to build its initial report. Retrying in 60s.",
-                extra=dict(repoid=commit.repoid, commit=commit.commitid),
+                extra=upload_context.log_extra(),
             )
-            upload_context.prepare_kwargs_for_retry(kwargs)
-            self.retry(countdown=60, kwargs=kwargs)
+            self.retry(countdown=60, kwargs=upload_context.kwargs_for_retry(kwargs))

         UserOnboardingMetricsService.create_user_onboarding_metric(
             org_id=repository.ownerid, event="COMPLETED_UPLOAD", payload={}
         )

         argument_list = []
-
         for arguments in upload_context.arguments_list():
             normalized_arguments = upload_context.normalize_arguments(commit, arguments)
             if "upload_id" in normalized_arguments:
@@ -540,95 +514,87 @@ def run_impl_within_lock(

         if argument_list:
             db_session.commit()
-            self.schedule_task(
+            scheduled_tasks = self.schedule_task(
+                db_session,
                 commit,
                 commit_yaml,
                 argument_list,
                 commit_report,
                 upload_context,
-                db_session,
                 checkpoints,
             )
+
+            log.info(
+                f"Scheduling {upload_context.report_type.value} processing tasks",
+                extra=upload_context.log_extra(
+                    argument_list=argument_list,
+                    number_arguments=len(argument_list),
+                    scheduled_task_ids=scheduled_tasks.as_tuple(),
+                ),
+            )
+
         else:
             if checkpoints:
                 checkpoints.log(UploadFlow.INITIAL_PROCESSING_COMPLETE)
                 checkpoints.log(UploadFlow.NO_REPORTS_FOUND)
             log.info(
-                "Not scheduling task because there were no arguments were found on redis",
-                extra=dict(
-                    repoid=commit.repoid,
-                    commit=commit.commitid,
-                    argument_list=argument_list,
-                ),
+                "Not scheduling task because there were no arguments found on redis",
+                extra=upload_context.log_extra(),
             )

         return {"was_setup": was_setup, "was_updated": was_updated}

     def schedule_task(
         self,
+        db_session: Session,
         commit: Commit,
         commit_yaml: UserYaml,
         argument_list: list[dict],
         commit_report: CommitReport,
         upload_context: UploadContext,
-        db_session,
-        checkpoints=None,
+        checkpoints: CheckpointLogger | None,
     ):
         commit_yaml = commit_yaml.to_dict()
-        res = None
-        if (
-            commit_report.report_type is None
-            or commit_report.report_type == ReportType.COVERAGE.value
-        ):
-            res = self._schedule_coverage_processing_task(
+        if upload_context.report_type == ReportType.COVERAGE:
+            assert (
+                commit_report.report_type is None
+                or commit_report.report_type == ReportType.COVERAGE.value
+            )
+            assert checkpoints
+            return self._schedule_coverage_processing_task(
+                db_session,
                 commit,
                 commit_yaml,
                 argument_list,
                 commit_report,
                 upload_context,
-                db_session,
-                checkpoints=checkpoints,
+                checkpoints,
             )
-        elif commit_report.report_type == ReportType.BUNDLE_ANALYSIS.value:
-            res = self._schedule_bundle_analysis_processing_task(
+        elif upload_context.report_type == ReportType.BUNDLE_ANALYSIS:
+            assert commit_report.report_type == ReportType.BUNDLE_ANALYSIS.value
+            return self._schedule_bundle_analysis_processing_task(
                 commit,
                 commit_yaml,
                 argument_list,
             )
-        elif commit_report.report_type == ReportType.TEST_RESULTS.value:
-            res = self._schedule_test_results_processing_task(
+        elif upload_context.report_type == ReportType.TEST_RESULTS:
+            assert commit_report.report_type == ReportType.TEST_RESULTS.value
+            assert checkpoints
+            return self._schedule_test_results_processing_task(
                 commit, commit_yaml, argument_list, commit_report, checkpoints
             )

-        if res:
-            return res
-
-        log.info(
-            "Not scheduling task because there were no reports to be processed found",
-            extra=dict(
-                repoid=commit.repoid,
-                commit=commit.commitid,
-                argument_list=argument_list,
-            ),
-        )
-        if checkpoints:
-            checkpoints.log(UploadFlow.NO_REPORTS_FOUND)
-        return None
-
     def _schedule_coverage_processing_task(
         self,
+        db_session: Session,
         commit: Commit,
         commit_yaml: dict,
         argument_list: list[dict],
         commit_report: CommitReport,
         upload_context: UploadContext,
-        db_session,
-        checkpoints=None,
+        checkpoints: CheckpointLogger,
     ):
-        checkpoint_data = None
-        if checkpoints:
-            checkpoints.log(UploadFlow.INITIAL_PROCESSING_COMPLETE)
-            checkpoint_data = checkpoints.data
+        checkpoints.log(UploadFlow.INITIAL_PROCESSING_COMPLETE)

         processing_tasks = [
             upload_processor_task.s(
@@ -642,8 +608,6 @@ def _schedule_coverage_processing_task(
             )
             for chunk in itertools.batched(argument_list, CHUNK_SIZE)
         ]
-        if not processing_tasks:
-            return None
         processing_tasks[0].args = ({},)  # this is the first `previous_results`
         processing_tasks[-1].kwargs.update(is_final=True)

@@ -655,7 +619,7 @@ def _schedule_coverage_processing_task(
                     "commit_yaml": commit_yaml,
                     "report_code": commit_report.code,
                     "in_parallel": False,
-                    _kwargs_key(UploadFlow): checkpoint_data,
+                    _kwargs_key(UploadFlow): checkpoints.data,
                 },
             )
         )
@@ -667,103 +631,87 @@ def _schedule_coverage_processing_task(
         )

         if not do_parallel_processing:
-            res = serial_tasks.apply_async()
-
-        else:
-            report_service = ReportService(commit_yaml)
-            sessions = report_service.build_sessions(commit=commit)
-
-            # if session count expired due to TTL (which is unlikely for most cases), recalculate the
-            # session ids used and set it in redis.
-            redis_key = get_parallel_upload_processing_session_counter_redis_key(
-                repoid=commit.repository.repoid, commitid=commit.commitid
-            )
-            if not upload_context.redis_connection.exists(redis_key):
-                upload_context.redis_connection.set(
-                    redis_key,
-                    max(sessions.keys()) + 1 if sessions.keys() else 0,
-                )
+            return serial_tasks.apply_async()

-            # https://github.com/codecov/worker/commit/7d9c1984b8bc075c9fa002ee15cab3419684f2d6
-            # try to scrap the redis counter idea to fully mimic how session ids are allocated in the
-            # serial flow. This change is technically less performant, and would not allow for concurrent
-            # chords to be running at the same time. For now this is just a temporary change, just for
-            # verifying correctness.
-            #
-            # # increment redis to claim session ids
-            # parallel_session_id = (
-            #     upload_context.redis_connection.incrby(
-            #         name=redis_key,
-            #         amount=num_sessions,
-            #     )
-            #     - num_sessions
-            # )
-            # upload_context.redis_connection.expire(
-            #     name=redis_key,
-            #     time=PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL,
-            # )
-            original_session_ids = list(sessions.keys())
-            parallel_session_ids = get_parallel_session_ids(
-                sessions,
-                argument_list,
-                db_session,
-                report_service,
-                UserYaml(commit_yaml),
-            )
+        report_service = ReportService(commit_yaml)
+        sessions = report_service.build_sessions(commit=commit)

-            log.info(
-                "Allocated the following session ids for parallel upload processing: "
-                + " ".join(str(id) for id in parallel_session_ids),
-                extra=dict(
-                    repoid=commit.repoid,
-                    commit=commit.commitid,
-                    original_session_ids=original_session_ids,
-                ),
-            )
+        # if session count expired due to TTL (which is unlikely for most cases), recalculate the
+        # session ids used and set it in redis.
+        redis_key = get_parallel_upload_processing_session_counter_redis_key(
+            repoid=commit.repository.repoid, commitid=commit.commitid
+        )
+        if not upload_context.redis_connection.exists(redis_key):
+            upload_context.redis_connection.set(
+                redis_key,
+                max(sessions.keys()) + 1 if sessions.keys() else 0,
+            )
+
+        # https://github.com/codecov/worker/commit/7d9c1984b8bc075c9fa002ee15cab3419684f2d6
+        # try to scrap the redis counter idea to fully mimic how session ids are allocated in the
+        # serial flow. This change is technically less performant, and would not allow for concurrent
+        # chords to be running at the same time. For now this is just a temporary change, just for
+        # verifying correctness.
+        #
+        # # increment redis to claim session ids
+        # parallel_session_id = (
+        #     upload_context.redis_connection.incrby(
+        #         name=redis_key,
+        #         amount=num_sessions,
+        #     )
+        #     - num_sessions
+        # )
+        # upload_context.redis_connection.expire(
+        #     name=redis_key,
+        #     time=PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL,
+        # )
+        original_session_ids = list(sessions.keys())
+        parallel_session_ids = get_parallel_session_ids(
+            sessions,
+            argument_list,
+            db_session,
+            report_service,
+            UserYaml(commit_yaml),
+        )

-            parallel_processing_tasks = [
-                upload_processor_task.s(
-                    repoid=commit.repoid,
-                    commitid=commit.commitid,
-                    commit_yaml=commit_yaml,
-                    arguments_list=[arguments],
-                    report_code=commit_report.code,
-                    parallel_idx=parallel_session_id,
-                    in_parallel=True,
-                    is_final=False,
-                )
-                for arguments, parallel_session_id in zip(
-                    argument_list, parallel_session_ids
-                )
-            ]
+        log.info(
+            "Allocated the following session ids for parallel upload processing: "
+            + " ".join(str(id) for id in parallel_session_ids),
+            extra=upload_context.log_extra(
+                original_session_ids=original_session_ids,
+            ),
+        )

-            finish_parallel_sig = upload_finisher_task.signature(
-                kwargs={
-                    "repoid": commit.repoid,
-                    "commitid": commit.commitid,
-                    "commit_yaml": commit_yaml,
-                    "report_code": commit_report.code,
-                    "in_parallel": True,
-                    _kwargs_key(UploadFlow): checkpoint_data,
-                },
+        parallel_processing_tasks = [
+            upload_processor_task.s(
+                repoid=commit.repoid,
+                commitid=commit.commitid,
+                commit_yaml=commit_yaml,
+                arguments_list=[arguments],
+                report_code=commit_report.code,
+                parallel_idx=parallel_session_id,
+                in_parallel=True,
+                is_final=False,
             )
+            for arguments, parallel_session_id in zip(
+                argument_list, parallel_session_ids
+            )
+        ]

-            parallel_tasks = chord(parallel_processing_tasks, finish_parallel_sig)
-            parallel_shadow_experiment = serial_tasks | parallel_tasks
-            res = parallel_shadow_experiment.apply_async()
-
-        log.info(
-            "Scheduling coverage processing tasks for %s different reports",
-            len(argument_list),
-            extra=dict(
-                repoid=commit.repoid,
-                commit=commit.commitid,
-                argument_list=argument_list,
-                number_arguments=len(argument_list),
-                scheduled_task_ids=res.as_tuple(),
-            ),
+        finish_parallel_sig = upload_finisher_task.signature(
+            kwargs={
+                "repoid": commit.repoid,
+                "commitid": commit.commitid,
+                "commit_yaml": commit_yaml,
+                "report_code": commit_report.code,
+                "in_parallel": True,
+                _kwargs_key(UploadFlow): checkpoints.data,
+            },
         )
-        return res
+
+        parallel_tasks = chord(parallel_processing_tasks, finish_parallel_sig)
+        parallel_shadow_experiment = serial_tasks | parallel_tasks
+        return parallel_shadow_experiment.apply_async()

     def _schedule_bundle_analysis_processing_task(
         self,
@@ -780,8 +728,7 @@ def _schedule_bundle_analysis_processing_task(
             )
             for params in argument_list
         ]
-        if task_signatures:
-            task_signatures[0].args = ({},)  # this is the first `previous_result`
+        task_signatures[0].args = ({},)  # this is the first `previous_result`

         # it might make sense to eventually have a "finisher" task that
         # does whatever extra stuff + enqueues a notify
@@ -793,18 +740,7 @@ def _schedule_bundle_analysis_processing_task(
             )
         )

-        res = chain(task_signatures).apply_async()
-        log.info(
-            "Scheduling bundle analysis processor tasks",
-            extra=dict(
-                repoid=commit.repoid,
-                commit=commit.commitid,
-                argument_list=argument_list,
-                number_arguments=len(argument_list),
-                scheduled_task_ids=res.as_tuple(),
-            ),
-        )
-        return res
+        return chain(task_signatures).apply_async()

     def _schedule_test_results_processing_task(
         self,
@@ -812,7 +748,7 @@ def _schedule_test_results_processing_task(
         commit_yaml: dict,
         argument_list: list[dict],
         commit_report: CommitReport,
-        checkpoints=None,
+        checkpoints: CheckpointLogger,
     ):
         processor_task_group = [
             test_results_processor_task.s(
@@ -825,46 +761,18 @@ def _schedule_test_results_processing_task(
             for chunk in itertools.batched(argument_list, CHUNK_SIZE)
         ]

-        if not processor_task_group:
-            log.info(
-                "Not scheduling test results processing tasks because there were no reports to be processed found",
-                extra=dict(
-                    repoid=commit.repoid,
-                    commit=commit.commitid,
-                    argument_list=argument_list,
-                ),
-            )
-            return None
-
-        checkpoint_data = None
-        if checkpoints:
-            checkpoint_data = checkpoints.data
-
-        res = chord(
+        return chord(
             processor_task_group,
             test_results_finisher_task.signature(
                 kwargs={
                     "repoid": commit.repoid,
                     "commitid": commit.commitid,
                     "commit_yaml": commit_yaml,
-                    _kwargs_key(TestResultsFlow): checkpoint_data,
+                    _kwargs_key(TestResultsFlow): checkpoints.data,
                 }
             ),
         ).apply_async()

-        log.info(
-            "Scheduling test results processing tasks for %s different reports",
-            len(argument_list),
-            extra=dict(
-                repoid=commit.repoid,
-                commit=commit.commitid,
-                argument_list=argument_list,
-                number_arguments=len(argument_list),
-                scheduled_task_ids=res.as_tuple(),
-            ),
-        )
-        return res
-
     def possibly_setup_webhooks(self, commit: Commit, repository_service):
         repository = commit.repository
         repo_data = repository_service.data