Move org deletion to background job with access to backend ops classes (#2098)

This PR introduces background jobs that have full access to the backend
ops classes, and moves org deletion into one such background job.
tw4l authored Oct 10, 2024
1 parent 84a74c4 commit 1b1819b
Showing 10 changed files with 404 additions and 16 deletions.
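
At a high level: the API layer asks BackgroundJobOps to create a delete-org job, CrawlManager renders a Kubernetes Job from the background_job.yaml template, and the job container runs main_bg.py, which rebuilds the backend ops classes and performs the deletion. A rough ops-level sketch of the flow (method names are from this diff; this is not a verbatim excerpt):

    # hedged sketch of the flow introduced by this PR
    job_id = await background_job_ops.create_delete_org_job(org)

    # the job is tracked in Mongo and can be polled like any background job
    job = await background_job_ops.get_background_job(job_id, org.id)
    print(job.started, job.finished, job.success)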
79 changes: 72 additions & 7 deletions backend/btrixcloud/background_jobs.py
@@ -1,6 +1,7 @@
"""k8s background jobs"""

import asyncio
import os
from datetime import datetime
from typing import Optional, Tuple, Union, List, Dict, TYPE_CHECKING, cast
from uuid import UUID
@@ -19,6 +20,7 @@
BgJobType,
CreateReplicaJob,
DeleteReplicaJob,
DeleteOrgJob,
PaginatedBackgroundJobResponse,
AnyJob,
StorageRef,
@@ -273,6 +275,51 @@ async def create_delete_replica_job(
)
return None

async def create_delete_org_job(
self,
org: Organization,
existing_job_id: Optional[str] = None,
) -> Optional[str]:
"""Create background job to delete org and its data"""

try:
job_id = await self.crawl_manager.run_delete_org_job(
oid=str(org.id),
backend_image=os.environ.get("BACKEND_IMAGE", ""),
pull_policy=os.environ.get("BACKEND_IMAGE_PULL_POLICY", ""),
existing_job_id=existing_job_id,
)
if existing_job_id:
delete_org_job = await self.get_background_job(existing_job_id, org.id)
previous_attempt = {
"started": delete_org_job.started,
"finished": delete_org_job.finished,
}
if delete_org_job.previousAttempts:
delete_org_job.previousAttempts.append(previous_attempt)
else:
delete_org_job.previousAttempts = [previous_attempt]
delete_org_job.started = dt_now()
delete_org_job.finished = None
delete_org_job.success = None
else:
delete_org_job = DeleteOrgJob(
id=job_id,
oid=org.id,
started=dt_now(),
)

await self.jobs.find_one_and_update(
{"_id": job_id}, {"$set": delete_org_job.to_dict()}, upsert=True
)

return job_id
# pylint: disable=broad-exception-caught
except Exception as exc:
# pylint: disable=raise-missing-from
print(f"warning: delete org job could not be started: {exc}")
return None
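
On retry, the method above archives the previous attempt's timestamps into previousAttempts before resetting started, finished, and success, so the stored Mongo document keeps a history across retries. Roughly (illustrative values only):

    # illustrative shape of the stored job document after one retry
    {
        "_id": "delete-org-<oid>-abcde12345",  # hypothetical job id
        "type": "delete-org",
        "oid": "<org uuid>",
        "previousAttempts": [
            {"started": "<first start>", "finished": "<first finish>"},
        ],
        "started": "<retry time>",  # reset to dt_now()
        "finished": None,
        "success": None,
    }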

async def job_finished(
self,
job_id: str,
@@ -316,10 +363,13 @@ async def job_finished(
)

async def get_background_job(
-    self, job_id: str, oid: UUID
-) -> Union[CreateReplicaJob, DeleteReplicaJob]:
+    self, job_id: str, oid: Optional[UUID] = None
+) -> Union[CreateReplicaJob, DeleteReplicaJob, DeleteOrgJob]:
    """Get background job"""
-    query: dict[str, object] = {"_id": job_id, "oid": oid}
+    query: dict[str, object] = {"_id": job_id}
+    if oid:
+        query["oid"] = oid

res = await self.jobs.find_one(query)
if not res:
raise HTTPException(status_code=404, detail="job_not_found")
@@ -331,9 +381,10 @@ def _get_job_by_type_from_data(self, data: dict[str, object]):
    if data["type"] == BgJobType.CREATE_REPLICA:
        return CreateReplicaJob.from_dict(data)

-    return DeleteReplicaJob.from_dict(data)
+    if data["type"] == BgJobType.DELETE_REPLICA:
+        return DeleteReplicaJob.from_dict(data)

-    # return BackgroundJob.from_dict(data)
+    return DeleteOrgJob.from_dict(data)

async def list_background_jobs(
self,
@@ -432,9 +483,8 @@ async def retry_background_job(
if job.success:
    raise HTTPException(status_code=400, detail="job_already_succeeded")

-file = await self.get_replica_job_file(job, org)
-
if job.type == BgJobType.CREATE_REPLICA:
+    file = await self.get_replica_job_file(job, org)
primary_storage = self.storage_ops.get_org_storage_by_ref(org, file.storage)
primary_endpoint, bucket_suffix = self.strip_bucket(
primary_storage.endpoint_url
@@ -452,6 +502,7 @@
)

if job.type == BgJobType.DELETE_REPLICA:
file = await self.get_replica_job_file(job, org)
await self.create_delete_replica_job(
org,
file,
Expand All @@ -461,6 +512,12 @@ async def retry_background_job(
existing_job_id=job_id,
)

if job.type == BgJobType.DELETE_ORG:
await self.create_delete_org_job(
org,
existing_job_id=job_id,
)

return {"success": True}

async def retry_failed_background_jobs(
@@ -523,6 +580,14 @@ async def get_background_job(
"""Retrieve information for background job"""
return await ops.get_background_job(job_id, org.id)

@app.get("/orgs/all/jobs/{job_id}", response_model=SuccessResponse, tags=["jobs"])
async def get_background_job_all_orgs(job_id: str, user: User = Depends(user_dep)):
"""Get background job from any org"""
if not user.is_superuser:
raise HTTPException(status_code=403, detail="Not Allowed")

return await ops.get_background_job(job_id)
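
For illustration, this superuser-only lookup could be exercised like so (hypothetical host and token; the /api prefix depends on how the backend is deployed):

    import requests

    resp = requests.get(
        "https://btrix.example.com/api/orgs/all/jobs/<job_id>",
        headers={"Authorization": "Bearer <superuser token>"},
    )
    resp.raise_for_status()
    print(resp.json())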

@router.post("/{job_id}/retry", response_model=SuccessResponse)
async def retry_background_job(
job_id: str,
30 changes: 30 additions & 0 deletions backend/btrixcloud/crawlmanager.py
@@ -17,6 +17,8 @@
# ============================================================================
DEFAULT_PROXY_ID: str = os.environ.get("DEFAULT_PROXY_ID", "")

DEFAULT_NAMESPACE: str = os.environ.get("DEFAULT_NAMESPACE", "default")


# ============================================================================
class CrawlManager(K8sAPI):
@@ -110,6 +112,34 @@ async def run_replica_job(

return job_id

async def run_delete_org_job(
self,
oid: str,
backend_image: str,
pull_policy: str,
existing_job_id: Optional[str] = None,
):
"""run job to delete org and all of its data"""

if existing_job_id:
job_id = existing_job_id
else:
job_id = f"delete-org-{oid}-{secrets.token_hex(5)}"

params = {
"id": job_id,
"oid": oid,
"job_type": BgJobType.DELETE_ORG.value,
"backend_image": backend_image,
"pull_policy": pull_policy,
}

data = self.templates.env.get_template("background_job.yaml").render(params)

await self.create_from_yaml(data, namespace=DEFAULT_NAMESPACE)

return job_id
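
This mirrors the call in create_delete_org_job above: the image and pull policy come from the BACKEND_IMAGE and BACKEND_IMAGE_PULL_POLICY env vars, and the returned id is persisted as the job's Mongo _id. A minimal sketch (values illustrative):

    job_id = await crawl_manager.run_delete_org_job(
        oid=str(org.id),
        backend_image=os.environ.get("BACKEND_IMAGE", ""),
        pull_policy=os.environ.get("BACKEND_IMAGE_PULL_POLICY", ""),
    )
    # job_id has the form "delete-org-<oid>-<10 hex chars>"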

async def create_crawl_job(
self,
crawlconfig: CrawlConfig,
2 changes: 1 addition & 1 deletion backend/btrixcloud/main.py
@@ -244,7 +244,7 @@ def main() -> None:

init_uploads_api(*base_crawl_init)

-org_ops.set_ops(base_crawl_ops, profiles, coll_ops)
+org_ops.set_ops(base_crawl_ops, profiles, coll_ops, background_job_ops)

user_manager.set_ops(org_ops, crawl_config_ops, base_crawl_ops)

144 changes: 144 additions & 0 deletions backend/btrixcloud/main_bg.py
@@ -0,0 +1,144 @@
""" entrypoint module for background jobs """

import asyncio
import os
import sys
import traceback
from uuid import UUID

from .crawlmanager import CrawlManager
from .db import init_db
from .emailsender import EmailSender

# from .utils import register_exit_handler
from .models import BgJobType

from .basecrawls import BaseCrawlOps
from .invites import InviteOps
from .users import init_user_manager
from .orgs import OrgOps
from .colls import CollectionOps
from .crawlconfigs import CrawlConfigOps
from .crawls import CrawlOps
from .profiles import ProfileOps
from .storages import StorageOps
from .webhooks import EventWebhookOps
from .background_jobs import BackgroundJobOps
from .pages import PageOps

job_type = os.environ.get("BG_JOB_TYPE")
oid = os.environ.get("OID")


# ============================================================================
# pylint: disable=too-many-function-args, duplicate-code, too-many-locals
async def main():
"""main init"""
email = EmailSender()
crawl_manager = None

dbclient, mdb = init_db()

invite_ops = InviteOps(mdb, email)

user_manager = init_user_manager(mdb, email, invite_ops)

org_ops = OrgOps(mdb, invite_ops, user_manager)

event_webhook_ops = EventWebhookOps(mdb, org_ops)

# pylint: disable=import-outside-toplevel
if not os.environ.get("KUBERNETES_SERVICE_HOST"):
print(
"Sorry, the Browsertrix Backend must be run inside a Kubernetes environment.\
Kubernetes not detected (KUBERNETES_SERVICE_HOST is not set), Exiting"
)
sys.exit(1)

crawl_manager = CrawlManager()

storage_ops = StorageOps(org_ops, crawl_manager)

background_job_ops = BackgroundJobOps(
mdb, email, user_manager, org_ops, crawl_manager, storage_ops
)

profile_ops = ProfileOps(
mdb, org_ops, crawl_manager, storage_ops, background_job_ops
)

crawl_config_ops = CrawlConfigOps(
dbclient,
mdb,
user_manager,
org_ops,
crawl_manager,
profile_ops,
)

coll_ops = CollectionOps(mdb, crawl_manager, org_ops, event_webhook_ops)

base_crawl_ops = BaseCrawlOps(
mdb,
user_manager,
org_ops,
crawl_config_ops,
coll_ops,
storage_ops,
event_webhook_ops,
background_job_ops,
)

crawl_ops = CrawlOps(
crawl_manager,
mdb,
user_manager,
org_ops,
crawl_config_ops,
coll_ops,
storage_ops,
event_webhook_ops,
background_job_ops,
)

page_ops = PageOps(mdb, crawl_ops, org_ops, storage_ops)

base_crawl_ops.set_page_ops(page_ops)
crawl_ops.set_page_ops(page_ops)

background_job_ops.set_ops(crawl_ops, profile_ops)

org_ops.set_ops(base_crawl_ops, profile_ops, coll_ops, background_job_ops)

user_manager.set_ops(org_ops, crawl_config_ops, base_crawl_ops)

crawl_config_ops.set_coll_ops(coll_ops)

# Run job
if job_type == BgJobType.DELETE_ORG:
if not oid:
print("Org id missing, quitting")
return 1
org = await org_ops.get_org_by_id(UUID(oid))
if not org:
print("Org id invalid, quitting")
return 1

try:
await org_ops.delete_org_and_data(org, user_manager)
return 0
# pylint: disable=broad-exception-caught
except Exception:
traceback.print_exc()
return 1

print(f"Provided job type {job_type} not currently supported")
return 1


# ============================================================================
if __name__ == "__main__":
return_code = asyncio.run(main())
sys.exit(return_code)
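
This module is not imported by the API server; the Kubernetes Job rendered from background_job.yaml is assumed to invoke it with the env vars read at the top of the file, along these lines (the template itself is not part of this diff):

    # assumed container invocation for the delete-org job:
    #
    #   BG_JOB_TYPE=delete-org OID=<org uuid> python -m btrixcloud.main_bg
    #
    # exit code 0 marks the k8s Job succeeded; exit code 1 marks it failed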
19 changes: 18 additions & 1 deletion backend/btrixcloud/models.py
@@ -2013,6 +2013,7 @@ class BgJobType(str, Enum):

CREATE_REPLICA = "create-replica"
DELETE_REPLICA = "delete-replica"
DELETE_ORG = "delete-org"


# ============================================================================
@@ -2051,10 +2052,19 @@ class DeleteReplicaJob(BackgroundJob):
replica_storage: StorageRef


# ============================================================================
class DeleteOrgJob(BackgroundJob):
"""Model for tracking deletion of org data jobs"""

type: Literal[BgJobType.DELETE_ORG] = BgJobType.DELETE_ORG


# ============================================================================
# Union of all job types, for response model

-AnyJob = RootModel[Union[CreateReplicaJob, DeleteReplicaJob, BackgroundJob]]
+AnyJob = RootModel[
+    Union[CreateReplicaJob, DeleteReplicaJob, BackgroundJob, DeleteOrgJob]
+]
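
Each member of the union carries a literal type field, which is what _get_job_by_type_from_data dispatches on. A quick sketch (assuming the to_dict/from_dict helpers used elsewhere in this diff, and dt_now as above):

    from uuid import uuid4

    job = DeleteOrgJob(id="delete-org-example", oid=uuid4(), started=dt_now())
    assert job.to_dict()["type"] == BgJobType.DELETE_ORG.value  # "delete-org"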


# ============================================================================
@@ -2274,6 +2284,13 @@ class DeletedResponse(BaseModel):
deleted: bool


# ============================================================================
class DeletedResponseId(DeletedResponse):
"""Response for delete API endpoints that return job id"""

id: str


# ============================================================================
class DeletedResponseQuota(DeletedResponse):
"""Response for delete API endpoints"""