From 32412d1559cdd7e592cc08f3a00b1bfe374f21b5 Mon Sep 17 00:00:00 2001 From: Andrew Brain Date: Fri, 10 May 2024 10:59:32 -0500 Subject: [PATCH] Add full collection for large repos Signed-off-by: Andrew Brain --- augur/tasks/github/messages/tasks.py | 76 ++++++++++++++++++++++++---- 1 file changed, 65 insertions(+), 11 deletions(-) diff --git a/augur/tasks/github/messages/tasks.py b/augur/tasks/github/messages/tasks.py index 54a4c41e0c..f3a30a54f6 100644 --- a/augur/tasks/github/messages/tasks.py +++ b/augur/tasks/github/messages/tasks.py @@ -8,13 +8,12 @@ from augur.tasks.github.util.github_task_session import GithubTaskManifest from augur.tasks.util.worker_util import remove_duplicate_dicts from augur.tasks.github.util.util import get_owner_repo -from augur.application.db.models import PullRequest, Message, Issue, PullRequestMessageRef, IssueMessageRef, Contributor, Repo - - +from augur.application.db.models import PullRequest, Message, Issue, PullRequestMessageRef, IssueMessageRef, Contributor, Repo, CollectionStatus +from augur.application.db import get_engine, get_session +from sqlalchemy.sql import text platform_id = 1 - @celery.task(base=AugurCoreRepoCollectionTask) def collect_github_messages(repo_git: str) -> None: @@ -29,18 +28,30 @@ def collect_github_messages(repo_git: str) -> None: owner, repo = get_owner_repo(repo_git) task_name = f"{owner}/{repo}: Message Task" - message_data = retrieve_all_pr_and_issue_messages(repo_git, logger, manifest.key_auth, task_name) - - if message_data: + - process_messages(message_data, task_name, repo_id, logger, augur_db) + if is_repo_small(repo_id): + message_data = fast_retrieve_all_pr_and_issue_messages(repo_git, logger, manifest.key_auth, task_name) + + if message_data: + process_messages(message_data, task_name, repo_id, logger, augur_db) + + else: + logger.info(f"{owner}/{repo} has no messages") else: - logger.info(f"{owner}/{repo} has no messages") + process_large_issue_and_pr_message_collection(repo_id, repo_git, logger, manifest.key_auth, task_name, augur_db) + +def is_repo_small(repo_id): + with get_session() as session: -def retrieve_all_pr_and_issue_messages(repo_git: str, logger, key_auth, task_name) -> None: + result = session.query(CollectionStatus).filter(CollectionStatus.repo_id == repo_id, CollectionStatus.issue_pr_sum <= 10).first() + + return result != None + +def fast_retrieve_all_pr_and_issue_messages(repo_git: str, logger, key_auth, task_name) -> None: owner, repo = get_owner_repo(repo_git) @@ -77,7 +88,50 @@ def retrieve_all_pr_and_issue_messages(repo_git: str, logger, key_auth, task_nam return all_data - + + +def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger, key_auth, task_name, augur_db) -> None: + + owner, repo = get_owner_repo(repo_git) + + # define logger for task + logger.info(f"Collecting github comments for {owner}/{repo}") + + engine = get_engine() + + with engine.connect() as connection: + + query = text(f""" + (select pr_comments_url from pull_requests WHERE repo_id={repo_id} order by pr_created_at desc) + UNION + (select comments_url as comment_url from issues WHERE repo_id={repo_id} order by created_at desc); + """) + + result = connection.execute(query).fetchall() + comment_urls = [x[0] for x in result] + + all_data = [] + for index, comment_url in enumerate(comment_urls): + + logger.info(f"{task_name}: Github messages index {index+1} of {len(comment_urls)}") + + messages = GithubPaginator(comment_url, key_auth, logger) + for page_data, _ in messages.iter_pages(): + + if page_data is None or len(page_data) == 0: + break + + all_data += page_data + + logger.info(f"All data size: {len(all_data)}") + + if len(all_data) >= 20: + process_messages(all_data, task_name, repo_id, logger, augur_db) + all_data.clear() + + if len(all_data) > 0: + process_messages(all_data, task_name, repo_id, logger, augur_db) + def process_messages(messages, task_name, repo_id, logger, augur_db):