diff --git a/.env b/.env
index 60dbdc5..1535e37 100644
--- a/.env
+++ b/.env
@@ -15,6 +15,7 @@ C_FORCE_ROOT=True
 
 CELERY_BROKER_URL=redis://redis:6379
 CELERY_RESULT_BACKEND=redis://redis:6379
+CELERY_BROKER_HEARTBEAT=5
 
 MONGO_URI=mongodb://mongodb:27017
 MONGO_DBNAME=open-crawler
diff --git a/app/api/utils.py b/app/api/utils.py
index 4f7b9ae..47e61b8 100644
--- a/app/api/utils.py
+++ b/app/api/utils.py
@@ -1,11 +1,11 @@
 from urllib.parse import urlparse
 
-from celery import group, chain
+from celery import group, chain, chord
 
 from app.repositories.crawls import crawls
 from app.celery_broker.tasks import (
     METADATA_TASK_REGISTRY,
-    start_crawl_process,
+    start_crawl_process, finalize_crawl_process,
 )
 from app.models.crawl import CrawlModel
 from app.models.website import WebsiteModel
@@ -38,9 +38,12 @@ def start_crawl(crawl: CrawlModel) -> None:
         METADATA_TASK_REGISTRY.get(metadata).s()
         for metadata in crawl.enabled_metadata
     )
+    # If a task in a chain fails, the remaining tasks in the chain will not be executed.
+    # To ensure that `finalize_crawl` is executed regardless of whether the previous tasks in the chain fail or succeed,
+    # we register it as the `link_error` callback of start_crawl_process and use a chord with the metadata tasks.
     chain(
-        start_crawl_process.s(crawl),
-        metadata_tasks,
+        start_crawl_process.s(crawl).on_error(finalize_crawl_process.s(crawl)),
+        chord(metadata_tasks, finalize_crawl_process.s(crawl)),
     ).apply_async(task_id=crawl.id)
 
 
diff --git a/app/celery_broker/factory.py b/app/celery_broker/factory.py
index 77fe520..e28e2d2 100644
--- a/app/celery_broker/factory.py
+++ b/app/celery_broker/factory.py
@@ -6,8 +6,8 @@ def create_celery_app() -> Celery:
 
     celery_app = Celery(
         "scanr",
-        broker=settings.CELERY_BROKER_URL,
-        backend=settings.CELERY_RESULT_BACKEND,
+        broker=settings.broker_url,
+        backend=settings.result_backend,
         broker_connection_retry_on_startup=True,
         include=["app.celery_broker.tasks"],
     )
diff --git a/app/celery_broker/metadata_utils.py b/app/celery_broker/metadata_utils.py
index 04ce740..477873c 100644
--- a/app/celery_broker/metadata_utils.py
+++ b/app/celery_broker/metadata_utils.py
@@ -17,7 +17,7 @@ def handle_metadata_result(
     crawl_process: CrawlProcess,
     result: dict,
     metadata_type: MetadataType,
-):
+) -> CrawlProcess:
     if not result:
         task.update(status=ProcessStatus.ERROR)
         crawls.update_task(
@@ -26,7 +26,7 @@ def handle_metadata_result(
             task=task,
         )
         logger.error(f"{metadata_type} failed.")
-        return
+        return crawl_process
     store_metadata_result(crawl_process, result, metadata_type)
     if task.status == ProcessStatus.STARTED:
         task.update(status=ProcessStatus.SUCCESS)
@@ -36,7 +36,7 @@ def handle_metadata_result(
         task=task,
     )
     logger.debug(f"{metadata_type} ended!")
-    return result
+    return crawl_process
 
 
 def store_metadata_result(
@@ -56,7 +56,7 @@ def metadata_task(
     metadata_type: MetadataType,
     calculator,
     method_name: str,
-):
+) -> CrawlProcess:
     calc_method = getattr(calculator, method_name)
     result = {}
     task.update(status=ProcessStatus.STARTED)
@@ -87,6 +87,9 @@ def metadata_task(
                 task_name=metadata_type,
                 task=task,
             )
+            crawls.update_status(
+                crawl_id=crawl_process.id, status=ProcessStatus.PARTIAL_ERROR
+            )
             continue
         except Exception as e:
             logger.error(
@@ -99,5 +102,8 @@ def metadata_task(
                 task_name=metadata_type,
                 task=task,
             )
+            crawls.update_status(
+                crawl_id=crawl_process.id, status=ProcessStatus.PARTIAL_ERROR
+            )
             continue
     return handle_metadata_result(task, crawl_process, result, metadata_type)
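Note (illustrative sketch, not part of the patch): the chain + chord wiring introduced in app/api/utils.py above can be pictured with hypothetical toy tasks (prepare, analyze, finalize stand in for the project's real ones; the broker URL is assumed). The key point is that Celery prepends the previous result, and for a chord callback the list of header results, to any arguments already bound with .s(), which is why finalize_crawl_process takes crawl_process before crawl.

from celery import Celery, chain, chord

app = Celery("sketch", broker="redis://localhost:6379", backend="redis://localhost:6379")

@app.task
def prepare(url):
    # Stands in for start_crawl_process: its return value is forwarded to the chord header tasks.
    return {"url": url}

@app.task
def analyze(crawl_process, kind):
    # Stands in for one metadata task; receives the previous task's result as its first argument.
    return {kind: "ok", "url": crawl_process["url"]}

@app.task
def finalize(results, url):
    # Chord callback: the list of header results is prepended to the arguments bound with .s(url),
    # mirroring finalize_crawl_process(self, crawl_process, crawl).
    return {"url": url, "results": results}

workflow = chain(
    # on_error attaches finalize as a link_error callback, so it still runs if prepare fails
    # (in that case Celery passes error context rather than a normal result).
    prepare.s("https://example.com").on_error(finalize.s("https://example.com")),
    chord(
        [analyze.s("lighthouse"), analyze.s("carbon_footprint")],  # header: runs in parallel
        finalize.s("https://example.com"),  # body: runs once every header task has finished
    ),
)
workflow.apply_async()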
diff --git a/app/celery_broker/tasks.py b/app/celery_broker/tasks.py
index da9b90e..ec55c71 100644
--- a/app/celery_broker/tasks.py
+++ b/app/celery_broker/tasks.py
@@ -3,10 +3,12 @@
 import pathlib
 import shutil
 from multiprocessing import Process, Manager
+from typing import Optional
 
 # Local imports
 from app.repositories.crawls import crawls
 from app.repositories.files import files
+from app.repositories.websites import websites
 from app.celery_broker.crawler_utils import start_crawler_process, set_html_crawl_status
 from app.celery_broker.main import celery_app
 from app.celery_broker.metadata_utils import metadata_task
@@ -52,6 +54,9 @@ def start_crawl_process(self, crawl: CrawlModel) -> CrawlProcess:
     except Exception as e:
         logger.error(f"Error while crawling html files: {e}")
         set_html_crawl_status(crawl, self.request.id, ProcessStatus.ERROR)
+        crawls.update_status(
+            crawl_id=crawl.id, status=ProcessStatus.ERROR
+        )
         self.update_state(state='FAILURE')
         return crawl_process
     try:
@@ -61,6 +66,9 @@ def start_crawl_process(self, crawl: CrawlModel) -> CrawlProcess:
         logger.error(f"Error while uploading html files: {e}")
         # Html crawl will be considered failed if we can't upload the html files
         set_html_crawl_status(crawl, self.request.id, ProcessStatus.ERROR)
+        crawls.update_status(
+            crawl_id=crawl.id, status=ProcessStatus.ERROR
+        )
         self.update_state(state='FAILURE')
         return crawl_process
 
@@ -114,6 +122,29 @@ def get_carbon_footprint(self, crawl_process: CrawlProcess):
     )
 
 
+@celery_app.task(bind=True, name="finalize_crawl")
+def finalize_crawl_process(self, crawl_process: Optional[CrawlProcess], crawl: CrawlModel):
+    logger.info(
+        f"Crawl process ({crawl.id}) for website {crawl.config.url} ended"
+    )
+
+    # Retrieve the current status of the crawl
+    current_crawl = crawls.get(crawl_id=crawl.id)
+
+    if current_crawl.status == ProcessStatus.STARTED:
+        crawls.update_status(
+            crawl_id=crawl.id, status=ProcessStatus.SUCCESS
+        )
+
+    websites.store_last_crawl(
+        website_id=crawl.website_id,
+        crawl=crawls.get(crawl_id=crawl.id).model_dump(),
+    )
+
+    # This task will always succeed, since it retrieves the last crawl
+    self.update_state(state='SUCCESS')
+
+
 METADATA_TASK_REGISTRY = {
     MetadataType.LIGHTHOUSE: get_lighthouse,
     MetadataType.TECHNOLOGIES: get_technologies,
diff --git a/app/config.py b/app/config.py
index f65bc56..601eca6 100644
--- a/app/config.py
+++ b/app/config.py
@@ -36,14 +36,16 @@
     MONGO_CRAWLS_COLLECTION = os.getenv("MONGO_CRAWLS_COLLECTION", default="crawls")
 
     # Celery
-    CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL", default="redis://redis:6379")
-    CELERY_RESULT_BACKEND = os.getenv("CELERY_RESULT_BACKEND", default="redis://redis:6379")
+    broker_url = os.getenv("CELERY_BROKER_URL", default="redis://redis:6379")
+    result_backend = os.getenv("CELERY_RESULT_BACKEND", default="redis://redis:6379")
+    broker_heartbeat = os.getenv("CELERY_BROKER_HEARTBEAT", default=2)
 
     CRAWL_QUEUE_NAME = "crawl_queue"
     LIGHTHOUSE_QUEUE_NAME = "lighthouse_queue"
     TECHNOLOGIES_QUEUE_NAME = "technologies_queue"
     RESPONSIVENESS_QUEUE_NAME = "responsiveness_queue"
     CARBON_QUEUE_NAME = "carbon_footprint_queue"
+    FINALIZE_CRAWL_QUEUE_NAME = "finalize_crawl_queue"
 
     # The following two lines make celery execute tasks locally
     # task_always_eager = True
@@ -78,6 +80,11 @@
             Exchange(CARBON_QUEUE_NAME),
             routing_key=CARBON_QUEUE_NAME,
         ),
+        Queue(
+            FINALIZE_CRAWL_QUEUE_NAME,
+            Exchange(FINALIZE_CRAWL_QUEUE_NAME),
+            routing_key=FINALIZE_CRAWL_QUEUE_NAME,
+        ),
     )
 
     task_routes = {
@@ -97,7 +104,11 @@
         "get_carbon_footprint": {
             "queue": CARBON_QUEUE_NAME,
             "routing_key": CARBON_QUEUE_NAME,
-        }
+        },
+        "finalize_crawl": {
+            "queue": FINALIZE_CRAWL_QUEUE_NAME,
+            "routing_key": FINALIZE_CRAWL_QUEUE_NAME,
+        },
     }
 
     def get(self, attribute_name: str):
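Note (illustrative sketch, not part of the patch): the new finalize_crawl_queue declared in app/config.py above is wired up like any other Celery queue. The queue, routing key, and task name are copied from the config; the throwaway app and placeholder task body are assumptions for the example. A worker only consumes the queue if it is listed in -Q, which is why the docker-compose command below adds finalize_crawl_queue.

from celery import Celery
from kombu import Exchange, Queue

app = Celery("sketch", broker="redis://localhost:6379")

# Declare the queue and route the task to it, mirroring BaseConfig.task_queues / task_routes.
app.conf.task_queues = (
    Queue(
        "finalize_crawl_queue",
        Exchange("finalize_crawl_queue"),
        routing_key="finalize_crawl_queue",
    ),
)
app.conf.task_routes = {
    "finalize_crawl": {
        "queue": "finalize_crawl_queue",
        "routing_key": "finalize_crawl_queue",
    },
}

@app.task(name="finalize_crawl")
def finalize_crawl(*args, **kwargs):
    # Placeholder body; the real task lives in app/celery_broker/tasks.py.
    return "finalized"

# A worker must subscribe to the queue to pick the task up, e.g.:
#   celery -A sketch worker -Q crawl_queue,finalize_crawl_queue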
"get_carbon_footprint": { "queue": CARBON_QUEUE_NAME, "routing_key": CARBON_QUEUE_NAME, - } + }, + "finalize_crawl": { + "queue": FINALIZE_CRAWL_QUEUE_NAME, + "routing_key": FINALIZE_CRAWL_QUEUE_NAME, + }, } def get(self, attribute_name: str): diff --git a/app/models/website.py b/app/models/website.py index de052b1..4a01895 100644 --- a/app/models/website.py +++ b/app/models/website.py @@ -27,7 +27,68 @@ class WebsiteModel(BaseModel): tags: list[str] crawl_every: int next_crawl_at: Optional[datetime] = None - last_crawl: Optional[dict[str, Any]] = None + last_crawl: Optional[dict[str, Any]] = Field(None, examples=[{ + "id": "string", + "website_id": "string", + "config": { + "url": "website_url", + "parameters": { + "depth": 2, + "limit": 2, + "use_playwright": False + }, + "metadata_config": { + "lighthouse": { + "enabled": True, + "depth": 0 + }, + "technologies_and_trackers": { + "enabled": True, + "depth": 0 + }, + "responsiveness": { + "enabled": False, + "depth": 0 + }, + "carbon_footprint": { + "enabled": True, + "depth": 0 + } + }, + "headers": {}, + "tags": [] + }, + "created_at": "2023-12-01T07:53:38.330000", + "started_at": "2023-12-01T07:53:38.493000", + "finished_at": "2023-12-01T07:54:01.324000", + "status": "success", + "html_crawl": { + "task_id": "task_id_html", + "started_at": "2023-12-01T07:53:38.512000", + "finished_at": "2023-12-01T07:53:40.829000", + "status": "success" + }, + "lighthouse": { + "task_id": "task_id_lighthouse", + "started_at": "2023-12-01T07:53:40.848000", + "finished_at": "2023-12-01T07:54:01.295000", + "status": "success", + "score": 98 + }, + "technologies_and_trackers": { + "task_id": "task_id_technologies_and_trackers", + "started_at": "2023-12-01T07:53:40.850000", + "finished_at": "2023-12-01T07:53:50.030000", + "status": "success" + }, + "responsiveness": None, + "carbon_footprint": { + "task_id": "task_id_carbon_footprint", + "started_at": "2023-12-01T07:53:40.853000", + "finished_at": "2023-12-01T07:53:41.044000", + "status": "success" + } + }]) def to_config(self) -> CrawlConfig: return CrawlConfig( diff --git a/app/repositories/websites.py b/app/repositories/websites.py index 02a93bc..238e2db 100644 --- a/app/repositories/websites.py +++ b/app/repositories/websites.py @@ -72,7 +72,7 @@ def store_last_crawl(self, website_id: str, crawl: dict[str, Any]): result: UpdateResult = self.collection.update_one( filter={"id": website_id}, update={"$set": {"last_crawl": crawl}} ) - assert result.modified_count == 1 + assert result.modified_count <= 1 def refresh_next_crawl(self, website_id: str): website = self.get(website_id=website_id) diff --git a/docker-compose.yml b/docker-compose.yml index 010c518..1d5cbc5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -31,7 +31,7 @@ services: crawl_worker: &worker build: . env_file: .env - command: watchfiles --filter python 'celery -A celery_broker.main.celery_app worker -l info -P solo -n crawl_worker -Q crawl_queue' + command: watchfiles --filter python 'celery -A celery_broker.main.celery_app worker -l info -P solo -n crawl_worker -Q crawl_queue,finalize_crawl_queue' volumes: - ./app:/open-crawler/app - local_files:${LOCAL_FILES_PATH} @@ -75,8 +75,6 @@ services: replicas: 1 - - flower: container_name: flower image: mher/flower