Skip to content

Commit

Permalink
Refactor operator class into module (#1564)
Browse files Browse the repository at this point in the history
The operator class has gotten fairly large; this is a first pass at
refactoring operator.py into a submodule, with multiple operator
instances that each handle a different type of object.

- The main k8s interface has been split into K8sOpApi which extends K8sApi
and is shared across all operators.
- Each operator extends BaseOperator which also has an instance of K8sOpApi
- The CrawlOperator is still the bulk of the functionality, but will likely be further refactored
to support QA jobs

---------
Co-authored-by: Tessa Walsh <[email protected]>
  • Loading branch information
ikreymer authored Feb 29, 2024
1 parent da19691 commit 2ac6584
Show file tree
Hide file tree
Showing 8 changed files with 656 additions and 537 deletions.
4 changes: 2 additions & 2 deletions backend/btrixcloud/main_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,5 +112,5 @@ def main():
async def startup():
    """Run once at app startup: register the exit handler, build the
    operator API via main(), and finish its async initialization."""
    register_exit_handler()
    # main() returns the shared K8sOpAPI settings object; it must be
    # created exactly once and async-initialized exactly once.
    settings = main()
    await settings.async_init()
28 changes: 28 additions & 0 deletions backend/btrixcloud/operator/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
""" operators module """

from .profiles import ProfileOperator
from .bgjobs import BgJobOperator
from .cronjobs import CronJobOperator
from .crawls import CrawlOperator
from .baseoperator import K8sOpAPI

operator_classes = [ProfileOperator, BgJobOperator, CronJobOperator, CrawlOperator]


# ============================================================================
def init_operator_api(app, *args):
    """registers webhook handlers for metacontroller"""

    # single shared k8s API instance passed to every operator
    k8s = K8sOpAPI()

    operators = [cls(k8s, *args) for cls in operator_classes]
    for oper in operators:
        oper.init_routes(app)

    @app.get("/healthz", include_in_schema=False)
    async def healthz():
        return {}

    return k8s
131 changes: 131 additions & 0 deletions backend/btrixcloud/operator/baseoperator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
""" Base Operator class for all operators """

import asyncio
from typing import TYPE_CHECKING
from kubernetes.utils import parse_quantity

import yaml
from btrixcloud.k8sapi import K8sAPI


if TYPE_CHECKING:
from btrixcloud.crawlconfigs import CrawlConfigOps
from btrixcloud.crawls import CrawlOps
from btrixcloud.orgs import OrgOps
from btrixcloud.colls import CollectionOps
from btrixcloud.storages import StorageOps
from btrixcloud.webhooks import EventWebhookOps
from btrixcloud.users import UserManager
from btrixcloud.background_jobs import BackgroundJobOps
from btrixcloud.pages import PageOps
from redis.asyncio.client import Redis
else:
CrawlConfigOps = CrawlOps = OrgOps = CollectionOps = Redis = object
StorageOps = EventWebhookOps = UserManager = BackgroundJobOps = PageOps = object


# ============================================================================
class K8sOpAPI(K8sAPI):
    """Extended k8s API shared by all operators: loads the shared config
    file and precomputes crawler pod cpu/memory resources."""

    def __init__(self):
        super().__init__()
        self.config_file = "/config/config.yaml"
        with open(self.config_file, encoding="utf-8") as fh_config:
            self.shared_params = yaml.safe_load(fh_config)

        # set for real in async_init()
        self.has_pod_metrics = False
        self.compute_crawler_resources()

    def compute_crawler_resources(self):
        """Fill in crawler cpu/memory in shared_params as base + extra
        per additional browser, unless explicit values were configured."""
        params = self.shared_params
        # number of browsers beyond the first; the base resource values
        # already account for one browser
        extra_browsers = max(int(params["crawler_browser_instances"]) - 1, 0)

        if params.get("crawler_cpu"):
            print(f"cpu = {params['crawler_cpu']}")
        else:
            base = parse_quantity(params["crawler_cpu_base"])
            extra = parse_quantity(params["crawler_extra_cpu_per_browser"])

            # cpu is a floating value of cpu cores
            params["crawler_cpu"] = float(base + extra_browsers * extra)

            print(
                f"cpu = {base} + {extra_browsers} * {extra} = {params['crawler_cpu']}"
            )

        if params.get("crawler_memory"):
            print(f"memory = {params['crawler_memory']}")
        else:
            base = parse_quantity(params["crawler_memory_base"])
            extra = parse_quantity(params["crawler_extra_memory_per_browser"])

            # memory is always an int
            params["crawler_memory"] = int(base + extra_browsers * extra)

            print(
                f"memory = {base} + {extra_browsers} * {extra} = {params['crawler_memory']}"
            )

    async def async_init(self):
        """perform any async init here"""
        self.has_pod_metrics = await self.is_pod_metrics_available()
        print("Pod Metrics Available:", self.has_pod_metrics)


# pylint: disable=too-many-instance-attributes, too-many-arguments
# ============================================================================
class BaseOperator:
    """Common base for all operators: holds the shared K8sOpAPI and the
    various ops objects, plus helpers for templates and background tasks."""

    k8s: K8sOpAPI
    crawl_config_ops: CrawlConfigOps
    crawl_ops: CrawlOps
    orgs_ops: OrgOps
    coll_ops: CollectionOps
    storage_ops: StorageOps
    event_webhook_ops: EventWebhookOps
    background_job_ops: BackgroundJobOps
    user_ops: UserManager
    page_ops: PageOps

    def __init__(
        self,
        k8s,
        crawl_config_ops,
        crawl_ops,
        org_ops,
        coll_ops,
        storage_ops,
        event_webhook_ops,
        background_job_ops,
        page_ops,
    ):
        self.k8s = k8s
        self.crawl_config_ops = crawl_config_ops
        self.crawl_ops = crawl_ops
        self.org_ops = org_ops
        self.coll_ops = coll_ops
        self.storage_ops = storage_ops
        self.event_webhook_ops = event_webhook_ops
        self.background_job_ops = background_job_ops
        self.page_ops = page_ops

        # user manager is reached through crawl_config_ops
        self.user_ops = crawl_config_ops.user_manager

        # strong references to in-flight tasks, to avoid background tasks
        # being garbage collected; see: https://stackoverflow.com/a/74059981
        self.bg_tasks = set()

    def init_routes(self, app):
        """init routes for this operator"""

    def run_task(self, func):
        """Schedule *func* as an asyncio task, holding a reference until
        it completes so it isn't garbage collected prematurely."""
        bg_task = asyncio.create_task(func)
        self.bg_tasks.add(bg_task)
        bg_task.add_done_callback(self.bg_tasks.discard)

    def load_from_yaml(self, filename, params):
        """load and parse k8s template from yaml file"""
        template = self.k8s.templates.env.get_template(filename)
        rendered = template.render(params)
        return list(yaml.safe_load_all(rendered))
62 changes: 62 additions & 0 deletions backend/btrixcloud/operator/bgjobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
""" Operator handler for BackgroundJobs """

from uuid import UUID
import traceback

from btrixcloud.utils import (
from_k8s_date,
dt_now,
)

from .models import MCDecoratorSyncData
from .baseoperator import BaseOperator


# ============================================================================
class BgJobOperator(BaseOperator):
    """Operator that records the outcome of finished background k8s jobs."""

    def init_routes(self, app):
        """Register metacontroller webhook endpoints for background jobs."""

        # nop, but needed for metacontroller
        @app.post("/op/backgroundjob/sync")
        async def mc_sync_background_jobs():
            return {"attachments": []}

        @app.post("/op/backgroundjob/finalize")
        async def mc_finalize_background_jobs(data: MCDecoratorSyncData):
            return await self.finalize_background_job(data)

    async def finalize_background_job(self, data: MCDecoratorSyncData) -> dict:
        """handle finished background job"""

        metadata = data.object["metadata"]
        job_labels: dict[str, str] = metadata.get("labels", {})
        oid: str = job_labels.get("btrix.org") or ""
        job_type: str = job_labels.get("job_type") or ""
        job_id: str = metadata.get("name")

        job_status = data.object["status"]
        succeeded = job_status.get("succeeded") == 1
        completion_time = job_status.get("completionTime")

        # prefer the job's own completion time; fall back to "now"
        if completion_time:
            finished = from_k8s_date(completion_time)
        else:
            finished = dt_now()

        try:
            await self.background_job_ops.job_finished(
                job_id, job_type, UUID(oid), success=succeeded, finished=finished
            )
        # errors recording the result are logged but never block finalization
        # pylint: disable=broad-except
        except Exception:
            print("Update Background Job Error", flush=True)
            traceback.print_exc()

        return {"attachments": [], "finalized": True}
Loading

0 comments on commit 2ac6584

Please sign in to comment.