diff --git a/backend/btrixcloud/background_jobs.py b/backend/btrixcloud/background_jobs.py
index de3601fd94..bee8621ba0 100644
--- a/backend/btrixcloud/background_jobs.py
+++ b/backend/btrixcloud/background_jobs.py
@@ -1,6 +1,7 @@
 """k8s background jobs"""
 
 import asyncio
+import os
 from datetime import datetime
 from typing import Optional, Tuple, Union, List, Dict, TYPE_CHECKING, cast
 from uuid import UUID
@@ -19,6 +20,7 @@
     BgJobType,
     CreateReplicaJob,
     DeleteReplicaJob,
+    DeleteOrgJob,
     PaginatedBackgroundJobResponse,
     AnyJob,
     StorageRef,
@@ -273,6 +275,51 @@
         )
         return None
 
+    async def create_delete_org_job(
+        self,
+        org: Organization,
+        existing_job_id: Optional[str] = None,
+    ) -> Optional[str]:
+        """Create background job to delete org and its data"""
+
+        try:
+            job_id = await self.crawl_manager.run_delete_org_job(
+                oid=str(org.id),
+                backend_image=os.environ.get("BACKEND_IMAGE", ""),
+                pull_policy=os.environ.get("BACKEND_IMAGE_PULL_POLICY", ""),
+                existing_job_id=existing_job_id,
+            )
+            if existing_job_id:
+                delete_org_job = await self.get_background_job(existing_job_id, org.id)
+                previous_attempt = {
+                    "started": delete_org_job.started,
+                    "finished": delete_org_job.finished,
+                }
+                if delete_org_job.previousAttempts:
+                    delete_org_job.previousAttempts.append(previous_attempt)
+                else:
+                    delete_org_job.previousAttempts = [previous_attempt]
+                delete_org_job.started = dt_now()
+                delete_org_job.finished = None
+                delete_org_job.success = None
+            else:
+                delete_org_job = DeleteOrgJob(
+                    id=job_id,
+                    oid=org.id,
+                    started=dt_now(),
+                )
+
+            await self.jobs.find_one_and_update(
+                {"_id": job_id}, {"$set": delete_org_job.to_dict()}, upsert=True
+            )
+
+            return job_id
+        # pylint: disable=broad-exception-caught
+        except Exception as exc:
+            print(f"warning: delete org job could not be started: {exc}")
+            return None
+
     async def job_finished(
         self,
         job_id: str,
@@ -316,10 +363,13 @@ async def job_finished(
         )
 
     async def get_background_job(
-        self, job_id: str, oid: UUID
-    ) -> Union[CreateReplicaJob, DeleteReplicaJob]:
+        self, job_id: str, oid: Optional[UUID] = None
+    ) -> Union[CreateReplicaJob, DeleteReplicaJob, DeleteOrgJob]:
         """Get background job"""
-        query: dict[str, object] = {"_id": job_id, "oid": oid}
+        query: dict[str, object] = {"_id": job_id}
+        if oid:
+            query["oid"] = oid
+
         res = await self.jobs.find_one(query)
         if not res:
             raise HTTPException(status_code=404, detail="job_not_found")
@@ -331,9 +381,10 @@ def _get_job_by_type_from_data(self, data: dict[str, object]):
         if data["type"] == BgJobType.CREATE_REPLICA:
             return CreateReplicaJob.from_dict(data)
 
-        return DeleteReplicaJob.from_dict(data)
+        if data["type"] == BgJobType.DELETE_REPLICA:
+            return DeleteReplicaJob.from_dict(data)
 
-        # return BackgroundJob.from_dict(data)
+        return DeleteOrgJob.from_dict(data)
 
     async def list_background_jobs(
         self,
@@ -432,9 +483,8 @@ async def retry_background_job(
         if job.success:
             raise HTTPException(status_code=400, detail="job_already_succeeded")
 
-        file = await self.get_replica_job_file(job, org)
-
         if job.type == BgJobType.CREATE_REPLICA:
+            file = await self.get_replica_job_file(job, org)
             primary_storage = self.storage_ops.get_org_storage_by_ref(org, file.storage)
             primary_endpoint, bucket_suffix = self.strip_bucket(
                 primary_storage.endpoint_url
@@ -452,6 +502,7 @@ async def retry_background_job(
             )
 
         if job.type == BgJobType.DELETE_REPLICA:
+            file = await self.get_replica_job_file(job, org)
             await self.create_delete_replica_job(
                 org,
                 file,
@@ -461,6 +512,12 @@ async def retry_background_job(
                 existing_job_id=job_id,
             )
 
+        if job.type == BgJobType.DELETE_ORG:
+            await self.create_delete_org_job(
+                org,
+                existing_job_id=job_id,
+            )
+
         return {"success": True}
 
     async def retry_failed_background_jobs(
@@ -523,6 +580,14 @@
         """Retrieve information for background job"""
        return await ops.get_background_job(job_id, org.id)
 
+    @app.get("/orgs/all/jobs/{job_id}", response_model=AnyJob, tags=["jobs"])
+    async def get_background_job_all_orgs(job_id: str, user: User = Depends(user_dep)):
+        """Get background job from any org"""
+        if not user.is_superuser:
+            raise HTTPException(status_code=403, detail="Not Allowed")
+
+        return await ops.get_background_job(job_id)
+
     @router.post("/{job_id}/retry", response_model=SuccessResponse)
     async def retry_background_job(
         job_id: str,
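Note: the all-orgs endpoint returns the job record itself (hence `response_model=AnyJob` rather than `SuccessResponse`, which would fail validation while `success` is still `null`). A minimal polling sketch for a superadmin client; the base URL and token are assumptions, not part of this diff:

```python
import time

import requests

API = "https://app.example.com/api"  # assumed deployment URL
HEADERS = {"Authorization": "Bearer <superadmin-token>"}  # hypothetical credentials


def wait_for_bg_job(job_id: str, timeout: int = 180) -> bool:
    """Poll the superadmin job endpoint until success is True or False."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        resp = requests.get(f"{API}/orgs/all/jobs/{job_id}", headers=HEADERS)
        resp.raise_for_status()
        success = resp.json()["success"]  # None while the job is still running
        if success is not None:
            return success
        time.sleep(5)
    raise TimeoutError(f"job {job_id} still running after {timeout}s")
```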
diff --git a/backend/btrixcloud/crawlmanager.py b/backend/btrixcloud/crawlmanager.py
index 37d392ffad..a3d2354535 100644
--- a/backend/btrixcloud/crawlmanager.py
+++ b/backend/btrixcloud/crawlmanager.py
@@ -17,6 +17,8 @@
 # ============================================================================
 DEFAULT_PROXY_ID: str = os.environ.get("DEFAULT_PROXY_ID", "")
 
+DEFAULT_NAMESPACE: str = os.environ.get("DEFAULT_NAMESPACE", "default")
+
 
 # ============================================================================
 class CrawlManager(K8sAPI):
@@ -110,6 +112,34 @@
 
         return job_id
 
+    async def run_delete_org_job(
+        self,
+        oid: str,
+        backend_image: str,
+        pull_policy: str,
+        existing_job_id: Optional[str] = None,
+    ):
+        """run job to delete org and all of its data"""
+
+        if existing_job_id:
+            job_id = existing_job_id
+        else:
+            job_id = f"delete-org-{oid}-{secrets.token_hex(5)}"
+
+        params = {
+            "id": job_id,
+            "oid": oid,
+            "job_type": BgJobType.DELETE_ORG.value,
+            "backend_image": backend_image,
+            "pull_policy": pull_policy,
+        }
+
+        data = self.templates.env.get_template("background_job.yaml").render(params)
+
+        await self.create_from_yaml(data, namespace=DEFAULT_NAMESPACE)
+
+        return job_id
+
     async def create_crawl_job(
         self,
         crawlconfig: CrawlConfig,
diff --git a/backend/btrixcloud/main.py b/backend/btrixcloud/main.py
index 0bc3e48982..212308a49d 100644
--- a/backend/btrixcloud/main.py
+++ b/backend/btrixcloud/main.py
@@ -244,7 +244,7 @@ def main() -> None:
 
     init_uploads_api(*base_crawl_init)
 
-    org_ops.set_ops(base_crawl_ops, profiles, coll_ops)
+    org_ops.set_ops(base_crawl_ops, profiles, coll_ops, background_job_ops)
 
     user_manager.set_ops(org_ops, crawl_config_ops, base_crawl_ops)
 
diff --git a/backend/btrixcloud/main_bg.py b/backend/btrixcloud/main_bg.py
new file mode 100644
index 0000000000..8b56e6a8da
--- /dev/null
+++ b/backend/btrixcloud/main_bg.py
@@ -0,0 +1,142 @@
+""" entrypoint module for background jobs """
+
+import asyncio
+import os
+import sys
+import traceback
+from uuid import UUID
+
+from .crawlmanager import CrawlManager
+from .db import init_db
+from .emailsender import EmailSender
+
+# from .utils import register_exit_handler
+from .models import BgJobType
+
+from .basecrawls import BaseCrawlOps
+from .invites import InviteOps
+from .users import init_user_manager
+from .orgs import OrgOps
+from .colls import CollectionOps
+from .crawlconfigs import CrawlConfigOps
+from .crawls import CrawlOps
+from .profiles import ProfileOps
+from .storages import StorageOps
+from .webhooks import EventWebhookOps
+from .background_jobs import BackgroundJobOps
+from .pages import PageOps
+
+job_type = os.environ.get("BG_JOB_TYPE")
+oid = os.environ.get("OID")
+
+
+# ============================================================================
+# pylint: disable=too-many-function-args, duplicate-code, too-many-locals
+async def main():
+    """main init"""
+    email = EmailSender()
+    crawl_manager = None
+
+    dbclient, mdb = init_db()
+
+    invite_ops = InviteOps(mdb, email)
+
+    user_manager = init_user_manager(mdb, email, invite_ops)
+
+    org_ops = OrgOps(mdb, invite_ops, user_manager)
+
+    event_webhook_ops = EventWebhookOps(mdb, org_ops)
+
+    # pylint: disable=import-outside-toplevel
+    if not os.environ.get("KUBERNETES_SERVICE_HOST"):
+        print(
+            "Sorry, the Browsertrix Backend must be run inside a Kubernetes environment.\
+             Kubernetes not detected (KUBERNETES_SERVICE_HOST is not set), Exiting"
+        )
+        sys.exit(1)
+
+    crawl_manager = CrawlManager()
+
+    storage_ops = StorageOps(org_ops, crawl_manager)
+
+    background_job_ops = BackgroundJobOps(
+        mdb, email, user_manager, org_ops, crawl_manager, storage_ops
+    )
+
+    profile_ops = ProfileOps(
+        mdb, org_ops, crawl_manager, storage_ops, background_job_ops
+    )
+
+    crawl_config_ops = CrawlConfigOps(
+        dbclient,
+        mdb,
+        user_manager,
+        org_ops,
+        crawl_manager,
+        profile_ops,
+    )
+
+    coll_ops = CollectionOps(mdb, crawl_manager, org_ops, event_webhook_ops)
+
+    base_crawl_ops = BaseCrawlOps(
+        mdb,
+        user_manager,
+        org_ops,
+        crawl_config_ops,
+        coll_ops,
+        storage_ops,
+        event_webhook_ops,
+        background_job_ops,
+    )
+
+    crawl_ops = CrawlOps(
+        crawl_manager,
+        mdb,
+        user_manager,
+        org_ops,
+        crawl_config_ops,
+        coll_ops,
+        storage_ops,
+        event_webhook_ops,
+        background_job_ops,
+    )
+
+    page_ops = PageOps(mdb, crawl_ops, org_ops, storage_ops)
+
+    base_crawl_ops.set_page_ops(page_ops)
+    crawl_ops.set_page_ops(page_ops)
+
+    background_job_ops.set_ops(crawl_ops, profile_ops)
+
+    org_ops.set_ops(base_crawl_ops, profile_ops, coll_ops, background_job_ops)
+
+    user_manager.set_ops(org_ops, crawl_config_ops, base_crawl_ops)
+
+    crawl_config_ops.set_coll_ops(coll_ops)
+
+    # Run job
+    if job_type == BgJobType.DELETE_ORG:
+        if not oid:
+            print("Org id missing, quitting")
+            return 1
+        org = await org_ops.get_org_by_id(UUID(oid))
+        if not org:
+            print("Org id invalid, quitting")
+            return 1
+
+        try:
+            await org_ops.delete_org_and_data(org, user_manager)
+            return 0
+        # pylint: disable=broad-exception-caught
+        except Exception:
+            traceback.print_exc()
+            return 1
+
+    print(f"Provided job type {job_type} not currently supported")
+    return 1
+
+
+# ============================================================================
+if __name__ == "__main__":
+    return_code = asyncio.run(main())
+    sys.exit(return_code)
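The entrypoint's contract is deliberately small: two env vars select the work, and the process exit code (0 on success, nonzero on failure) is what the Job template's `podFailurePolicy` below keys off. A hypothetical manual invocation for debugging, assuming MongoDB credentials and `KUBERNETES_SERVICE_HOST` are already set, as they are in-cluster:

```python
import asyncio
import os

# main_bg reads these at module import time, so set them before importing
os.environ["BG_JOB_TYPE"] = "delete-org"  # BgJobType.DELETE_ORG.value
os.environ["OID"] = "<org-uuid>"          # placeholder org id

from btrixcloud.main_bg import main

raise SystemExit(asyncio.run(main()))
```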
os.environ.get("BG_JOB_TYPE") +oid = os.environ.get("OID") + + +# ============================================================================ +# pylint: disable=too-many-function-args, duplicate-code, too-many-locals +async def main(): + """main init""" + email = EmailSender() + crawl_manager = None + + dbclient, mdb = init_db() + + invite_ops = InviteOps(mdb, email) + + user_manager = init_user_manager(mdb, email, invite_ops) + + org_ops = OrgOps(mdb, invite_ops, user_manager) + + event_webhook_ops = EventWebhookOps(mdb, org_ops) + + # pylint: disable=import-outside-toplevel + if not os.environ.get("KUBERNETES_SERVICE_HOST"): + print( + "Sorry, the Browsertrix Backend must be run inside a Kubernetes environment.\ + Kubernetes not detected (KUBERNETES_SERVICE_HOST is not set), Exiting" + ) + sys.exit(1) + + crawl_manager = CrawlManager() + + storage_ops = StorageOps(org_ops, crawl_manager) + + background_job_ops = BackgroundJobOps( + mdb, email, user_manager, org_ops, crawl_manager, storage_ops + ) + + profile_ops = ProfileOps( + mdb, org_ops, crawl_manager, storage_ops, background_job_ops + ) + + crawl_config_ops = CrawlConfigOps( + dbclient, + mdb, + user_manager, + org_ops, + crawl_manager, + profile_ops, + ) + + coll_ops = CollectionOps(mdb, crawl_manager, org_ops, event_webhook_ops) + + base_crawl_ops = BaseCrawlOps( + mdb, + user_manager, + org_ops, + crawl_config_ops, + coll_ops, + storage_ops, + event_webhook_ops, + background_job_ops, + ) + + crawl_ops = CrawlOps( + crawl_manager, + mdb, + user_manager, + org_ops, + crawl_config_ops, + coll_ops, + storage_ops, + event_webhook_ops, + background_job_ops, + ) + + page_ops = PageOps(mdb, crawl_ops, org_ops, storage_ops) + + base_crawl_ops.set_page_ops(page_ops) + crawl_ops.set_page_ops(page_ops) + + background_job_ops.set_ops(crawl_ops, profile_ops) + + org_ops.set_ops(base_crawl_ops, profile_ops, coll_ops, background_job_ops) + + user_manager.set_ops(org_ops, crawl_config_ops, base_crawl_ops) + + background_job_ops.set_ops(base_crawl_ops, profile_ops) + + crawl_config_ops.set_coll_ops(coll_ops) + + # Run job + if job_type == BgJobType.DELETE_ORG: + if not oid: + print("Org id missing, quitting") + return 1 + org = await org_ops.get_org_by_id(UUID(oid)) + if not org: + print("Org id invalid, quitting") + return 1 + + try: + await org_ops.delete_org_and_data(org, user_manager) + return 0 + # pylint: disable=broad-exception-caught + except Exception: + traceback.print_exc() + return 1 + + print(f"Provided job type {job_type} not currently supported") + return 1 + + +# # ============================================================================ +if __name__ == "__main__": + return_code = asyncio.run(main()) + sys.exit(return_code) diff --git a/backend/btrixcloud/models.py b/backend/btrixcloud/models.py index e5d2e45941..fad8a6d689 100644 --- a/backend/btrixcloud/models.py +++ b/backend/btrixcloud/models.py @@ -2013,6 +2013,7 @@ class BgJobType(str, Enum): CREATE_REPLICA = "create-replica" DELETE_REPLICA = "delete-replica" + DELETE_ORG = "delete-org" # ============================================================================ @@ -2051,10 +2052,19 @@ class DeleteReplicaJob(BackgroundJob): replica_storage: StorageRef +# ============================================================================ +class DeleteOrgJob(BackgroundJob): + """Model for tracking deletion of org data jobs""" + + type: Literal[BgJobType.DELETE_ORG] = BgJobType.DELETE_ORG + + # ============================================================================ # Union 
diff --git a/backend/test/test_z_delete_org.py b/backend/test/test_z_delete_org.py
index 2de04be119..3b3b5bf659 100644
--- a/backend/test/test_z_delete_org.py
+++ b/backend/test/test_z_delete_org.py
@@ -54,9 +54,39 @@ def test_delete_org_superadmin(admin_auth_headers, default_org_id):
         f"{API_PREFIX}/orgs/{default_org_id}", headers=admin_auth_headers
     )
     assert r.status_code == 200
-    assert r.json()["deleted"]
+    data = r.json()
+    assert data["deleted"]
+
+    job_id = data["id"]
+
+    # Check that background job is launched and eventually succeeds
+    max_attempts = 18
+    attempts = 1
+    while True:
+        try:
+            r = requests.get(
+                f"{API_PREFIX}/orgs/all/jobs/{job_id}", headers=admin_auth_headers
+            )
+            assert r.status_code == 200
+            success = r.json()["success"]
+
+            if success:
+                break
+
+            # success is False only if the job failed outright
+            assert success is not False
+        # retry on transient errors, but let assertion failures propagate
+        except requests.exceptions.RequestException:
+            pass
+
+        assert attempts < max_attempts
+        time.sleep(10)
+        attempts += 1
+
+    # Ensure org and items got deleted
+    r = requests.get(f"{API_PREFIX}/orgs/{default_org_id}", headers=admin_auth_headers)
+    assert r.status_code == 404
 
-    # Ensure items got deleted
     for item_id in item_ids:
         r = requests.get(
             f"{API_PREFIX}/orgs/all/all-crawls/{item_id}/replay.json",
diff --git a/chart/app-templates/background_job.yaml b/chart/app-templates/background_job.yaml
new file mode 100644
index 0000000000..132d3bf8fe
--- /dev/null
+++ b/chart/app-templates/background_job.yaml
@@ -0,0 +1,59 @@
+apiVersion: batch/v1
+kind: Job
+metadata:
+  name: "{{ id }}"
+  labels:
+    role: "background-job"
+    job_type: {{ job_type }}
+    btrix.org: {{ oid }}
+
+spec:
+  ttlSecondsAfterFinished: 0
+  backoffLimit: 3
+  template:
+    spec:
+      restartPolicy: Never
+      priorityClassName: bg-job
+      podFailurePolicy:
+        rules:
+        - action: FailJob
+          onExitCodes:
+            containerName: btrixbgjob
+            operator: NotIn
+            values: [0]
+
+      volumes:
+        - name: ops-configs
+          secret:
+            secretName: ops-configs
+
+      containers:
+        - name: btrixbgjob
+          image: {{ backend_image }}
+          imagePullPolicy: {{ pull_policy }}
+          env:
+            - name: BG_JOB_TYPE
+              value: {{ job_type }}
+
+            - name: OID
+              value: {{ oid }}
+
+          envFrom:
+            - configMapRef:
+                name: backend-env-config
+            - secretRef:
+                name: mongo-auth
+
+          volumeMounts:
+            - name: ops-configs
+              mountPath: /ops-configs/
+
+          command: ["python3", "-m", "btrixcloud.main_bg"]
+
+          resources:
+            limits:
+              memory: "200Mi"
+
+            requests:
+              memory: "200Mi"
+              cpu: "50m"
diff --git a/chart/templates/configmap.yaml b/chart/templates/configmap.yaml
index f876ae745c..a027e35e46 100644
--- a/chart/templates/configmap.yaml
+++ b/chart/templates/configmap.yaml
@@ -77,6 +77,11 @@
 
   LOG_SENT_EMAILS: "{{ .Values.email.log_sent_emails }}"
 
+  BACKEND_IMAGE: "{{ .Values.backend_image }}"
+
+  BACKEND_IMAGE_PULL_POLICY: "{{ .Values.backend_pull_policy }}"
+
+
 ---
 apiVersion: v1
 kind: ConfigMap
diff --git a/chart/templates/role.yaml b/chart/templates/role.yaml
index 16f860734b..5e31ef1da5 100644
--- a/chart/templates/role.yaml
+++ b/chart/templates/role.yaml
@@ -21,6 +21,17 @@ rules:
     resources: ["pods"]
     verbs: ["list"]
 
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  namespace: {{ .Release.Namespace }}
+  name: bg-job
+rules:
+  - apiGroups: ["batch"]
+    resources: ["jobs"]
+    verbs: ["get", "list", "watch", "create", "update", "patch", "delete", "deletecollection"]
+
 ---
 kind: RoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
@@ -40,3 +51,23 @@ roleRef:
   kind: Role
   name: crawler-run
   apiGroup: rbac.authorization.k8s.io
+
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: bg-job-role
+  namespace: {{ .Release.Namespace }}
+subjects:
+- kind: ServiceAccount
+  name: default
+  namespace: {{ .Release.Namespace }}
+
+- kind: User
+  name: system:anonymous
+  namespace: {{ .Release.Namespace }}
+
+roleRef:
+  kind: Role
+  name: bg-job
+  apiGroup: rbac.authorization.k8s.io
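For reference, `run_delete_org_job()` fills in this template with Jinja before handing the result to `create_from_yaml()`. A standalone sketch of that render step; the chart path and parameter values here are placeholders, not part of the diff:

```python
from jinja2 import Environment, FileSystemLoader

env = Environment(loader=FileSystemLoader("chart/app-templates"))
params = {
    "id": "delete-org-<oid>-0a1b2c3d4e",  # job_id format from crawlmanager.py
    "oid": "<org-uuid>",
    "job_type": "delete-org",             # BgJobType.DELETE_ORG.value
    "backend_image": "example/browsertrix-backend:latest",  # placeholder image
    "pull_policy": "IfNotPresent",
}
manifest = env.get_template("background_job.yaml").render(params)
print(manifest)  # the Kubernetes Job that create_from_yaml() submits in-cluster
```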