webhook tweak: pass oid to crawl finished and upload finished webhooks (#1287)

Optimizes webhook handling by passing oid directly to the webhook notification methods:
- avoids an extra crawl lookup
- the crawl may be deleted by the operator before the webhook is processed, which would cause the crawl lookup to fail
- adds more typing to the operator and webhooks
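
In simplified terms, the change moves org resolution from a crawl lookup inside the webhook handler to an oid argument supplied by the caller. A minimal before/after sketch, assuming stand-in crawls and org_ops objects (the helper names below are illustrative, not the actual API):

import uuid

# Before (sketch): the handler looked up the crawl only to recover its org id.
# If the crawl was already deleted, find_one() returns None and crawl_res["oid"]
# raises, so the notification is never sent.
async def notify_via_crawl_lookup(crawls, org_ops, crawl_id: str, state: str):
    crawl_res = await crawls.find_one({"_id": crawl_id})
    org = await org_ops.get_org_by_id(crawl_res["oid"])
    return org

# After (sketch): the caller already knows the org id and passes it in directly,
# removing the extra lookup and its failure mode.
async def notify_via_oid(org_ops, crawl_id: str, oid: uuid.UUID, state: str):
    org = await org_ops.get_org_by_id(oid)
    return org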
ikreymer authored Oct 16, 2023
1 parent 6d6fa03 commit dc8d510
Showing 3 changed files with 60 additions and 23 deletions.
61 changes: 48 additions & 13 deletions backend/btrixcloud/operator.py
@@ -362,7 +362,9 @@ async def sync_crawls(self, data: MCSyncData):
         # pylint: disable=bare-except, broad-except
         except:
             # fail crawl if config somehow missing, shouldn't generally happen
-            await self.fail_crawl(crawl_id, uuid.UUID(cid), status, pods)
+            await self.fail_crawl(
+                crawl_id, uuid.UUID(cid), uuid.UUID(oid), status, pods
+            )

             return self._empty_response(status)

@@ -661,7 +663,14 @@ async def can_start_new(self, crawl: CrawlSpec, data: MCSyncData, status):
             )
             return False

-    async def cancel_crawl(self, crawl_id, cid, oid, status, pods):
+    async def cancel_crawl(
+        self,
+        crawl_id: str,
+        cid: uuid.UUID,
+        oid: uuid.UUID,
+        status: CrawlStatus,
+        pods: dict,
+    ) -> bool:
         """Mark crawl as canceled"""
         if not await self.mark_finished(crawl_id, cid, oid, status, "canceled"):
             return False
@@ -698,12 +707,20 @@ async def cancel_crawl(self, crawl_id, cid, oid, status, pods):

         return status.canceled

-    async def fail_crawl(self, crawl_id, cid, status, pods, stats=None):
+    async def fail_crawl(
+        self,
+        crawl_id: str,
+        cid: uuid.UUID,
+        oid: uuid.UUID,
+        status: CrawlStatus,
+        pods: dict,
+        stats=None,
+    ) -> bool:
         """Mark crawl as failed, log crawl state and print crawl logs, if possible"""
         prev_state = status.state

         if not await self.mark_finished(
-            crawl_id, cid, None, status, "failed", stats=stats
+            crawl_id, cid, oid, status, "failed", stats=stats
         ):
             return False

@@ -1138,7 +1155,9 @@ async def update_crawl_state(self, redis, crawl, status, pods, done):
             # check if one-page crawls actually succeeded
             # if only one page found, and no files, assume failed
             if status.pagesFound == 1 and not status.filesAdded:
-                await self.fail_crawl(crawl.id, crawl.cid, status, pods, stats)
+                await self.fail_crawl(
+                    crawl.id, crawl.cid, crawl.oid, status, pods, stats
+                )
                 return status

             completed = status.pagesDone and status.pagesDone >= status.pagesFound
@@ -1157,7 +1176,9 @@ async def update_crawl_state(self, redis, crawl, status, pods, done):
                     crawl.id, crawl.cid, crawl.oid, status, "canceled", crawl, stats
                 )
             else:
-                await self.fail_crawl(crawl.id, crawl.cid, status, pods, stats)
+                await self.fail_crawl(
+                    crawl.id, crawl.cid, crawl.oid, status, pods, stats
+                )

         # check for other statuses
         else:
@@ -1181,8 +1202,15 @@ async def update_crawl_state(self, redis, crawl, status, pods, done):

     # pylint: disable=too-many-arguments
     async def mark_finished(
-        self, crawl_id, cid, oid, status, state, crawl=None, stats=None
-    ):
+        self,
+        crawl_id: str,
+        cid: uuid.UUID,
+        oid: uuid.UUID,
+        status: CrawlStatus,
+        state: str,
+        crawl=None,
+        stats=None,
+    ) -> bool:
         """mark crawl as finished, set finished timestamp and final state"""

         finished = dt_now()
@@ -1192,9 +1220,9 @@ async def mark_finished(
             kwargs["stats"] = stats

         if state in SUCCESSFUL_STATES:
-            allowed_from = RUNNING_STATES
+            allowed_from = list(RUNNING_STATES)
         else:
-            allowed_from = RUNNING_AND_STARTING_STATES
+            allowed_from = list(RUNNING_AND_STARTING_STATES)

         # if set_state returns false, already set to same status, return
         if not await self.set_state(
@@ -1221,16 +1249,23 @@ async def mark_finished(

     # pylint: disable=too-many-arguments
     async def do_crawl_finished_tasks(
-        self, crawl_id, cid, oid, files_added_size, state
-    ):
+        self,
+        crawl_id: str,
+        cid: uuid.UUID,
+        oid: uuid.UUID,
+        files_added_size: int,
+        state: str,
+    ) -> None:
         """Run tasks after crawl completes in asyncio.task coroutine."""
         await self.crawl_config_ops.stats_recompute_last(cid, files_added_size, 1)

         if state in SUCCESSFUL_STATES and oid:
             await self.org_ops.inc_org_bytes_stored(oid, files_added_size, "crawl")
             await self.coll_ops.add_successful_crawl_to_collections(crawl_id, cid)

-        await self.event_webhook_ops.create_crawl_finished_notification(crawl_id, state)
+        await self.event_webhook_ops.create_crawl_finished_notification(
+            crawl_id, oid, state
+        )

         # add crawl errors to db
         await self.add_crawl_errors_to_db(crawl_id)
2 changes: 1 addition & 1 deletion backend/btrixcloud/uploads.py
@@ -186,7 +186,7 @@ async def _create_upload(
         )

         asyncio.create_task(
-            self.event_webhook_ops.create_upload_finished_notification(crawl_id)
+            self.event_webhook_ops.create_upload_finished_notification(crawl_id, org.id)
         )

         quota_reached = await self.orgs.inc_org_bytes_stored(
20 changes: 11 additions & 9 deletions backend/btrixcloud/webhooks.py
@@ -214,10 +214,11 @@ async def _create_item_finished_notification(
             crawl_ids=[crawl_id], coll_id=coll_id, org=org
         )

-    async def create_crawl_finished_notification(self, crawl_id: str, state: str):
+    async def create_crawl_finished_notification(
+        self, crawl_id: str, oid: uuid.UUID, state: str
+    ) -> None:
         """Create webhook notification for finished crawl."""
-        crawl_res = await self.crawls.find_one({"_id": crawl_id})
-        org = await self.org_ops.get_org_by_id(crawl_res["oid"])
+        org = await self.org_ops.get_org_by_id(oid)

         if not org.webhookUrls or not org.webhookUrls.crawlFinished:
             return
@@ -233,10 +234,11 @@ async def create_crawl_finished_notification(self, crawl_id: str, state: str):
             ),
         )

-    async def create_upload_finished_notification(self, crawl_id: str):
+    async def create_upload_finished_notification(
+        self, crawl_id: str, oid: uuid.UUID
+    ) -> None:
         """Create webhook notification for finished upload."""
-        crawl_res = await self.crawls.find_one({"_id": crawl_id})
-        org = await self.org_ops.get_org_by_id(crawl_res["oid"])
+        org = await self.org_ops.get_org_by_id(oid)

         if not org.webhookUrls or not org.webhookUrls.uploadFinished:
             return
@@ -252,7 +254,7 @@ async def create_upload_finished_notification(self, crawl_id: str):

     async def create_crawl_started_notification(
         self, crawl_id: str, oid: uuid.UUID, scheduled: bool = False
-    ):
+    ) -> None:
         """Create webhook notification for started crawl."""
         org = await self.org_ops.get_org_by_id(oid)

@@ -318,7 +320,7 @@ async def create_added_to_collection_notification(
         crawl_ids: List[str],
         coll_id: uuid.UUID,
         org: Organization,
-    ):
+    ) -> None:
         """Create webhook notification for item added to collection"""
         if not org.webhookUrls or not org.webhookUrls.addedToCollection:
             return
@@ -339,7 +341,7 @@ async def create_removed_from_collection_notification(
         crawl_ids: List[str],
         coll_id: uuid.UUID,
         org: Organization,
-    ):
+    ) -> None:
         """Create webhook notification for item removed from collection"""
         if not org.webhookUrls or not org.webhookUrls.removedFromCollection:
             return
