diff --git a/backend/btrixcloud/main_op.py b/backend/btrixcloud/main_op.py index 600a0ca893..ba6555dba2 100644 --- a/backend/btrixcloud/main_op.py +++ b/backend/btrixcloud/main_op.py @@ -112,5 +112,5 @@ def main(): async def startup(): """init on startup""" register_exit_handler() - oper = main() - await oper.async_init() + settings = main() + await settings.async_init() diff --git a/backend/btrixcloud/operator/__init__.py b/backend/btrixcloud/operator/__init__.py new file mode 100644 index 0000000000..dd5f4830da --- /dev/null +++ b/backend/btrixcloud/operator/__init__.py @@ -0,0 +1,28 @@ +""" operators module """ + +from .profiles import ProfileOperator +from .bgjobs import BgJobOperator +from .cronjobs import CronJobOperator +from .crawls import CrawlOperator +from .baseoperator import K8sOpAPI + +operator_classes = [ProfileOperator, BgJobOperator, CronJobOperator, CrawlOperator] + + +# ============================================================================ +def init_operator_api(app, *args): + """registers webhook handlers for metacontroller""" + + k8s = K8sOpAPI() + + operators = [] + for cls in operator_classes: + oper = cls(k8s, *args) + oper.init_routes(app) + operators.append(oper) + + @app.get("/healthz", include_in_schema=False) + async def healthz(): + return {} + + return k8s diff --git a/backend/btrixcloud/operator/baseoperator.py b/backend/btrixcloud/operator/baseoperator.py new file mode 100644 index 0000000000..1a2e89baac --- /dev/null +++ b/backend/btrixcloud/operator/baseoperator.py @@ -0,0 +1,131 @@ +""" Base Operator class for all operators """ + +import asyncio +from typing import TYPE_CHECKING +from kubernetes.utils import parse_quantity + +import yaml +from btrixcloud.k8sapi import K8sAPI + + +if TYPE_CHECKING: + from btrixcloud.crawlconfigs import CrawlConfigOps + from btrixcloud.crawls import CrawlOps + from btrixcloud.orgs import OrgOps + from btrixcloud.colls import CollectionOps + from btrixcloud.storages import StorageOps + from btrixcloud.webhooks import EventWebhookOps + from btrixcloud.users import UserManager + from btrixcloud.background_jobs import BackgroundJobOps + from btrixcloud.pages import PageOps + from redis.asyncio.client import Redis +else: + CrawlConfigOps = CrawlOps = OrgOps = CollectionOps = Redis = object + StorageOps = EventWebhookOps = UserManager = BackgroundJobOps = PageOps = object + + +# ============================================================================ +class K8sOpAPI(K8sAPI): + """Additional k8s api for operators""" + + def __init__(self): + super().__init__() + self.config_file = "/config/config.yaml" + with open(self.config_file, encoding="utf-8") as fh_config: + self.shared_params = yaml.safe_load(fh_config) + + self.has_pod_metrics = False + self.compute_crawler_resources() + + def compute_crawler_resources(self): + """compute memory / cpu resources for crawlers""" + p = self.shared_params + num = max(int(p["crawler_browser_instances"]) - 1, 0) + if not p.get("crawler_cpu"): + base = parse_quantity(p["crawler_cpu_base"]) + extra = parse_quantity(p["crawler_extra_cpu_per_browser"]) + + # cpu is a floating value of cpu cores + p["crawler_cpu"] = float(base + num * extra) + + print(f"cpu = {base} + {num} * {extra} = {p['crawler_cpu']}") + else: + print(f"cpu = {p['crawler_cpu']}") + + if not p.get("crawler_memory"): + base = parse_quantity(p["crawler_memory_base"]) + extra = parse_quantity(p["crawler_extra_memory_per_browser"]) + + # memory is always an int + p["crawler_memory"] = int(base + num * extra) + + 
print(f"memory = {base} + {num} * {extra} = {p['crawler_memory']}") + else: + print(f"memory = {p['crawler_memory']}") + + async def async_init(self): + """perform any async init here""" + self.has_pod_metrics = await self.is_pod_metrics_available() + print("Pod Metrics Available:", self.has_pod_metrics) + + +# pylint: disable=too-many-instance-attributes, too-many-arguments +# ============================================================================ +class BaseOperator: + """BaseOperator""" + + k8s: K8sOpAPI + crawl_config_ops: CrawlConfigOps + crawl_ops: CrawlOps + orgs_ops: OrgOps + coll_ops: CollectionOps + storage_ops: StorageOps + event_webhook_ops: EventWebhookOps + background_job_ops: BackgroundJobOps + user_ops: UserManager + page_ops: PageOps + + def __init__( + self, + k8s, + crawl_config_ops, + crawl_ops, + org_ops, + coll_ops, + storage_ops, + event_webhook_ops, + background_job_ops, + page_ops, + ): + self.k8s = k8s + self.crawl_config_ops = crawl_config_ops + self.crawl_ops = crawl_ops + self.org_ops = org_ops + self.coll_ops = coll_ops + self.storage_ops = storage_ops + self.background_job_ops = background_job_ops + self.event_webhook_ops = event_webhook_ops + self.page_ops = page_ops + + self.user_ops = crawl_config_ops.user_manager + + # to avoid background tasks being garbage collected + # see: https://stackoverflow.com/a/74059981 + self.bg_tasks = set() + + def init_routes(self, app): + """init routes for this operator""" + + def run_task(self, func): + """add bg tasks to set to avoid premature garbage collection""" + task = asyncio.create_task(func) + self.bg_tasks.add(task) + task.add_done_callback(self.bg_tasks.discard) + + def load_from_yaml(self, filename, params): + """load and parse k8s template from yaml file""" + return list( + yaml.safe_load_all( + self.k8s.templates.env.get_template(filename).render(params) + ) + ) diff --git a/backend/btrixcloud/operator/bgjobs.py b/backend/btrixcloud/operator/bgjobs.py new file mode 100644 index 0000000000..fad3deea41 --- /dev/null +++ b/backend/btrixcloud/operator/bgjobs.py @@ -0,0 +1,62 @@ +""" Operator handler for BackgroundJobs """ + +from uuid import UUID +import traceback + +from btrixcloud.utils import ( + from_k8s_date, + dt_now, +) + +from .models import MCDecoratorSyncData +from .baseoperator import BaseOperator + + +# ============================================================================ +class BgJobOperator(BaseOperator): + """BgJobOperator""" + + def init_routes(self, app): + """init routes for this operator""" + + # nop, but needed for metacontroller + @app.post("/op/backgroundjob/sync") + async def mc_sync_background_jobs(): + return {"attachments": []} + + @app.post("/op/backgroundjob/finalize") + async def mc_finalize_background_jobs(data: MCDecoratorSyncData): + return await self.finalize_background_job(data) + + async def finalize_background_job(self, data: MCDecoratorSyncData) -> dict: + """handle finished background job""" + + metadata = data.object["metadata"] + labels: dict[str, str] = metadata.get("labels", {}) + oid: str = labels.get("btrix.org") or "" + job_type: str = labels.get("job_type") or "" + job_id: str = metadata.get("name") + + status = data.object["status"] + success = status.get("succeeded") == 1 + completion_time = status.get("completionTime") + + finalized = True + + finished = from_k8s_date(completion_time) if completion_time else dt_now() + + try: + await self.background_job_ops.job_finished( + job_id, job_type, UUID(oid), success=success, finished=finished + ) + # print( + 
# f"{job_type} background job completed: success: {success}, {job_id}", + # flush=True, + # ) + + # pylint: disable=broad-except + except Exception: + print("Update Background Job Error", flush=True) + traceback.print_exc() + + return {"attachments": [], "finalized": finalized} diff --git a/backend/btrixcloud/operator.py b/backend/btrixcloud/operator/crawls.py similarity index 71% rename from backend/btrixcloud/operator.py rename to backend/btrixcloud/operator/crawls.py index bbe7fc293b..bab54232bb 100644 --- a/backend/btrixcloud/operator.py +++ b/backend/btrixcloud/operator/crawls.py @@ -1,33 +1,20 @@ -""" btrixjob operator (working for metacontroller) """ +""" CrawlOperator """ -import asyncio import traceback import os from pprint import pprint -from typing import Optional, DefaultDict, TYPE_CHECKING - -from collections import defaultdict +from typing import Optional import json from uuid import UUID from fastapi import HTTPException -import yaml import humanize -from pydantic import BaseModel, Field - from kubernetes.utils import parse_quantity from redis import asyncio as exceptions -from .utils import ( - from_k8s_date, - to_k8s_date, - dt_now, -) -from .k8sapi import K8sAPI - -from .models import ( +from btrixcloud.models import ( NON_RUNNING_STATES, RUNNING_STATES, RUNNING_AND_STARTING_ONLY, @@ -39,27 +26,25 @@ StorageRef, ) -if TYPE_CHECKING: - from .crawlconfigs import CrawlConfigOps - from .crawls import CrawlOps - from .orgs import OrgOps - from .colls import CollectionOps - from .storages import StorageOps - from .webhooks import EventWebhookOps - from .users import UserManager - from .background_jobs import BackgroundJobOps - from .pages import PageOps - from redis.asyncio.client import Redis -else: - CrawlConfigOps = CrawlOps = OrgOps = CollectionOps = Redis = object - StorageOps = EventWebhookOps = UserManager = BackgroundJobOps = PageOps = object - -CMAP = "ConfigMap.v1" -PVC = "PersistentVolumeClaim.v1" -POD = "Pod.v1" - -BTRIX_API = "btrix.cloud/v1" -CJS = f"CrawlJob.{BTRIX_API}" +from btrixcloud.utils import ( + from_k8s_date, + to_k8s_date, + dt_now, +) + +from .baseoperator import BaseOperator, Redis +from .models import ( + CrawlSpec, + CrawlStatus, + MCBaseRequest, + MCSyncData, + POD, + CMAP, + PVC, + CJS, + BTRIX_API, +) + METRICS_API = "metrics.k8s.io/v1beta1" METRICS = f"PodMetrics.{METRICS_API}" @@ -75,228 +60,14 @@ EXEC_TIME_UPDATE_SECS = 60 +# pylint: disable=too-many-public-methods, too-many-locals, too-many-branches, too-many-statements +# pylint: disable=invalid-name, too-many-lines, too-many-return-statements # ============================================================================ -class MCBaseRequest(BaseModel): - """base metacontroller model, used for customize hook""" - - parent: dict - controller: dict - - -# ============================================================================ -class MCSyncData(MCBaseRequest): - """sync / finalize metacontroller model""" - - children: dict - related: dict - finalizing: bool = False - - -# ============================================================================ -class MCDecoratorSyncData(BaseModel): - """sync for decoratorcontroller model""" - - object: dict - controller: dict - - attachments: dict - related: dict - finalizing: bool = False - - -# ============================================================================ -class CrawlSpec(BaseModel): - """spec from k8s CrawlJob object""" - - id: str - cid: UUID - oid: UUID - scale: int = 1 - storage: StorageRef - started: str - crawler_channel: str - 
stopping: bool = False - scheduled: bool = False - timeout: int = 0 - max_crawl_size: int = 0 - - -# ============================================================================ -class PodResourcePercentage(BaseModel): - """Resource usage percentage ratios""" - - memory: float = 0 - cpu: float = 0 - storage: float = 0 - - -# ============================================================================ -class PodResources(BaseModel): - """Pod Resources""" - - memory: int = 0 - cpu: float = 0 - storage: int = 0 +class CrawlOperator(BaseOperator): + """CrawlOperator Handler""" - def __init__(self, *a, **kw): - if "memory" in kw: - kw["memory"] = int(parse_quantity(kw["memory"])) - if "cpu" in kw: - kw["cpu"] = float(parse_quantity(kw["cpu"])) - if "storage" in kw: - kw["storage"] = int(parse_quantity(kw["storage"])) - super().__init__(*a, **kw) - - -# ============================================================================ -class PodInfo(BaseModel): - """Aggregate pod status info held in CrawlJob""" - - exitTime: Optional[str] = None - exitCode: Optional[int] = None - isNewExit: Optional[bool] = Field(default=None, exclude=True) - reason: Optional[str] = None - - allocated: PodResources = PodResources() - used: PodResources = PodResources() - - newCpu: Optional[int] = None - newMemory: Optional[int] = None - - def dict(self, *a, **kw): - res = super().dict(*a, **kw) - percent = { - "memory": self.get_percent_memory(), - "cpu": self.get_percent_cpu(), - "storage": self.get_percent_storage(), - } - res["percent"] = percent - return res - - def get_percent_memory(self) -> float: - """compute percent memory used""" - return ( - float(self.used.memory) / float(self.allocated.memory) - if self.allocated.memory - else 0 - ) - - def get_percent_cpu(self) -> float: - """compute percent cpu used""" - return ( - float(self.used.cpu) / float(self.allocated.cpu) - if self.allocated.cpu - else 0 - ) - - def get_percent_storage(self) -> float: - """compute percent storage used""" - return ( - float(self.used.storage) / float(self.allocated.storage) - if self.allocated.storage - else 0 - ) - - def should_restart_pod(self): - """return true if pod should be restarted""" - if self.newMemory and self.newMemory != self.allocated.memory: - return True - - if self.newCpu and self.newCpu != self.allocated.cpu: - return True - - return False - - -# ============================================================================ -class CrawlStatus(BaseModel): - """status from k8s CrawlJob object""" - - state: str = "starting" - pagesFound: int = 0 - pagesDone: int = 0 - size: int = 0 - # human readable size string - sizeHuman: str = "" - scale: int = 1 - filesAdded: int = 0 - filesAddedSize: int = 0 - finished: Optional[str] = None - stopping: bool = False - stopReason: Optional[str] = None - initRedis: bool = False - crawlerImage: Optional[str] = None - lastActiveTime: str = "" - podStatus: Optional[DefaultDict[str, PodInfo]] = defaultdict( - lambda: PodInfo() # pylint: disable=unnecessary-lambda - ) - # placeholder for pydantic 2.0 -- will require this version - # podStatus: Optional[ - # DefaultDict[str, Annotated[PodInfo, Field(default_factory=PodInfo)]] - # ] - restartTime: Optional[str] - canceled: bool = False - - # updated on pod exits and at regular interval - # Crawl Execution Time -- time all crawler pods have been running - # used to track resource usage and enforce execution minutes limit - crawlExecTime: int = 0 - - # Elapsed Exec Time -- time crawl has been running in at least one pod - # used for crawl 
timeouts - elapsedCrawlTime: int = 0 - - # last exec time update - lastUpdatedTime: str = "" - - # any pods exited - anyCrawlPodNewExit: Optional[bool] = Field(default=False, exclude=True) - - # don't include in status, use by metacontroller - resync_after: Optional[int] = Field(default=None, exclude=True) - - -# ============================================================================ -# pylint: disable=too-many-statements, too-many-public-methods, too-many-branches, too-many-nested-blocks -# pylint: disable=too-many-instance-attributes, too-many-locals, too-many-lines, too-many-arguments -class BtrixOperator(K8sAPI): - """BtrixOperator Handler""" - - crawl_config_ops: CrawlConfigOps - crawl_ops: CrawlOps - orgs_ops: OrgOps - coll_ops: CollectionOps - storage_ops: StorageOps - event_webhook_ops: EventWebhookOps - background_job_ops: BackgroundJobOps - user_ops: UserManager - page_ops: PageOps - - def __init__( - self, - crawl_config_ops, - crawl_ops, - org_ops, - coll_ops, - storage_ops, - event_webhook_ops, - background_job_ops, - page_ops, - ): - super().__init__() - - self.crawl_config_ops = crawl_config_ops - self.crawl_ops = crawl_ops - self.org_ops = org_ops - self.coll_ops = coll_ops - self.storage_ops = storage_ops - self.background_job_ops = background_job_ops - self.event_webhook_ops = event_webhook_ops - self.page_ops = page_ops - - self.user_ops = crawl_config_ops.user_manager - - self.config_file = "/config/config.yaml" + def __init__(self, *args): + super().__init__(*args) self.done_key = "crawls-done" self.pages_key = "pages" @@ -306,83 +77,22 @@ def __init__( self.log_failed_crawl_lines = int(os.environ.get("LOG_FAILED_CRAWL_LINES") or 0) - with open(self.config_file, encoding="utf-8") as fh_config: - self.shared_params = yaml.safe_load(fh_config) - - self._has_pod_metrics = False - self.compute_crawler_resources() - - # to avoid background tasks being garbage collected - # see: https://stackoverflow.com/a/74059981 - self.bg_tasks = set() - - def compute_crawler_resources(self): - """compute memory / cpu resources for crawlers""" - # pylint: disable=invalid-name - p = self.shared_params - num = max(int(p["crawler_browser_instances"]) - 1, 0) - if not p.get("crawler_cpu"): - base = parse_quantity(p["crawler_cpu_base"]) - extra = parse_quantity(p["crawler_extra_cpu_per_browser"]) - - # cpu is a floating value of cpu cores - p["crawler_cpu"] = float(base + num * extra) - - print(f"cpu = {base} + {num} * {extra} = {p['crawler_cpu']}") - else: - print(f"cpu = {p['crawler_cpu']}") - - if not p.get("crawler_memory"): - base = parse_quantity(p["crawler_memory_base"]) - extra = parse_quantity(p["crawler_extra_memory_per_browser"]) - - # memory is always an int - p["crawler_memory"] = int(base + num * extra) - - print(f"memory = {base} + {num} * {extra} = {p['crawler_memory']}") - else: - print(f"memory = {p['crawler_memory']}") - - async def async_init(self): - """perform any async init here""" - self._has_pod_metrics = await self.is_pod_metrics_available() - print("Pod Metrics Available:", self._has_pod_metrics) - - async def sync_profile_browsers(self, data: MCSyncData): - """sync profile browsers""" - spec = data.parent.get("spec", {}) - - expire_time = from_k8s_date(spec.get("expireTime")) - browserid = spec.get("id") - - if dt_now() >= expire_time: - self.run_task(self.delete_profile_browser(browserid)) - return {"status": {}, "children": []} + def init_routes(self, app): + """init routes for this operator""" - params = {} - params.update(self.shared_params) - params["id"] 
= browserid - params["userid"] = spec.get("userid", "") + @app.post("/op/crawls/sync") + async def mc_sync_crawls(data: MCSyncData): + return await self.sync_crawls(data) - oid = spec.get("oid") - storage = StorageRef(spec.get("storageName")) + # reuse sync path, but distinct endpoint for better logging + @app.post("/op/crawls/finalize") + async def mc_sync_finalize(data: MCSyncData): + return await self.sync_crawls(data) - storage_path = storage.get_storage_extra_path(oid) - storage_secret = storage.get_storage_secret_name(oid) + @app.post("/op/crawls/customize") + async def mc_related(data: MCBaseRequest): + return self.get_related(data) - params["storage_path"] = storage_path - params["storage_secret"] = storage_secret - params["profile_filename"] = spec.get("profileFilename", "") - params["crawler_image"] = spec["crawlerImage"] - - params["url"] = spec.get("startUrl", "about:blank") - params["vnc_password"] = spec.get("vncPassword") - - children = self.load_from_yaml("profilebrowser.yaml", params) - - return {"status": {}, "children": children} - - # pylint: disable=too-many-return-statements, invalid-name async def sync_crawls(self, data: MCSyncData): """sync crawls""" @@ -393,10 +103,10 @@ async def sync_crawls(self, data: MCSyncData): cid = spec["cid"] oid = spec["oid"] - redis_url = self.get_redis_url(crawl_id) + redis_url = self.k8s.get_redis_url(crawl_id) params = {} - params.update(self.shared_params) + params.update(self.k8s.shared_params) params["id"] = crawl_id params["cid"] = cid params["userid"] = spec.get("userid", "") @@ -430,7 +140,7 @@ async def sync_crawls(self, data: MCSyncData): print( f"warn crawl {crawl_id} finished but not deleted, post-finish taking too long?" ) - self.run_task(self.delete_crawl_job(crawl_id)) + self.run_task(self.k8s.delete_crawl_job(crawl_id)) return await self.finalize_response( crawl_id, UUID(oid), @@ -466,7 +176,7 @@ async def sync_crawls(self, data: MCSyncData): # shouldn't get here, crawl should already be finalizing when canceled # just in case, handle canceled-but-not-finalizing here if status.state == "canceled": - await self.delete_crawl_job(crawl.id) + await self.k8s.delete_crawl_job(crawl.id) return {"status": status.dict(exclude_none=True), "children": []} # first, check storage quota, and fail immediately if quota reached @@ -722,12 +432,6 @@ async def set_state(self, state, status, crawl_id, allowed_from, **kwargs): ) return False - def load_from_yaml(self, filename, params): - """load and parse k8s template from yaml file""" - return list( - yaml.safe_load_all(self.templates.env.get_template(filename).render(params)) - ) - def get_related(self, data: MCBaseRequest): """return objects related to crawl pods""" spec = data.parent.get("spec", {}) @@ -747,7 +451,7 @@ def get_related(self, data: MCBaseRequest): }, ] - if self._has_pod_metrics: + if self.k8s.has_pod_metrics: related_resources.append( { "apiVersion": METRICS_API, @@ -856,7 +560,7 @@ async def fail_crawl( print(f"============== POD STATUS: {name} ==============") pprint(pods[name]["status"]) - self.run_task(self.print_pod_logs(pod_names, self.log_failed_crawl_lines)) + self.run_task(self.k8s.print_pod_logs(pod_names, self.log_failed_crawl_lines)) return True @@ -916,7 +620,7 @@ async def _get_redis(self, redis_url: str) -> Optional[Redis]: """init redis, ensure connectivity""" redis = None try: - redis = await self.get_redis_client(redis_url) + redis = await self.k8s.get_redis_client(redis_url) # test connection await redis.ping() return redis @@ -1246,7 +950,7 @@ async 
def add_used_stats(self, crawl_id, pod_status, redis, metrics): pod_info.used.storage = storage # if no pod metrics, get memory estimate from redis itself - if not self._has_pod_metrics: + if not self.k8s.has_pod_metrics: stats = await redis.info("memory") pod_info.used.memory = int(stats.get("used_memory_rss", 0)) @@ -1557,7 +1261,7 @@ async def do_crawl_finished_tasks( ) # finally, delete job - await self.delete_crawl_job(crawl_id) + await self.k8s.delete_crawl_job(crawl_id) async def inc_crawl_complete_stats(self, crawl, finished): """Increment Crawl Stats""" @@ -1573,7 +1277,7 @@ async def inc_crawl_complete_stats(self, crawl, finished): async def mark_for_cancelation(self, crawl_id): """mark crawl as canceled in redis""" try: - redis_url = self.get_redis_url(crawl_id) + redis_url = self.k8s.get_redis_url(crawl_id) redis = await self._get_redis(redis_url) if not redis: return False @@ -1583,192 +1287,3 @@ async def mark_for_cancelation(self, crawl_id): finally: if redis: await redis.close() - - def get_cronjob_crawl_related(self, data: MCBaseRequest): - """return configmap related to crawl""" - labels = data.parent.get("metadata", {}).get("labels", {}) - cid = labels.get("btrix.crawlconfig") - return { - "relatedResources": [ - { - "apiVersion": "v1", - "resource": "configmaps", - "labelSelector": {"matchLabels": {"btrix.crawlconfig": cid}}, - } - ] - } - - async def sync_cronjob_crawl(self, data: MCDecoratorSyncData): - """create crawljobs from a job object spawned by cronjob""" - - metadata = data.object["metadata"] - labels = metadata.get("labels", {}) - cid = labels.get("btrix.crawlconfig") - - name = metadata.get("name") - crawl_id = name - - actual_state, finished = await self.crawl_ops.get_crawl_state(crawl_id) - if finished: - status = None - # mark job as completed - if not data.object["status"].get("succeeded"): - print("Cron Job Complete!", finished) - status = { - "succeeded": 1, - "startTime": metadata.get("creationTimestamp"), - "completionTime": to_k8s_date(finished), - } - - return { - "attachments": [], - "annotations": {"finished": finished}, - "status": status, - } - - configmap = data.related[CMAP][f"crawl-config-{cid}"]["data"] - - oid = configmap.get("ORG_ID") - userid = configmap.get("USER_ID") - - crawljobs = data.attachments[CJS] - - org = await self.org_ops.get_org_by_id(UUID(oid)) - - warc_prefix = None - - if not actual_state: - # cronjob doesn't exist yet - crawlconfig = await self.crawl_config_ops.get_crawl_config( - UUID(cid), UUID(oid) - ) - if not crawlconfig: - print( - f"error: no crawlconfig {cid}. skipping scheduled job. old cronjob left over?" 
- ) - return {"attachments": []} - - # db create - user = await self.user_ops.get_by_id(UUID(userid)) - if not user: - print(f"error: missing user for id {userid}") - return {"attachments": []} - - warc_prefix = self.crawl_config_ops.get_warc_prefix(org, crawlconfig) - - await self.crawl_config_ops.add_new_crawl( - crawl_id, - crawlconfig, - user, - manual=False, - ) - print("Scheduled Crawl Created: " + crawl_id) - - crawl_id, crawljob = self.new_crawl_job_yaml( - cid, - userid=userid, - oid=oid, - storage=org.storage, - crawler_channel=configmap.get("CRAWLER_CHANNEL", "default"), - scale=int(configmap.get("INITIAL_SCALE", 1)), - crawl_timeout=int(configmap.get("CRAWL_TIMEOUT", 0)), - max_crawl_size=int(configmap.get("MAX_CRAWL_SIZE", "0")), - manual=False, - crawl_id=crawl_id, - warc_prefix=warc_prefix, - ) - - attachments = list(yaml.safe_load_all(crawljob)) - - if crawl_id in crawljobs: - attachments[0]["status"] = crawljobs[CJS][crawl_id]["status"] - - return { - "attachments": attachments, - } - - async def finalize_background_job(self, data: MCDecoratorSyncData) -> dict: - """handle finished background job""" - - metadata = data.object["metadata"] - labels: dict[str, str] = metadata.get("labels", {}) - oid: str = labels.get("btrix.org") or "" - job_type: str = labels.get("job_type") or "" - job_id: str = metadata.get("name") - - status = data.object["status"] - success = status.get("succeeded") == 1 - completion_time = status.get("completionTime") - - finalized = True - - finished = from_k8s_date(completion_time) if completion_time else dt_now() - - try: - await self.background_job_ops.job_finished( - job_id, job_type, UUID(oid), success=success, finished=finished - ) - # print( - # f"{job_type} background job completed: success: {success}, {job_id}", - # flush=True, - # ) - - # pylint: disable=broad-except - except Exception: - print("Update Background Job Error", flush=True) - traceback.print_exc() - - return {"attachments": [], "finalized": finalized} - - def run_task(self, func): - """add bg tasks to set to avoid premature garbage collection""" - task = asyncio.create_task(func) - self.bg_tasks.add(task) - task.add_done_callback(self.bg_tasks.discard) - - -# ============================================================================ -def init_operator_api(app, *args): - """regsiters webhook handlers for metacontroller""" - - oper = BtrixOperator(*args) - - @app.post("/op/crawls/sync") - async def mc_sync_crawls(data: MCSyncData): - return await oper.sync_crawls(data) - - # reuse sync path, but distinct endpoint for better logging - @app.post("/op/crawls/finalize") - async def mc_sync_finalize(data: MCSyncData): - return await oper.sync_crawls(data) - - @app.post("/op/crawls/customize") - async def mc_related(data: MCBaseRequest): - return oper.get_related(data) - - @app.post("/op/profilebrowsers/sync") - async def mc_sync_profile_browsers(data: MCSyncData): - return await oper.sync_profile_browsers(data) - - @app.post("/op/cronjob/sync") - async def mc_sync_cronjob_crawls(data: MCDecoratorSyncData): - return await oper.sync_cronjob_crawl(data) - - @app.post("/op/cronjob/customize") - async def mc_cronjob_related(data: MCBaseRequest): - return oper.get_cronjob_crawl_related(data) - - # nop, but needed for metacontroller - @app.post("/op/backgroundjob/sync") - async def mc_sync_background_jobs(): - return {"attachments": []} - - @app.post("/op/backgroundjob/finalize") - async def mc_finalize_background_jobs(data: MCDecoratorSyncData): - return await 
oper.finalize_background_job(data) - - @app.get("/healthz", include_in_schema=False) - async def healthz(): - return {} - - return oper diff --git a/backend/btrixcloud/operator/cronjobs.py b/backend/btrixcloud/operator/cronjobs.py new file mode 100644 index 0000000000..720fe9dd71 --- /dev/null +++ b/backend/btrixcloud/operator/cronjobs.py @@ -0,0 +1,128 @@ +""" Operator handler for crawl CronJobs """ + +from uuid import UUID +import yaml + +from btrixcloud.utils import to_k8s_date +from .models import MCBaseRequest, MCDecoratorSyncData, CJS, CMAP +from .baseoperator import BaseOperator + + +# pylint: disable=too-many-locals +# ============================================================================ +class CronJobOperator(BaseOperator): + """CronJob Operator""" + + def init_routes(self, app): + """init routes for crawl CronJob decorator""" + + @app.post("/op/cronjob/sync") + async def mc_sync_cronjob_crawls(data: MCDecoratorSyncData): + return await self.sync_cronjob_crawl(data) + + @app.post("/op/cronjob/customize") + async def mc_cronjob_related(data: MCBaseRequest): + return self.get_cronjob_crawl_related(data) + + def get_cronjob_crawl_related(self, data: MCBaseRequest): + """return configmap related to crawl""" + labels = data.parent.get("metadata", {}).get("labels", {}) + cid = labels.get("btrix.crawlconfig") + return { + "relatedResources": [ + { + "apiVersion": "v1", + "resource": "configmaps", + "labelSelector": {"matchLabels": {"btrix.crawlconfig": cid}}, + } + ] + } + + async def sync_cronjob_crawl(self, data: MCDecoratorSyncData): + """create crawljobs from a job object spawned by cronjob""" + + metadata = data.object["metadata"] + labels = metadata.get("labels", {}) + cid = labels.get("btrix.crawlconfig") + + name = metadata.get("name") + crawl_id = name + + actual_state, finished = await self.crawl_ops.get_crawl_state(crawl_id) + if finished: + status = None + # mark job as completed + if not data.object["status"].get("succeeded"): + print("Cron Job Complete!", finished) + status = { + "succeeded": 1, + "startTime": metadata.get("creationTimestamp"), + "completionTime": to_k8s_date(finished), + } + + return { + "attachments": [], + "annotations": {"finished": finished}, + "status": status, + } + + configmap = data.related[CMAP][f"crawl-config-{cid}"]["data"] + + oid = configmap.get("ORG_ID") + userid = configmap.get("USER_ID") + + crawljobs = data.attachments[CJS] + + org = await self.org_ops.get_org_by_id(UUID(oid)) + + warc_prefix = None + + if not actual_state: + # cronjob doesn't exist yet + crawlconfig = await self.crawl_config_ops.get_crawl_config( + UUID(cid), UUID(oid) + ) + if not crawlconfig: + print( + f"error: no crawlconfig {cid}. skipping scheduled job. old cronjob left over?" 
+ ) + return {"attachments": []} + + # db create + user = await self.user_ops.get_by_id(UUID(userid)) + if not user: + print(f"error: missing user for id {userid}") + return {"attachments": []} + + warc_prefix = self.crawl_config_ops.get_warc_prefix(org, crawlconfig) + + await self.crawl_config_ops.add_new_crawl( + crawl_id, + crawlconfig, + user, + manual=False, + ) + print("Scheduled Crawl Created: " + crawl_id) + + crawl_id, crawljob = self.k8s.new_crawl_job_yaml( + cid, + userid=userid, + oid=oid, + storage=org.storage, + crawler_channel=configmap.get("CRAWLER_CHANNEL", "default"), + scale=int(configmap.get("INITIAL_SCALE", 1)), + crawl_timeout=int(configmap.get("CRAWL_TIMEOUT", 0)), + max_crawl_size=int(configmap.get("MAX_CRAWL_SIZE", "0")), + manual=False, + crawl_id=crawl_id, + warc_prefix=warc_prefix, + ) + + attachments = list(yaml.safe_load_all(crawljob)) + + if crawl_id in crawljobs: + attachments[0]["status"] = crawljobs[CJS][crawl_id]["status"] + + return { + "attachments": attachments, + } diff --git a/backend/btrixcloud/operator/models.py b/backend/btrixcloud/operator/models.py new file mode 100644 index 0000000000..2102a11779 --- /dev/null +++ b/backend/btrixcloud/operator/models.py @@ -0,0 +1,198 @@ +""" Operator Models """ + +from collections import defaultdict +from uuid import UUID +from typing import Optional, DefaultDict +from pydantic import BaseModel, Field +from kubernetes.utils import parse_quantity +from btrixcloud.models import StorageRef + + +BTRIX_API = "btrix.cloud/v1" + +CMAP = "ConfigMap.v1" +PVC = "PersistentVolumeClaim.v1" +POD = "Pod.v1" +CJS = f"CrawlJob.{BTRIX_API}" + + +# ============================================================================ +class MCBaseRequest(BaseModel): + """base metacontroller model, used for customize hook""" + + parent: dict + controller: dict + + +# ============================================================================ +class MCSyncData(MCBaseRequest): + """sync / finalize metacontroller model""" + + children: dict + related: dict + finalizing: bool = False + + +# ============================================================================ +class MCDecoratorSyncData(BaseModel): + """sync for decoratorcontroller model""" + + object: dict + controller: dict + + attachments: dict + related: dict + finalizing: bool = False + + +# ============================================================================ +class CrawlSpec(BaseModel): + """spec from k8s CrawlJob object""" + + id: str + cid: UUID + oid: UUID + scale: int = 1 + storage: StorageRef + started: str + crawler_channel: str + stopping: bool = False + scheduled: bool = False + timeout: int = 0 + max_crawl_size: int = 0 + + +# ============================================================================ +class PodResourcePercentage(BaseModel): + """Resource usage percentage ratios""" + + memory: float = 0 + cpu: float = 0 + storage: float = 0 + + +# ============================================================================ +class PodResources(BaseModel): + """Pod Resources""" + + memory: int = 0 + cpu: float = 0 + storage: int = 0 + + def __init__(self, *a, **kw): + if "memory" in kw: + kw["memory"] = int(parse_quantity(kw["memory"])) + if "cpu" in kw: + kw["cpu"] = float(parse_quantity(kw["cpu"])) + if "storage" in kw: + kw["storage"] = int(parse_quantity(kw["storage"])) + super().__init__(*a, **kw) + + +# ============================================================================ +class PodInfo(BaseModel): + """Aggregate pod status info held in CrawlJob""" + 
+ exitTime: Optional[str] = None + exitCode: Optional[int] = None + isNewExit: Optional[bool] = Field(default=None, exclude=True) + reason: Optional[str] = None + + allocated: PodResources = PodResources() + used: PodResources = PodResources() + + newCpu: Optional[int] = None + newMemory: Optional[int] = None + + def dict(self, *a, **kw): + res = super().dict(*a, **kw) + percent = { + "memory": self.get_percent_memory(), + "cpu": self.get_percent_cpu(), + "storage": self.get_percent_storage(), + } + res["percent"] = percent + return res + + def get_percent_memory(self) -> float: + """compute percent memory used""" + return ( + float(self.used.memory) / float(self.allocated.memory) + if self.allocated.memory + else 0 + ) + + def get_percent_cpu(self) -> float: + """compute percent cpu used""" + return ( + float(self.used.cpu) / float(self.allocated.cpu) + if self.allocated.cpu + else 0 + ) + + def get_percent_storage(self) -> float: + """compute percent storage used""" + return ( + float(self.used.storage) / float(self.allocated.storage) + if self.allocated.storage + else 0 + ) + + def should_restart_pod(self): + """return true if pod should be restarted""" + if self.newMemory and self.newMemory != self.allocated.memory: + return True + + if self.newCpu and self.newCpu != self.allocated.cpu: + return True + + return False + + +# ============================================================================ +# pylint: disable=invalid-name +class CrawlStatus(BaseModel): + """status from k8s CrawlJob object""" + + state: str = "starting" + pagesFound: int = 0 + pagesDone: int = 0 + size: int = 0 + # human readable size string + sizeHuman: str = "" + scale: int = 1 + filesAdded: int = 0 + filesAddedSize: int = 0 + finished: Optional[str] = None + stopping: bool = False + stopReason: Optional[str] = None + initRedis: bool = False + crawlerImage: Optional[str] = None + lastActiveTime: str = "" + podStatus: Optional[DefaultDict[str, PodInfo]] = defaultdict( + lambda: PodInfo() # pylint: disable=unnecessary-lambda + ) + # placeholder for pydantic 2.0 -- will require this version + # podStatus: Optional[ + # DefaultDict[str, Annotated[PodInfo, Field(default_factory=PodInfo)]] + # ] + restartTime: Optional[str] + canceled: bool = False + + # updated on pod exits and at regular interval + # Crawl Execution Time -- time all crawler pods have been running + # used to track resource usage and enforce execution minutes limit + crawlExecTime: int = 0 + + # Elapsed Exec Time -- time crawl has been running in at least one pod + # used for crawl timeouts + elapsedCrawlTime: int = 0 + + # last exec time update + lastUpdatedTime: str = "" + + # any pods exited + anyCrawlPodNewExit: Optional[bool] = Field(default=False, exclude=True) + + # don't include in status, use by metacontroller + resync_after: Optional[int] = Field(default=None, exclude=True) diff --git a/backend/btrixcloud/operator/profiles.py b/backend/btrixcloud/operator/profiles.py new file mode 100644 index 0000000000..713252d7c5 --- /dev/null +++ b/backend/btrixcloud/operator/profiles.py @@ -0,0 +1,57 @@ +""" Operator handler for ProfileJobs """ + +from btrixcloud.utils import ( + from_k8s_date, + dt_now, +) + +from btrixcloud.models import StorageRef + +from .models import MCSyncData +from .baseoperator import BaseOperator + + +# ============================================================================ +class ProfileOperator(BaseOperator): + """ProfileOperator""" + + def init_routes(self, app): + """init routes for this operator""" + + 
@app.post("/op/profilebrowsers/sync") + async def mc_sync_profile_browsers(data: MCSyncData): + return await self.sync_profile_browsers(data) + + async def sync_profile_browsers(self, data: MCSyncData): + """sync profile browsers""" + spec = data.parent.get("spec", {}) + + expire_time = from_k8s_date(spec.get("expireTime")) + browserid = spec.get("id") + + if dt_now() >= expire_time: + self.run_task(self.k8s.delete_profile_browser(browserid)) + return {"status": {}, "children": []} + + params = {} + params.update(self.k8s.shared_params) + params["id"] = browserid + params["userid"] = spec.get("userid", "") + + oid = spec.get("oid") + storage = StorageRef(spec.get("storageName")) + + storage_path = storage.get_storage_extra_path(oid) + storage_secret = storage.get_storage_secret_name(oid) + + params["storage_path"] = storage_path + params["storage_secret"] = storage_secret + params["profile_filename"] = spec.get("profileFilename", "") + params["crawler_image"] = spec["crawlerImage"] + + params["url"] = spec.get("startUrl", "about:blank") + params["vnc_password"] = spec.get("vncPassword") + + children = self.load_from_yaml("profilebrowser.yaml", params) + + return {"status": {}, "children": children}