Move org deletion to background job with access to backend ops classes #2098

Merged
merged 13 commits on Oct 10, 2024
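
This PR moves org deletion out of the request handler and into a Kubernetes background job that runs the backend image itself, giving it access to the full set of backend ops classes. It adds a `create_delete_org_job` method to `BackgroundJobOps`, a `run_delete_org_job` method to `CrawlManager`, a new `main_bg.py` entrypoint that wires up the ops classes and executes the job, new `DELETE_ORG` / `DeleteOrgJob` model types, and a superuser-only endpoint for fetching a background job from any org.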
79 changes: 72 additions & 7 deletions backend/btrixcloud/background_jobs.py
@@ -1,6 +1,7 @@
"""k8s background jobs"""

import asyncio
import os
from datetime import datetime
from typing import Optional, Tuple, Union, List, Dict, TYPE_CHECKING, cast
from uuid import UUID
@@ -19,6 +20,7 @@
    BgJobType,
    CreateReplicaJob,
    DeleteReplicaJob,
    DeleteOrgJob,
    PaginatedBackgroundJobResponse,
    AnyJob,
    StorageRef,
@@ -273,6 +275,51 @@ async def create_delete_replica_job(
        )
        return None

    async def create_delete_org_job(
        self,
        org: Organization,
        existing_job_id: Optional[str] = None,
    ) -> Optional[str]:
        """Create background job to delete org and its data"""

        try:
            job_id = await self.crawl_manager.run_delete_org_job(
                oid=str(org.id),
                backend_image=os.environ.get("BACKEND_IMAGE", ""),
                pull_policy=os.environ.get("BACKEND_IMAGE_PULL_POLICY", ""),
                existing_job_id=existing_job_id,
            )
            if existing_job_id:
                delete_org_job = await self.get_background_job(existing_job_id, org.id)
                previous_attempt = {
                    "started": delete_org_job.started,
                    "finished": delete_org_job.finished,
                }
                if delete_org_job.previousAttempts:
                    delete_org_job.previousAttempts.append(previous_attempt)
                else:
                    delete_org_job.previousAttempts = [previous_attempt]
                delete_org_job.started = dt_now()
                delete_org_job.finished = None
                delete_org_job.success = None
            else:
                delete_org_job = DeleteOrgJob(
                    id=job_id,
                    oid=org.id,
                    started=dt_now(),
                )

            await self.jobs.find_one_and_update(
                {"_id": job_id}, {"$set": delete_org_job.to_dict()}, upsert=True
            )

            return job_id
        # pylint: disable=broad-exception-caught
        except Exception as exc:
            # pylint: disable=raise-missing-from
            print(f"warning: delete org job could not be started: {exc}")
            return None
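
For illustration, here is roughly what a retried delete-org job document in the `jobs` collection looks like after the `find_one_and_update` above (all values hypothetical): the failed run's timestamps are appended to `previousAttempts`, while `started` is reset and `finished`/`success` are cleared for the new attempt.

retried_job_doc = {
    "_id": "delete-org-1234-9f8e7d6c5b",  # job_id doubles as the Mongo _id
    "type": "delete-org",  # BgJobType.DELETE_ORG.value
    "oid": "00000000-0000-0000-0000-000000000000",
    "started": "2024-10-10T12:00:00Z",  # reset to dt_now() on retry
    "finished": None,
    "success": None,
    "previousAttempts": [
        {"started": "2024-10-09T18:00:00Z", "finished": "2024-10-09T18:05:00Z"},
    ],
}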

    async def job_finished(
        self,
        job_id: str,
@@ -316,10 +363,13 @@ async def job_finished(
        )

    async def get_background_job(
        self, job_id: str, oid: UUID
    ) -> Union[CreateReplicaJob, DeleteReplicaJob]:
        self, job_id: str, oid: Optional[UUID] = None
    ) -> Union[CreateReplicaJob, DeleteReplicaJob, DeleteOrgJob]:
        """Get background job"""
        query: dict[str, object] = {"_id": job_id, "oid": oid}
        query: dict[str, object] = {"_id": job_id}
        if oid:
            query["oid"] = oid

        res = await self.jobs.find_one(query)
        if not res:
            raise HTTPException(status_code=404, detail="job_not_found")
@@ -331,9 +381,10 @@ def _get_job_by_type_from_data(self, data: dict[str, object]):
        if data["type"] == BgJobType.CREATE_REPLICA:
            return CreateReplicaJob.from_dict(data)

        return DeleteReplicaJob.from_dict(data)
        if data["type"] == BgJobType.DELETE_REPLICA:
            return DeleteReplicaJob.from_dict(data)

        # return BackgroundJob.from_dict(data)
        return DeleteOrgJob.from_dict(data)
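
`_get_job_by_type_from_data` now falls through to `DeleteOrgJob` for any job document that is neither a create-replica nor a delete-replica job. A minimal sketch of the dispatch (`ops` is a `BackgroundJobOps` instance; the document values are hypothetical):

data = {
    "_id": "delete-org-1234-abcd",
    "oid": "00000000-0000-0000-0000-000000000000",
    "type": "delete-org",
    "started": "2024-10-10T12:00:00Z",
}
job = ops._get_job_by_type_from_data(data)
print(type(job).__name__)  # expected: DeleteOrgJob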

    async def list_background_jobs(
        self,
@@ -432,9 +483,8 @@ async def retry_background_job(
        if job.success:
            raise HTTPException(status_code=400, detail="job_already_succeeded")

        file = await self.get_replica_job_file(job, org)

        if job.type == BgJobType.CREATE_REPLICA:
            file = await self.get_replica_job_file(job, org)
            primary_storage = self.storage_ops.get_org_storage_by_ref(org, file.storage)
            primary_endpoint, bucket_suffix = self.strip_bucket(
                primary_storage.endpoint_url
@@ -452,6 +502,7 @@
            )

        if job.type == BgJobType.DELETE_REPLICA:
            file = await self.get_replica_job_file(job, org)
            await self.create_delete_replica_job(
                org,
                file,
@@ -461,6 +512,12 @@ async def retry_background_job(
                existing_job_id=job_id,
            )

        if job.type == BgJobType.DELETE_ORG:
            await self.create_delete_org_job(
                org,
                existing_job_id=job_id,
            )

        return {"success": True}
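
Retrying a failed delete-org job goes through the same retry endpoint as replica jobs. A sketch using `requests`, assuming the jobs router is mounted at `/api/orgs/{oid}/jobs` (the mount point is not shown in this diff; all values hypothetical):

import requests

host = "https://app.example.com"
oid = "00000000-0000-0000-0000-000000000000"
job_id = f"delete-org-{oid}-9f8e7d6c5b"
token = "..."

resp = requests.post(
    f"{host}/api/orgs/{oid}/jobs/{job_id}/retry",
    headers={"Authorization": f"Bearer {token}"},
)
print(resp.json())  # {"success": True} if the retry was scheduled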

    async def retry_failed_background_jobs(
@@ -523,6 +580,14 @@ async def get_background_job(
"""Retrieve information for background job"""
return await ops.get_background_job(job_id, org.id)

    @app.get("/orgs/all/jobs/{job_id}", response_model=SuccessResponse, tags=["jobs"])
    async def get_background_job_all_orgs(job_id: str, user: User = Depends(user_dep)):
        """Get background job from any org"""
        if not user.is_superuser:
            raise HTTPException(status_code=403, detail="Not Allowed")

        return await ops.get_background_job(job_id)
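
With the new superuser-only route, a job can be looked up without knowing its org. A sketch using `requests` (the `/api` prefix and all values are assumptions):

import requests

superuser_token = "..."  # hypothetical
resp = requests.get(
    "https://app.example.com/api/orgs/all/jobs/delete-org-1234-9f8e7d6c5b",
    headers={"Authorization": f"Bearer {superuser_token}"},
)
print(resp.status_code)  # 403 unless the requesting user is a superuser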

    @router.post("/{job_id}/retry", response_model=SuccessResponse)
    async def retry_background_job(
        job_id: str,
30 changes: 30 additions & 0 deletions backend/btrixcloud/crawlmanager.py
@@ -17,6 +17,8 @@
# ============================================================================
DEFAULT_PROXY_ID: str = os.environ.get("DEFAULT_PROXY_ID", "")

DEFAULT_NAMESPACE: str = os.environ.get("DEFAULT_NAMESPACE", "default")


# ============================================================================
class CrawlManager(K8sAPI):
@@ -110,6 +112,34 @@ async def run_replica_job(

        return job_id

    async def run_delete_org_job(
        self,
        oid: str,
        backend_image: str,
        pull_policy: str,
        existing_job_id: Optional[str] = None,
    ):
        """run job to delete org and all of its data"""

        if existing_job_id:
            job_id = existing_job_id
        else:
            job_id = f"delete-org-{oid}-{secrets.token_hex(5)}"

        params = {
            "id": job_id,
            "oid": oid,
            "job_type": BgJobType.DELETE_ORG.value,
            "backend_image": backend_image,
            "pull_policy": pull_policy,
        }

        data = self.templates.env.get_template("background_job.yaml").render(params)

        await self.create_from_yaml(data, namespace=DEFAULT_NAMESPACE)

        return job_id
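
The `background_job.yaml` template itself is not part of this diff, but since `main_bg.py` below reads `BG_JOB_TYPE` and `OID` from the environment, the template presumably maps the `job_type` and `oid` params onto container env vars and launches `backend_image` with the given `pull_policy` as a Kubernetes Job in `DEFAULT_NAMESPACE`.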

    async def create_crawl_job(
        self,
        crawlconfig: CrawlConfig,
2 changes: 1 addition & 1 deletion backend/btrixcloud/main.py
@@ -244,7 +244,7 @@ def main() -> None:

    init_uploads_api(*base_crawl_init)

    org_ops.set_ops(base_crawl_ops, profiles, coll_ops)
    org_ops.set_ops(base_crawl_ops, profiles, coll_ops, background_job_ops)

    user_manager.set_ops(org_ops, crawl_config_ops, base_crawl_ops)
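
Passing `background_job_ops` into `org_ops.set_ops` gives org ops a handle on the new job launcher, presumably so the org delete endpoint can schedule `create_delete_org_job`; the same wiring appears in `main_bg.py` below.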

144 changes: 144 additions & 0 deletions backend/btrixcloud/main_bg.py
@@ -0,0 +1,144 @@
""" entrypoint module for background jobs """

import asyncio
import os
import sys
import traceback
from uuid import UUID

from .crawlmanager import CrawlManager
from .db import init_db
from .emailsender import EmailSender

# from .utils import register_exit_handler
from .models import BgJobType

from .basecrawls import BaseCrawlOps
from .invites import InviteOps
from .users import init_user_manager
from .orgs import OrgOps
from .colls import CollectionOps
from .crawlconfigs import CrawlConfigOps
from .crawls import CrawlOps
from .profiles import ProfileOps
from .storages import StorageOps
from .webhooks import EventWebhookOps
from .background_jobs import BackgroundJobOps
from .pages import PageOps

job_type = os.environ.get("BG_JOB_TYPE")
oid = os.environ.get("OID")


# ============================================================================
# pylint: disable=too-many-function-args, duplicate-code, too-many-locals
async def main():
    """main init"""
    email = EmailSender()
    crawl_manager = None

    dbclient, mdb = init_db()

    invite_ops = InviteOps(mdb, email)

    user_manager = init_user_manager(mdb, email, invite_ops)

    org_ops = OrgOps(mdb, invite_ops, user_manager)

    event_webhook_ops = EventWebhookOps(mdb, org_ops)

    # pylint: disable=import-outside-toplevel
    if not os.environ.get("KUBERNETES_SERVICE_HOST"):
        print(
            "Sorry, the Browsertrix Backend must be run inside a Kubernetes environment.\
 Kubernetes not detected (KUBERNETES_SERVICE_HOST is not set), Exiting"
        )
        sys.exit(1)

    crawl_manager = CrawlManager()

    storage_ops = StorageOps(org_ops, crawl_manager)

    background_job_ops = BackgroundJobOps(
        mdb, email, user_manager, org_ops, crawl_manager, storage_ops
    )

    profile_ops = ProfileOps(
        mdb, org_ops, crawl_manager, storage_ops, background_job_ops
    )

    crawl_config_ops = CrawlConfigOps(
        dbclient,
        mdb,
        user_manager,
        org_ops,
        crawl_manager,
        profile_ops,
    )

    coll_ops = CollectionOps(mdb, crawl_manager, org_ops, event_webhook_ops)

    base_crawl_ops = BaseCrawlOps(
        mdb,
        user_manager,
        org_ops,
        crawl_config_ops,
        coll_ops,
        storage_ops,
        event_webhook_ops,
        background_job_ops,
    )

    crawl_ops = CrawlOps(
        crawl_manager,
        mdb,
        user_manager,
        org_ops,
        crawl_config_ops,
        coll_ops,
        storage_ops,
        event_webhook_ops,
        background_job_ops,
    )

    page_ops = PageOps(mdb, crawl_ops, org_ops, storage_ops)

    base_crawl_ops.set_page_ops(page_ops)
    crawl_ops.set_page_ops(page_ops)

    background_job_ops.set_ops(crawl_ops, profile_ops)

    org_ops.set_ops(base_crawl_ops, profile_ops, coll_ops, background_job_ops)

    user_manager.set_ops(org_ops, crawl_config_ops, base_crawl_ops)

    background_job_ops.set_ops(base_crawl_ops, profile_ops)

    crawl_config_ops.set_coll_ops(coll_ops)

    # Run job
    if job_type == BgJobType.DELETE_ORG:
        if not oid:
            print("Org id missing, quitting")
            return 1
        org = await org_ops.get_org_by_id(UUID(oid))
        if not org:
            print("Org id invalid, quitting")
            return 1

        try:
            await org_ops.delete_org_and_data(org, user_manager)
            return 0
        # pylint: disable=broad-exception-caught
        except Exception:
            traceback.print_exc()
            return 1

    print(f"Provided job type {job_type} not currently supported")
    return 1


# ============================================================================
if __name__ == "__main__":
    return_code = asyncio.run(main())
    sys.exit(return_code)
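
The rendered Job presumably runs this module as its container command, passing the job parameters through environment variables (matching the module-level `BG_JOB_TYPE` and `OID` reads at the top of the file). A rough local sketch, all values hypothetical:

import asyncio
import os

# must be set before import, since main_bg reads them at module level
os.environ["BG_JOB_TYPE"] = "delete-org"  # BgJobType.DELETE_ORG.value
os.environ["OID"] = "00000000-0000-0000-0000-000000000000"

from btrixcloud import main_bg  # container equivalent: python -m btrixcloud.main_bg

exit_code = asyncio.run(main_bg.main())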
19 changes: 18 additions & 1 deletion backend/btrixcloud/models.py
@@ -2013,6 +2013,7 @@ class BgJobType(str, Enum):

    CREATE_REPLICA = "create-replica"
    DELETE_REPLICA = "delete-replica"
    DELETE_ORG = "delete-org"


# ============================================================================
@@ -2051,10 +2052,19 @@ class DeleteReplicaJob(BackgroundJob):
    replica_storage: StorageRef


# ============================================================================
class DeleteOrgJob(BackgroundJob):
"""Model for tracking deletion of org data jobs"""

type: Literal[BgJobType.DELETE_ORG] = BgJobType.DELETE_ORG


# ============================================================================
# Union of all job types, for response model

AnyJob = RootModel[Union[CreateReplicaJob, DeleteReplicaJob, BackgroundJob]]
AnyJob = RootModel[
    Union[CreateReplicaJob, DeleteReplicaJob, BackgroundJob, DeleteOrgJob]
]


# ============================================================================
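
Each concrete job model carries a `Literal` type field, so pydantic can pick the matching variant out of the `AnyJob` union when validating a response; a rough sketch with hypothetical values:

from btrixcloud.models import AnyJob

job = AnyJob.model_validate(
    {
        "id": "delete-org-1234-abcd",
        "oid": "00000000-0000-0000-0000-000000000000",
        "type": "delete-org",
        "started": "2024-10-10T12:00:00Z",
    }
)
print(type(job.root).__name__)  # expected: DeleteOrgJob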
@@ -2274,6 +2284,13 @@ class DeletedResponse(BaseModel):
    deleted: bool


# ============================================================================
class DeletedResponseId(DeletedResponse):
"""Response for delete API endpoints that return job id"""

id: str
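
`DeletedResponseId` presumably backs the updated org delete endpoint (not shown in this diff), so that deletion returns the background job id along with the deleted flag; a hypothetical response body:

{"deleted": True, "id": "delete-org-1234-9f8e7d6c5b"}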


# ============================================================================
class DeletedResponseQuota(DeletedResponse):
"""Response for delete API endpoints"""