diff --git a/backend/btrixcloud/basecrawls.py b/backend/btrixcloud/basecrawls.py
index 9d832c6866..405bf3b5a9 100644
--- a/backend/btrixcloud/basecrawls.py
+++ b/backend/btrixcloud/basecrawls.py
@@ -136,6 +136,9 @@ async def get_crawl(
                 crawl.config.seeds = None

         crawl.storageQuotaReached = await self.orgs.storage_quota_reached(crawl.oid)
+        crawl.execMinutesQuotaReached = await self.orgs.exec_mins_quota_reached(
+            crawl.oid
+        )

         return crawl

diff --git a/backend/btrixcloud/crawlconfigs.py b/backend/btrixcloud/crawlconfigs.py
index b4d044dd02..f6a5970355 100644
--- a/backend/btrixcloud/crawlconfigs.py
+++ b/backend/btrixcloud/crawlconfigs.py
@@ -3,7 +3,7 @@
 """

 # pylint: disable=too-many-lines
-from typing import List, Union, Optional
+from typing import List, Union, Optional, Tuple

 import uuid
 import asyncio
@@ -137,7 +137,7 @@ async def add_crawl_config(
         config: CrawlConfigIn,
         org: Organization,
         user: User,
-    ) -> tuple[str, str, bool]:
+    ) -> Tuple[str, Optional[str], bool, bool]:
         """Add new crawl config"""
         data = config.dict()
         data["oid"] = org.id
@@ -174,12 +174,17 @@ async def add_crawl_config(
         )

         run_now = config.runNow
-        quota_reached = await self.org_ops.storage_quota_reached(org.id)
+        storage_quota_reached = await self.org_ops.storage_quota_reached(org.id)
+        exec_mins_quota_reached = await self.org_ops.exec_mins_quota_reached(org.id)

-        if quota_reached:
+        if storage_quota_reached:
             run_now = False
             print(f"Storage quota exceeded for org {org.id}", flush=True)

+        if exec_mins_quota_reached:
+            run_now = False
+            print(f"Execution minutes quota exceeded for org {org.id}", flush=True)
+
         crawl_id = await self.crawl_manager.add_crawl_config(
             crawlconfig=crawlconfig,
             storage=org.storage,
@@ -191,7 +196,12 @@ async def add_crawl_config(
         if crawl_id and run_now:
             await self.add_new_crawl(crawl_id, crawlconfig, user, manual=True)

-        return result.inserted_id, crawl_id, quota_reached
+        return (
+            result.inserted_id,
+            crawl_id or None,
+            storage_quota_reached,
+            exec_mins_quota_reached,
+        )

     async def add_new_crawl(
         self, crawl_id: str, crawlconfig: CrawlConfig, user: User, manual: bool
@@ -336,6 +346,10 @@ async def update_crawl_config(
             "updated": True,
             "settings_changed": changed,
             "metadata_changed": metadata_changed,
+            "storageQuotaReached": await self.org_ops.storage_quota_reached(org.id),
+            "execMinutesQuotaReached": await self.org_ops.exec_mins_quota_reached(
+                org.id
+            ),
         }
         if run_now:
             crawl_id = await self.run_now(cid, org, user)
@@ -757,6 +771,9 @@ async def run_now(self, cid: uuid.UUID, org: Organization, user: User):
         if await self.org_ops.storage_quota_reached(org.id):
             raise HTTPException(status_code=403, detail="storage_quota_reached")

+        if await self.org_ops.exec_mins_quota_reached(org.id):
+            raise HTTPException(status_code=403, detail="exec_minutes_quota_reached")
+
         try:
             crawl_id = await self.crawl_manager.create_crawl_job(
                 crawlconfig, userid=str(user.id)
@@ -991,12 +1008,18 @@ async def add_crawl_config(
         org: Organization = Depends(org_crawl_dep),
         user: User = Depends(user_dep),
     ):
-        cid, new_job_name, quota_reached = await ops.add_crawl_config(config, org, user)
+        (
+            cid,
+            new_job_name,
+            storage_quota_reached,
+            exec_mins_quota_reached,
+        ) = await ops.add_crawl_config(config, org, user)
         return {
             "added": True,
             "id": str(cid),
             "run_now_job": new_job_name,
-            "storageQuotaReached": quota_reached,
+            "storageQuotaReached": storage_quota_reached,
+            "execMinutesQuotaReached": exec_mins_quota_reached,
         }

     @router.patch("/{cid}", dependencies=[Depends(org_crawl_dep)])
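Reviewer note: with this change, the create-workflow endpoint returns both quota flags, and `run_now_job` may be `None` when a quota suppressed the immediate run. A minimal sketch of a client consuming the response (base URL, auth header, and helper name are placeholders; the endpoint path and field names come from this diff and the nightly tests):

```python
import requests

API = "https://app.example.com/api"  # placeholder base URL
HEADERS = {"Authorization": "Bearer <token>"}  # placeholder auth


def create_workflow(oid: str, config: dict) -> dict:
    """Create a crawl workflow and report whether a quota blocked runNow."""
    r = requests.post(f"{API}/orgs/{oid}/crawlconfigs/", headers=HEADERS, json=config)
    r.raise_for_status()
    data = r.json()
    # run_now_job is None when runNow was suppressed by a quota
    if data["storageQuotaReached"] or data["execMinutesQuotaReached"]:
        print("workflow saved, but crawl not started: quota reached")
    return data
```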
diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py
index 1f6a5e3adc..20a7c62f0e 100644
--- a/backend/btrixcloud/crawls.py
+++ b/backend/btrixcloud/crawls.py
@@ -528,11 +528,11 @@ async def update_running_crawl_stats(self, crawl_id, stats):
         query = {"_id": crawl_id, "type": "crawl", "state": "running"}
         return await self.crawls.find_one_and_update(query, {"$set": {"stats": stats}})

-    async def store_exec_time(self, crawl_id, exec_time):
-        """set exec time, only if not already set"""
-        query = {"_id": crawl_id, "type": "crawl", "execTime": {"$in": [0, None]}}
+    async def inc_crawl_exec_time(self, crawl_id, exec_time):
+        """increment exec time"""
         return await self.crawls.find_one_and_update(
-            query, {"$set": {"execTime": exec_time}}
+            {"_id": crawl_id, "type": "crawl"},
+            {"$inc": {"crawlExecSeconds": exec_time}},
         )

     async def get_crawl_state(self, crawl_id):
diff --git a/backend/btrixcloud/models.py b/backend/btrixcloud/models.py
index 10ca719ea0..1830849922 100644
--- a/backend/btrixcloud/models.py
+++ b/backend/btrixcloud/models.py
@@ -513,6 +513,7 @@ class CrawlOut(BaseMongoModel):
     cid_rev: Optional[int]

     storageQuotaReached: Optional[bool]
+    execMinutesQuotaReached: Optional[bool]


 # ============================================================================
@@ -722,6 +723,7 @@ class OrgQuotas(BaseModel):
     maxConcurrentCrawls: Optional[int] = 0
     maxPagesPerCrawl: Optional[int] = 0
     storageQuota: Optional[int] = 0
+    maxExecMinutesPerMonth: Optional[int] = 0


 # ============================================================================
@@ -755,6 +757,9 @@ class OrgOut(BaseMongoModel):
     webhookUrls: Optional[OrgWebhookUrls] = OrgWebhookUrls()
     quotas: Optional[OrgQuotas] = OrgQuotas()

+    storageQuotaReached: Optional[bool]
+    execMinutesQuotaReached: Optional[bool]
+

 # ============================================================================
 class Organization(BaseMongoModel):
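Reviewer note: the switch from `$set` to `$inc` is the key semantic change here. Execution time is now written incrementally, once per operator update window, so the update must be additive rather than set-once. A sketch of the two update shapes, assuming a motor collection handle named `crawls`:

```python
# Before: execTime could be written exactly once per crawl; repeated
# writes were rejected by the "not already set" query filter.
await crawls.find_one_and_update(
    {"_id": crawl_id, "type": "crawl", "execTime": {"$in": [0, None]}},
    {"$set": {"execTime": exec_time}},
)

# After: crawlExecSeconds accumulates partial updates, so calling this
# once per sync window adds up to the total instead of overwriting it.
await crawls.find_one_and_update(
    {"_id": crawl_id, "type": "crawl"},
    {"$inc": {"crawlExecSeconds": exec_time}},
)
```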
diff --git a/backend/btrixcloud/operator.py b/backend/btrixcloud/operator.py
index 813d6f196e..7848798864 100644
--- a/backend/btrixcloud/operator.py
+++ b/backend/btrixcloud/operator.py
@@ -54,6 +54,9 @@
 # time in seconds before a crawl is deemed 'waiting' instead of 'starting'
 STARTING_TIME_SECS = 60

+# how often to update execution time seconds
+EXEC_TIME_UPDATE_SECS = 60
+

 # ============================================================================
 class MCBaseRequest(BaseModel):
@@ -216,11 +219,19 @@ class CrawlStatus(BaseModel):
     #     DefaultDict[str, Annotated[PodInfo, Field(default_factory=PodInfo)]]
     # ]
     restartTime: Optional[str]
-    execTime: int = 0

     canceled: bool = False

+    # Execution Time -- updated on pod exits and at regular intervals
+    crawlExecTime: int = 0
+
+    # last exec time update
+    lastUpdatedTime: str = ""
+
+    # any crawler pods newly exited
+    anyCrawlPodNewExit: Optional[bool] = Field(default=False, exclude=True)
+
     # don't include in status, use by metacontroller
-    resync_after: Optional[int] = None
+    resync_after: Optional[int] = Field(default=None, exclude=True)


 # ============================================================================
@@ -255,6 +266,10 @@ def __init__(
         self._has_pod_metrics = False
         self.compute_crawler_resources()

+        # to avoid background tasks being garbage collected
+        # see: https://stackoverflow.com/a/74059981
+        self.bg_tasks = set()
+
     def compute_crawler_resources(self):
         """compute memory / cpu resources for crawlers"""
         # pylint: disable=invalid-name
@@ -295,7 +310,7 @@ async def sync_profile_browsers(self, data: MCSyncData):
             browserid = spec.get("id")

             if dt_now() >= expire_time:
-                asyncio.create_task(self.delete_profile_browser(browserid))
+                self.run_task(self.delete_profile_browser(browserid))
                 return {"status": {}, "children": []}

         params = {}
@@ -313,7 +328,7 @@ async def sync_profile_browsers(self, data: MCSyncData):

         return {"status": {}, "children": children}

-    # pylint: disable=too-many-return-statements
+    # pylint: disable=too-many-return-statements, invalid-name
     async def sync_crawls(self, data: MCSyncData):
         """sync crawls"""

@@ -347,7 +362,12 @@ async def sync_crawls(self, data: MCSyncData):
                 raise HTTPException(status_code=400, detail="out_of_sync_status")

             return await self.finalize_response(
-                crawl_id, uuid.UUID(oid), status, spec, data.children, params
+                crawl_id,
+                uuid.UUID(oid),
+                status,
+                spec,
+                data.children,
+                params,
             )

         # just in case, finished but not deleted, can only get here if
@@ -356,9 +376,14 @@ async def sync_crawls(self, data: MCSyncData):
             print(
                 f"warn crawl {crawl_id} finished but not deleted, post-finish taking too long?"
             )
-            asyncio.create_task(self.delete_crawl_job(crawl_id))
+            self.run_task(self.delete_crawl_job(crawl_id))
             return await self.finalize_response(
-                crawl_id, uuid.UUID(oid), status, spec, data.children, params
+                crawl_id,
+                uuid.UUID(oid),
+                status,
+                spec,
+                data.children,
+                params,
             )

         try:
@@ -413,7 +438,11 @@ async def sync_crawls(self, data: MCSyncData):
                 self.sync_resources(status, pod_name, pod, data.children)

             status = await self.sync_crawl_state(
-                redis_url, crawl, status, pods, data.related.get(METRICS, {})
+                redis_url,
+                crawl,
+                status,
+                pods,
+                data.related.get(METRICS, {}),
             )

             # auto sizing handled here
@@ -421,10 +450,21 @@ async def sync_crawls(self, data: MCSyncData):

             if status.finished:
                 return await self.finalize_response(
-                    crawl_id, uuid.UUID(oid), status, spec, data.children, params
+                    crawl_id,
+                    uuid.UUID(oid),
+                    status,
+                    spec,
+                    data.children,
+                    params,
                 )
+
+            await self.increment_pod_exec_time(
+                pods, status, crawl.id, crawl.oid, EXEC_TIME_UPDATE_SECS
+            )
+
         else:
             status.scale = crawl.scale
+            status.lastUpdatedTime = to_k8s_date(dt_now())

         children = self._load_redis(params, status, data.children)

@@ -447,7 +487,7 @@ async def sync_crawls(self, data: MCSyncData):
             children.extend(self._load_crawler(params, i, status, data.children))

         return {
-            "status": status.dict(exclude_none=True, exclude={"resync_after": True}),
+            "status": status.dict(exclude_none=True),
             "children": children,
             "resyncAfterSeconds": status.resync_after,
         }
@@ -682,8 +722,6 @@ async def cancel_crawl(
         await self.mark_for_cancelation(crawl_id)

         if not status.canceled:
-            cancel_time = datetime.utcnow()
-
             for name, pod in pods.items():
                 pstatus = pod["status"]
                 role = pod["metadata"]["labels"]["role"]
@@ -696,13 +734,6 @@ async def cancel_crawl(

                 cstatus = pstatus["containerStatuses"][0]

-                running = cstatus["state"].get("running")
-
-                if running:
-                    self.inc_exec_time(
-                        name, status, cancel_time, running.get("startedAt")
-                    )
-
                 self.handle_terminated_pod(
                     name, role, status, cstatus["state"].get("terminated")
                 )
@@ -737,14 +768,14 @@ async def fail_crawl(
             print(f"============== POD STATUS: {name} ==============")
             pprint(pods[name]["status"])

-        asyncio.create_task(self.print_pod_logs(pod_names, self.log_failed_crawl_lines))
+        self.run_task(self.print_pod_logs(pod_names, self.log_failed_crawl_lines))

         return True

     def _empty_response(self, status):
         """done response for removing crawl"""
         return {
-            "status": status.dict(exclude_none=True, exclude={"resync_after": True}),
+            "status": status.dict(exclude_none=True),
             "children": [],
         }

@@ -764,23 +795,19 @@ async def finalize_response(

         finalized = False

-        exec_updated = False
-
         pods = children[POD]

         if redis_pod in pods:
             # if has other pods, keep redis pod until they are removed
             if len(pods) > 1:
                 new_children = self._load_redis(params, status, children)
+                await self.increment_pod_exec_time(pods, status, crawl_id, oid)

         # keep pvs until pods are removed
         if new_children:
             new_children.extend(list(children[PVC].values()))

         if not children[POD] and not children[PVC]:
-            # ensure exec time was successfully updated
-            exec_updated = await self.store_exec_time(crawl_id, oid, status.execTime)
-
             # keep parent until ttl expired, if any
             if status.finished:
                 ttl = spec.get("ttlSecondsAfterFinished", DEFAULT_TTL)
@@ -792,9 +819,9 @@ async def finalize_response(
             finalized = True

         return {
-            "status": status.dict(exclude_none=True, exclude={"resync_after": True}),
+            "status": status.dict(exclude_none=True),
             "children": new_children,
-            "finalized": finalized and exec_updated,
+            "finalized": finalized,
         }

     async def _get_redis(self, redis_url):
@@ -825,7 +852,9 @@ async def sync_crawl_state(self, redis_url, crawl, status, pods, metrics):

             await self.add_used_stats(crawl.id, status.podStatus, redis, metrics)

-            await self.log_crashes(crawl.id, status.podStatus, redis)
+            # skip if no newly exited pods
+            if status.anyCrawlPodNewExit:
+                await self.log_crashes(crawl.id, status.podStatus, redis)

             if not crawler_running:
                 if self.should_mark_waiting(status.state, crawl.started):
@@ -867,7 +896,7 @@ async def sync_crawl_state(self, redis_url, crawl, status, pods, metrics):
                 crawl.id,
                 allowed_from=["starting", "waiting_capacity"],
             ):
-                asyncio.create_task(
+                self.run_task(
                     self.event_webhook_ops.create_crawl_started_notification(
                         crawl.id, crawl.oid, scheduled=crawl.scheduled
                     )
@@ -907,6 +936,7 @@ def sync_pod_status(self, pods, status):
         crawler_running = False
         redis_running = False
         done = True
+
         try:
             for name, pod in pods.items():
                 running = False
@@ -961,8 +991,8 @@ def handle_terminated_pod(self, name, role, status, terminated):
         pod_status.isNewExit = pod_status.exitTime != exit_time
         if pod_status.isNewExit and role == "crawler":
-            self.inc_exec_time(name, status, exit_time, terminated.get("startedAt"))
             pod_status.exitTime = exit_time
+            status.anyCrawlPodNewExit = True

         # detect reason
         exit_code = terminated.get("exitCode")
@@ -976,6 +1006,105 @@ def handle_terminated_pod(self, name, role, status, terminated):

         pod_status.exitCode = exit_code

+    async def increment_pod_exec_time(
+        self,
+        pods: dict[str, dict],
+        status: CrawlStatus,
+        crawl_id: str,
+        oid: uuid.UUID,
+        min_duration=0,
+    ) -> None:
+        """increment exec time tracking"""
+        now = dt_now()
+
+        if not status.lastUpdatedTime:
+            status.lastUpdatedTime = to_k8s_date(now)
+            return
+
+        update_start_time = from_k8s_date(status.lastUpdatedTime)
+
+        reason = None
+        update_duration = (now - update_start_time).total_seconds()
+
+        if status.anyCrawlPodNewExit:
+            reason = "new pod exit"
+
+        elif status.canceled:
+            reason = "crawl canceled"
+
+        elif now.month != update_start_time.month:
+            reason = "month change"
+
+        elif update_duration >= min_duration:
+            reason = "duration reached" if min_duration else "finalizing"
+
+        if not reason:
+            return
+
+        exec_time = 0
+        print(
+            f"Exec Time Update: {reason}: {now} - {update_start_time} = {update_duration}"
+        )
+
+        for name, pod in pods.items():
+            pstatus = pod["status"]
+            role = pod["metadata"]["labels"]["role"]
+
+            if role != "crawler":
+                continue
+
+            if "containerStatuses" not in pstatus:
+                continue
+
+            cstate = pstatus["containerStatuses"][0]["state"]
+
+            end_time = None
+            start_time = None
+            pod_state = ""
+
+            if "running" in cstate:
+                pod_state = "running"
+                state = cstate["running"]
+                start_time = from_k8s_date(state.get("startedAt"))
+                if update_start_time and update_start_time > start_time:
+                    start_time = update_start_time
+
+                end_time = now
+            elif "terminated" in cstate:
+                pod_state = "terminated"
+                state = cstate["terminated"]
+                start_time = from_k8s_date(state.get("startedAt"))
+                end_time = from_k8s_date(state.get("finishedAt"))
+                if update_start_time and update_start_time > start_time:
+                    start_time = update_start_time
+
+                # already counted
+                if update_start_time and end_time < update_start_time:
+                    print(
+                        f"  - {name}: {pod_state}: skipping already counted, "
+                        + f"{end_time} < {start_time}"
+                    )
+                    continue
+
+            if end_time and start_time:
+                duration = int((end_time - start_time).total_seconds())
+                print(
+                    f"  - {name}: {pod_state}: {end_time} - {start_time} = {duration}"
+                )
+                exec_time += duration
+
+        if exec_time:
+            await self.crawl_ops.inc_crawl_exec_time(crawl_id, exec_time)
+            await self.org_ops.inc_org_time_stats(oid, exec_time, True)
+            status.crawlExecTime += exec_time
+
+        print(
+            f"  Exec Time Total: {status.crawlExecTime}, Incremented By: {exec_time}",
+            flush=True,
+        )
+
+        status.lastUpdatedTime = to_k8s_date(now)
+
     def should_mark_waiting(self, state, started):
         """Should the crawl be marked as waiting for capacity?"""
         if state in RUNNING_STATES:
@@ -983,7 +1112,7 @@ def should_mark_waiting(self, state, started):

         if state == "starting":
             started = from_k8s_date(started)
-            return (datetime.utcnow() - started).total_seconds() > STARTING_TIME_SECS
+            return (dt_now() - started).total_seconds() > STARTING_TIME_SECS

         return False

@@ -1045,7 +1174,7 @@ async def log_crashes(self, crawl_id, pod_status, redis):
     def get_log_line(self, message, details):
         """get crawler error line for logging"""
         err = {
-            "timestamp": datetime.utcnow().isoformat(),
+            "timestamp": dt_now().isoformat(),
             "logLevel": "error",
             "context": "k8s",
             "message": message,
@@ -1088,7 +1217,7 @@ def is_crawl_stopping(self, crawl, size):
             return True

         # check crawl expiry
-        if crawl.expire_time and datetime.utcnow() > crawl.expire_time:
+        if crawl.expire_time and dt_now() > crawl.expire_time:
             print(f"Graceful Stop: Job duration expired at {crawl.expire_time}")
             return True

@@ -1138,6 +1267,12 @@ async def update_crawl_state(self, redis, crawl, status, pods, done):

         status.stopping = self.is_crawl_stopping(crawl, status.size)

+        # check exec time quotas and stop if reached limit
+        if not status.stopping:
+            if await self.org_ops.exec_mins_quota_reached(crawl.oid):
+                status.stopping = True
+
+        # mark crawl as stopping
         if status.stopping:
             await redis.set(f"{crawl.id}:stopping", "1")
             # backwards compatibility with older crawler
@@ -1245,7 +1380,7 @@ async def mark_finished(
         if crawl and state in SUCCESSFUL_STATES:
             await self.inc_crawl_complete_stats(crawl, finished)

-        asyncio.create_task(
+        self.run_task(
             self.do_crawl_finished_tasks(
                 crawl_id, cid, oid, status.filesAddedSize, state
             )
@@ -1279,32 +1414,6 @@ async def do_crawl_finished_tasks(
         # finally, delete job
         await self.delete_crawl_job(crawl_id)

exec_time - print(f"{name} exec time: {exec_time}") - return exec_time - - async def store_exec_time(self, crawl_id, oid, exec_time): - """store execTime in crawl (if not already set), and increment org counter""" - try: - if await self.crawl_ops.store_exec_time(crawl_id, exec_time): - print(f"Exec Time: {exec_time}", flush=True) - await self.org_ops.inc_org_time_stats(oid, exec_time, True) - - return True - # pylint: disable=broad-except - except Exception as exc: - print(exc, flush=True) - return False - async def inc_crawl_complete_stats(self, crawl, finished): """Increment Crawl Stats""" @@ -1453,6 +1562,12 @@ async def sync_cronjob_crawl(self, data: MCDecoratorSyncData): "attachments": attachments, } + def run_task(self, func): + """add bg tasks to set to avoid premature garbage collection""" + task = asyncio.create_task(func) + self.bg_tasks.add(task) + task.add_done_callback(self.bg_tasks.discard) + # ============================================================================ def init_operator_api( diff --git a/backend/btrixcloud/orgs.py b/backend/btrixcloud/orgs.py index 6358a77b33..048ef9237a 100644 --- a/backend/btrixcloud/orgs.py +++ b/backend/btrixcloud/orgs.py @@ -1,6 +1,7 @@ """ Organization API handling """ +import math import os import time import urllib.parse @@ -301,7 +302,7 @@ async def inc_org_bytes_stored(self, oid: uuid.UUID, size: int, type_="crawl"): return await self.storage_quota_reached(oid) # pylint: disable=invalid-name - async def storage_quota_reached(self, oid: uuid.UUID): + async def storage_quota_reached(self, oid: uuid.UUID) -> bool: """Return boolean indicating if storage quota is met or exceeded.""" quota = await self.get_org_storage_quota(oid) if not quota: @@ -315,7 +316,34 @@ async def storage_quota_reached(self, oid: uuid.UUID): return False - async def get_org_storage_quota(self, oid): + async def get_this_month_crawl_exec_seconds(self, oid: uuid.UUID) -> int: + """Return crawlExecSeconds for current month""" + org = await self.orgs.find_one({"_id": oid}) + org = Organization.from_dict(org) + yymm = datetime.utcnow().strftime("%Y-%m") + try: + return org.crawlExecSeconds[yymm] + except KeyError: + return 0 + + async def exec_mins_quota_reached(self, oid: uuid.UUID) -> bool: + """Return bools for if execution minutes quota""" + quota = await self.get_org_exec_mins_monthly_quota(oid) + + quota_reached = False + + if quota: + monthly_exec_seconds = await self.get_this_month_crawl_exec_seconds(oid) + monthly_exec_minutes = math.floor(monthly_exec_seconds / 60) + + if monthly_exec_minutes >= quota: + quota_reached = True + + # add additional quotas here + + return quota_reached + + async def get_org_storage_quota(self, oid: uuid.UUID) -> int: """return max allowed concurrent crawls, if any""" org = await self.orgs.find_one({"_id": oid}) if org: @@ -323,6 +351,14 @@ async def get_org_storage_quota(self, oid): return org.quotas.storageQuota return 0 + async def get_org_exec_mins_monthly_quota(self, oid: uuid.UUID) -> int: + """return max allowed execution mins per month, if any""" + org = await self.orgs.find_one({"_id": oid}) + if org: + org = Organization.from_dict(org) + return org.quotas.maxExecMinutesPerMonth + return 0 + async def set_origin(self, org: Organization, request: Request): """Get origin from request and store in db for use in event webhooks""" headers = request.headers @@ -528,7 +564,10 @@ async def create_org( async def get_org( org: Organization = Depends(org_dep), user: User = Depends(user_dep) ): - return await 
diff --git a/backend/btrixcloud/orgs.py b/backend/btrixcloud/orgs.py
index 6358a77b33..048ef9237a 100644
--- a/backend/btrixcloud/orgs.py
+++ b/backend/btrixcloud/orgs.py
@@ -1,6 +1,7 @@
 """
 Organization API handling
 """
+import math
 import os
 import time
 import urllib.parse
@@ -301,7 +302,7 @@ async def inc_org_bytes_stored(self, oid: uuid.UUID, size: int, type_="crawl"):
         return await self.storage_quota_reached(oid)

     # pylint: disable=invalid-name
-    async def storage_quota_reached(self, oid: uuid.UUID):
+    async def storage_quota_reached(self, oid: uuid.UUID) -> bool:
         """Return boolean indicating if storage quota is met or exceeded."""
         quota = await self.get_org_storage_quota(oid)
         if not quota:
@@ -315,7 +316,34 @@ async def storage_quota_reached(self, oid: uuid.UUID):

         return False

-    async def get_org_storage_quota(self, oid):
+    async def get_this_month_crawl_exec_seconds(self, oid: uuid.UUID) -> int:
+        """Return crawlExecSeconds for current month"""
+        org = await self.orgs.find_one({"_id": oid})
+        org = Organization.from_dict(org)
+        yymm = datetime.utcnow().strftime("%Y-%m")
+        try:
+            return org.crawlExecSeconds[yymm]
+        except KeyError:
+            return 0
+
+    async def exec_mins_quota_reached(self, oid: uuid.UUID) -> bool:
+        """Return boolean indicating if execution minutes quota is met or exceeded."""
+        quota = await self.get_org_exec_mins_monthly_quota(oid)
+
+        quota_reached = False
+
+        if quota:
+            monthly_exec_seconds = await self.get_this_month_crawl_exec_seconds(oid)
+            monthly_exec_minutes = math.floor(monthly_exec_seconds / 60)
+
+            if monthly_exec_minutes >= quota:
+                quota_reached = True
+
+        # add additional quotas here
+
+        return quota_reached
+
+    async def get_org_storage_quota(self, oid: uuid.UUID) -> int:
         """return max allowed concurrent crawls, if any"""
         org = await self.orgs.find_one({"_id": oid})
         if org:
             org = Organization.from_dict(org)
             return org.quotas.storageQuota
         return 0

+    async def get_org_exec_mins_monthly_quota(self, oid: uuid.UUID) -> int:
+        """return max allowed execution mins per month, if any"""
+        org = await self.orgs.find_one({"_id": oid})
+        if org:
+            org = Organization.from_dict(org)
+            return org.quotas.maxExecMinutesPerMonth
+        return 0
+
     async def set_origin(self, org: Organization, request: Request):
         """Get origin from request and store in db for use in event webhooks"""
         headers = request.headers
@@ -528,7 +564,10 @@ async def create_org(
     async def get_org(
         org: Organization = Depends(org_dep), user: User = Depends(user_dep)
     ):
-        return await org.serialize_for_user(user, user_manager)
+        org_out = await org.serialize_for_user(user, user_manager)
+        org_out.storageQuotaReached = await ops.storage_quota_reached(org.id)
+        org_out.execMinutesQuotaReached = await ops.exec_mins_quota_reached(org.id)
+        return org_out

     @router.post("/rename", tags=["organizations"])
     async def rename_org(
diff --git a/backend/test_nightly/test_concurrent_crawl_limit.py b/backend/test_nightly/test_concurrent_crawl_limit.py
index 7f6ee3d215..141ec97948 100644
--- a/backend/test_nightly/test_concurrent_crawl_limit.py
+++ b/backend/test_nightly/test_concurrent_crawl_limit.py
@@ -2,6 +2,7 @@
 import time

 from .conftest import API_PREFIX
+from .utils import get_crawl_status

 crawl_id_a = None
 crawl_id_b = None
@@ -103,12 +104,3 @@ def run_crawl(org_id, headers):
     data = r.json()

     return data["run_now_job"]
-
-
-def get_crawl_status(org_id, crawl_id, headers):
-    r = requests.get(
-        f"{API_PREFIX}/orgs/{org_id}/crawls/{crawl_id}/replay.json",
-        headers=headers,
-    )
-    data = r.json()
-    return data["state"]
diff --git a/backend/test_nightly/test_execution_minutes_quota.py b/backend/test_nightly/test_execution_minutes_quota.py
new file mode 100644
index 0000000000..fcb1d9902c
--- /dev/null
+++ b/backend/test_nightly/test_execution_minutes_quota.py
@@ -0,0 +1,91 @@
+import math
+import requests
+import time
+from datetime import datetime
+
+from .conftest import API_PREFIX
+from .utils import get_crawl_status
+
+
+EXEC_MINS_QUOTA = 1
+EXEC_MINS_ALLOWED_OVERAGE = 10
+EXEC_MINS_HARD_CAP = EXEC_MINS_QUOTA + EXEC_MINS_ALLOWED_OVERAGE
+
+config_id = None
+
+
+def test_set_execution_mins_quota(org_with_quotas, admin_auth_headers):
+    r = requests.post(
+        f"{API_PREFIX}/orgs/{org_with_quotas}/quotas",
+        headers=admin_auth_headers,
+        json={"maxExecMinutesPerMonth": EXEC_MINS_QUOTA},
+    )
+    data = r.json()
+    assert data.get("updated") == True
+
+
+def test_crawl_stopped_when_quota_reached(org_with_quotas, admin_auth_headers):
+    # Run crawl
+    global config_id
+    crawl_id, config_id = run_crawl(org_with_quotas, admin_auth_headers)
+    time.sleep(1)
+
+    while get_crawl_status(org_with_quotas, crawl_id, admin_auth_headers) in (
+        "starting",
+        "waiting_capacity",
+    ):
+        time.sleep(2)
+
+    while get_crawl_status(org_with_quotas, crawl_id, admin_auth_headers) in (
+        "running",
+        "generate-wacz",
+        "uploading-wacz",
+        "pending-wait",
+    ):
+        time.sleep(2)
+
+    # Ensure that crawl was stopped by quota
+    assert (
+        get_crawl_status(org_with_quotas, crawl_id, admin_auth_headers)
+        == "partial_complete"
+    )
+
+    time.sleep(5)
+
+    # Ensure crawl execution seconds went over quota
+    r = requests.get(
+        f"{API_PREFIX}/orgs/{org_with_quotas}/crawls/{crawl_id}/replay.json",
+        headers=admin_auth_headers,
+    )
+    data = r.json()
+    execution_seconds = data["crawlExecSeconds"]
+    assert math.floor(execution_seconds / 60) >= EXEC_MINS_QUOTA
+
+    time.sleep(5)
+
+    # Ensure we can't start another crawl when over the quota
+    r = requests.post(
+        f"{API_PREFIX}/orgs/{org_with_quotas}/crawlconfigs/{config_id}/run",
+        headers=admin_auth_headers,
+    )
+    assert r.status_code == 403
+    assert r.json()["detail"] == "exec_minutes_quota_reached"
+
+
+def run_crawl(org_id, headers):
+    crawl_data = {
+        "runNow": True,
+        "name": "Execution Mins Quota",
+        "config": {
+            "seeds": [{"url": "https://webrecorder.net/"}],
+            "extraHops": 1,
+        },
+    }
+    r = requests.post(
+        f"{API_PREFIX}/orgs/{org_id}/crawlconfigs/",
+        headers=headers,
+        json=crawl_data,
+    )
+    data = r.json()
+
+    return data["run_now_job"], data["id"]
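Reviewer note: usage is bucketed per calendar month under a zero-padded `"YYYY-MM"` key, and the quota compares whole (floored) minutes against `maxExecMinutesPerMonth`. A self-contained mirror of the backend check (the dict stands in for `Organization.crawlExecSeconds`):

```python
import math
from datetime import datetime


def exec_mins_quota_reached(crawl_exec_seconds: dict, quota_mins: int) -> bool:
    """Mirror of the backend check; a zero quota means unlimited."""
    if not quota_mins:
        return False
    yymm = datetime.utcnow().strftime("%Y-%m")  # e.g. "2023-09"
    monthly_seconds = crawl_exec_seconds.get(yymm, 0)
    return math.floor(monthly_seconds / 60) >= quota_mins


this_month = datetime.utcnow().strftime("%Y-%m")
assert exec_mins_quota_reached({this_month: 59}, 1) is False  # under one minute
assert exec_mins_quota_reached({this_month: 60}, 1) is True   # one-minute quota hit
assert exec_mins_quota_reached({this_month: 60}, 0) is False  # no quota set
```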
diff --git a/backend/test_nightly/utils.py b/backend/test_nightly/utils.py
new file mode 100644
index 0000000000..3b94a78c1e
--- /dev/null
+++ b/backend/test_nightly/utils.py
@@ -0,0 +1,14 @@
+"""nightly test utils"""
+
+import requests
+
+from .conftest import API_PREFIX
+
+
+def get_crawl_status(org_id, crawl_id, headers):
+    r = requests.get(
+        f"{API_PREFIX}/orgs/{org_id}/crawls/{crawl_id}/replay.json",
+        headers=headers,
+    )
+    data = r.json()
+    return data["state"]
diff --git a/docs/user-guide/org-settings.md b/docs/user-guide/org-settings.md
index 41f71e8e0c..0abb5a8770 100644
--- a/docs/user-guide/org-settings.md
+++ b/docs/user-guide/org-settings.md
@@ -24,3 +24,7 @@ Sent invites can be invalidated by pressing the trash button in the relevant _Pe

 `Admin`
 : Users with the administrator role have full access to the organization, including its settings page.
+
+## Limits
+
+This page lets organization admins set an additional number of allowed overage minutes for when the organization's monthly execution minutes quota has been reached. If set, this serves as a hard cap, after which all running crawls are stopped. When left at the default of 0, crawls are stopped as soon as the monthly quota is reached.
diff --git a/frontend/src/components/orgs-list.ts b/frontend/src/components/orgs-list.ts
index f1bf92ebd4..1c3afa87a0 100644
--- a/frontend/src/components/orgs-list.ts
+++ b/frontend/src/components/orgs-list.ts
@@ -61,6 +61,9 @@ export class OrgsList extends LiteElement {
             label = msg("Org Storage Quota (GB)");
             value = Math.floor(value / 1e9);
             break;
+          case "maxExecMinutesPerMonth":
+            label = msg("Max Execution Minutes Per Month");
+            break;
           default:
             label = msg("Unlabeled");
         }
diff --git a/frontend/src/pages/org/crawl-detail.ts b/frontend/src/pages/org/crawl-detail.ts
index 83835cacfd..8632ca0e76 100644
--- a/frontend/src/pages/org/crawl-detail.ts
+++ b/frontend/src/pages/org/crawl-detail.ts
@@ -4,6 +4,7 @@ import { when } from "lit/directives/when.js";
 import { ifDefined } from "lit/directives/if-defined.js";
 import { classMap } from "lit/directives/class-map.js";
 import { msg, localized, str } from "@lit/localize";
+import humanizeDuration from "pretty-ms";

 import type { PageChangeEvent } from "../../components/pagination";
 import { RelativeDuration } from "../../components/relative-duration";
@@ -642,6 +643,15 @@ export class CrawlDetail extends LiteElement {
               `}
           </btrix-desc-list-item>
+          <btrix-desc-list-item label=${msg("Execution Time")}>
+            ${this.crawl!.finished
+              ? html`<span>${humanizeDuration(
+                  this.crawl!.crawlExecSeconds * 1000
+                )}</span>`
+              : html`<span class="text-0-400">${msg("Pending")}</span>`}
+          </btrix-desc-list-item>
+          <btrix-desc-list-item>
             ${this.crawl!.manual
               ? msg(
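Reviewer note: the detail page now surfaces the per-crawl `crawlExecSeconds` value, the same field the nightly test reads from the replay endpoint. A small helper sketch (URL and headers are caller-supplied; the path and field name come from this diff):

```python
import requests


def get_crawl_exec_seconds(api: str, org_id: str, crawl_id: str, headers: dict) -> int:
    """Fetch the execution time shown on the crawl detail page."""
    r = requests.get(
        f"{api}/orgs/{org_id}/crawls/{crawl_id}/replay.json", headers=headers
    )
    r.raise_for_status()
    return r.json()["crawlExecSeconds"]
```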
diff --git a/frontend/src/pages/org/dashboard.ts b/frontend/src/pages/org/dashboard.ts
index 072bdfd517..75a4030fc8 100644
--- a/frontend/src/pages/org/dashboard.ts
+++ b/frontend/src/pages/org/dashboard.ts
@@ -49,6 +49,7 @@ export class Dashboard extends LiteElement {
     crawls: "green",
     uploads: "sky",
     browserProfiles: "indigo",
+    runningTime: "blue",
   };

   willUpdate(changedProperties: PropertyValues) {
@@ -179,6 +180,7 @@ export class Dashboard extends LiteElement {
         ${this.renderCard(
           msg("Crawling"),
           (metrics) => html`
+            ${this.renderCrawlingMeter(metrics)}
            ${this.renderStat({
              value:
@@ -336,6 +338,113 @@ export class Dashboard extends LiteElement {
     `;
   }

+  private renderCrawlingMeter(metrics: Metrics) {
+    let quotaSeconds = 0;
+    if (this.org!.quotas && this.org!.quotas.maxExecMinutesPerMonth) {
+      quotaSeconds = this.org!.quotas.maxExecMinutesPerMonth * 60;
+    }
+
+    let usageSeconds = 0;
+    const now = new Date();
+    if (this.org!.crawlExecSeconds) {
+      const actualUsage =
+        this.org!.crawlExecSeconds[
+          `${now.getFullYear()}-${String(now.getUTCMonth() + 1).padStart(
+            2,
+            "0"
+          )}`
+        ];
+      if (actualUsage) {
+        usageSeconds = actualUsage;
+      }
+    }
+
+    const hasQuota = Boolean(quotaSeconds);
+    const isReached = hasQuota && usageSeconds >= quotaSeconds;
+
+    if (isReached) {
+      usageSeconds = quotaSeconds;
+    }
+
+    const renderBar = (value: number, label: string, color: string) => html`
+      <btrix-meter-bar
+        value=${(value / quotaSeconds) * 100}
+        style="--background-color:var(--sl-color-${color}-400)"
+      >
+        <div class="text-center">
+          <div>${label}</div>
+          <div class="text-xs opacity-80">
+            ${humanizeDuration(value * 1000)} |
+            ${this.renderPercentage(value / quotaSeconds)}
+          </div>
+        </div>
+      </btrix-meter-bar>
+    `;
+    return html`
+      <div class="font-semibold mb-1">
+        ${when(
+          isReached,
+          () => html`
+            <div class="text-danger">
+              <sl-icon name="exclamation-triangle"></sl-icon>
+              ${msg("Monthly Execution Minutes Quota Reached")}
+            </div>
+          `,
+          () =>
+            hasQuota
+              ? html`
+                  <span>
+                    ${humanizeDuration((quotaSeconds - usageSeconds) * 1000)}
+                    ${msg("Available")}
+                  </span>
+                `
+              : ""
+        )}
+      </div>
+      <div class="mb-2">
+        ${when(
+          hasQuota,
+          () => html`
+            <btrix-meter
+              value=${usageSeconds}
+              max=${quotaSeconds}
+              valueText=${msg("time")}
+            >
+              ${when(usageSeconds, () =>
+                renderBar(
+                  usageSeconds,
+                  msg("Monthly Execution Time Used"),
+                  isReached ? "warning" : this.colors.runningTime
+                )
+              )}
+              <div slot="available" class="flex-1">
+                <sl-tooltip class="text-center">
+                  <div slot="content">
+                    <div>${msg("Monthly Execution Time Available")}</div>
+                    <div class="text-xs opacity-80">
+                      ${humanizeDuration((quotaSeconds - usageSeconds) * 1000)}
+                      |
+                      ${this.renderPercentage(
+                        (quotaSeconds - usageSeconds) / quotaSeconds
+                      )}
+                    </div>
+                  </div>
+                </sl-tooltip>
+              </div>
+              <span slot="valueLabel">
+                ${humanizeDuration(usageSeconds * 1000)}
+              </span>
+              <span slot="maxLabel">
+                ${humanizeDuration(quotaSeconds * 1000)}
+              </span>
+            </btrix-meter>
+          `
+        )}
+      </div>
+    `;
+  }
+
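Reviewer note: the meter looks up this month's usage with a zero-padded month key so it matches the backend's buckets, which are produced with `strftime("%Y-%m")` (see `get_this_month_crawl_exec_seconds` above and the `{4-digit year}-{2-digit month}` comment in `types/org.ts`). For reference, the backend-side key construction:

```python
from datetime import datetime
from typing import Optional


def current_month_key(now: Optional[datetime] = None) -> str:
    """Zero-padded "YYYY-MM" bucket key, e.g. "2023-09"."""
    return (now or datetime.utcnow()).strftime("%Y-%m")


assert current_month_key(datetime(2023, 9, 15)) == "2023-09"
```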
   private renderCard(
     title: string,
     renderContent: (metric: Metrics) => TemplateResult,
diff --git a/frontend/src/pages/org/index.ts b/frontend/src/pages/org/index.ts
index d811076a7d..0c8ef24cd2 100644
--- a/frontend/src/pages/org/index.ts
+++ b/frontend/src/pages/org/index.ts
@@ -59,6 +59,7 @@ type Params = {
   collectionTab?: string;
   itemType?: Crawl["type"];
   jobType?: JobType;
+  settingsTab?: string;
   new?: ResourceName;
 };

 const defaultTab = "home";
@@ -97,6 +98,12 @@ export class Org extends LiteElement {
   @state()
   private showStorageQuotaAlert = false;

+  @state()
+  private orgExecutionMinutesQuotaReached = false;
+
+  @state()
+  private showExecutionMinutesQuotaAlert = false;
+
   @state()
   private openDialogName?: ResourceName;

@@ -163,6 +170,7 @@ export class Org extends LiteElement {
     try {
       this.org = await this.getOrg(this.orgId);
       this.checkStorageQuota();
+      this.checkExecutionMinutesQuota();
     } catch {
       // TODO handle 404
       this.org = null;
@@ -242,7 +250,8 @@ export class Org extends LiteElement {
     }

     return html`
-      ${this.renderStorageAlert()} ${this.renderOrgNavBar()}
+      ${this.renderStorageAlert()} ${this.renderExecutionMinutesAlert()}
+      ${this.renderOrgNavBar()}
@@ -269,6 +278,33 @@ export class Org extends LiteElement {

+  private renderExecutionMinutesAlert() {
+    return html`
+      <div class="border-b bg-slate-100 py-5">
+        <div class="w-full max-w-screen-lg mx-auto px-3 box-border">
+          <sl-alert
+            variant="warning"
+            closable
+            ?open=${this.showExecutionMinutesQuotaAlert}
+            @sl-after-hide=${() =>
+              (this.showExecutionMinutesQuotaAlert = false)}
+          >
+            <sl-icon slot="icon" name="exclamation-triangle"></sl-icon>
+            <strong>
+              ${msg(
+                "Your org has reached its monthly execution minutes limit"
+              )}
+            </strong>
+            <br />
+            ${msg(
+              "To purchase additional monthly execution minutes, contact us to upgrade your plan."
+            )}
+          </sl-alert>
+        </div>
+      </div>
+    `;
+  }
+
   private renderOrgNavBar() {
     return html`
@@ -460,11 +499,14 @@ export class Org extends LiteElement {
         .authState=${this.authState!}
         orgId=${this.orgId!}
         ?orgStorageQuotaReached=${this.orgStorageQuotaReached}
+        ?orgExecutionMinutesQuotaReached=${this
+          .orgExecutionMinutesQuotaReached}
         workflowId=${workflowId}
         openDialogName=${this.viewStateData?.dialog}
         ?isEditing=${isEditing}
         ?isCrawler=${this.isCrawler}
         @storage-quota-update=${this.onStorageQuotaUpdate}
+        @execution-minutes-quota-update=${this.onExecutionMinutesQuotaUpdate}
       >
     `;
   }
@@ -480,7 +522,10 @@ export class Org extends LiteElement {
       .initialWorkflow=${workflow}
       .initialSeeds=${seeds}
       jobType=${ifDefined(this.params.jobType)}
+      ?orgStorageQuotaReached=${this.orgStorageQuotaReached}
+      ?orgExecutionMinutesQuotaReached=${this.orgExecutionMinutesQuotaReached}
       @storage-quota-update=${this.onStorageQuotaUpdate}
+      @execution-minutes-quota-update=${this.onExecutionMinutesQuotaUpdate}
       @select-new-dialog=${this.onSelectNewDialog}
     >`;
   }
@@ -489,9 +534,11 @@ export class Org extends LiteElement {
       .authState=${this.authState!}
       orgId=${this.orgId!}
       ?orgStorageQuotaReached=${this.orgStorageQuotaReached}
+      ?orgExecutionMinutesQuotaReached=${this.orgExecutionMinutesQuotaReached}
       userId=${this.userInfo!.id}
       ?isCrawler=${this.isCrawler}
       @storage-quota-update=${this.onStorageQuotaUpdate}
+      @execution-minutes-quota-update=${this.onExecutionMinutesQuotaUpdate}
       @select-new-dialog=${this.onSelectNewDialog}
     >`;
   }
@@ -554,9 +601,7 @@ export class Org extends LiteElement {
   private renderOrgSettings() {
     if (!this.userInfo || !this.org) return;
-    const activePanel = this.orgPath.includes("/members")
-      ? "members"
-      : "information";
+    const activePanel = this.params.settingsTab || "information";
     const isAddingMember = this.params.hasOwnProperty("invite");

     return html`<btrix-org-settings
@@ -612,12 +659,20 @@ export class Org extends LiteElement {

+  private onExecutionMinutesQuotaUpdate(e: CustomEvent<{ reached: boolean }>) {
+    e.stopPropagation();
+    this.orgExecutionMinutesQuotaReached = e.detail.reached;
+    this.showExecutionMinutesQuotaAlert = e.detail.reached;
+  }
+
   checkStorageQuota() {
-    if (this.org!.bytesStored > this.org!.quotas.storageQuota) {
-      this.orgStorageQuotaReached = true;
-    } else {
-      this.orgStorageQuotaReached = false;
-    }
+    this.orgStorageQuotaReached = !!this.org?.storageQuotaReached;
+    this.showStorageQuotaAlert = this.orgStorageQuotaReached;
+  }

-    if (this.orgStorageQuotaReached) {
-      this.showStorageQuotaAlert = true;
-    }
+  checkExecutionMinutesQuota() {
+    this.orgExecutionMinutesQuotaReached = !!this.org?.execMinutesQuotaReached;
+    this.showExecutionMinutesQuotaAlert = this.orgExecutionMinutesQuotaReached;
   }
 }
diff --git a/frontend/src/pages/org/workflow-detail.ts b/frontend/src/pages/org/workflow-detail.ts
index c4ad8c5d20..8a78df27ea 100644
--- a/frontend/src/pages/org/workflow-detail.ts
+++ b/frontend/src/pages/org/workflow-detail.ts
@@ -49,6 +49,9 @@ export class WorkflowDetail extends LiteElement {
   @property({ type: Boolean })
   orgStorageQuotaReached = false;

+  @property({ type: Boolean })
+  orgExecutionMinutesQuotaReached = false;
+
   @property({ type: String })
   workflowId!: string;

@@ -533,6 +536,9 @@ export class WorkflowDetail extends LiteElement {
           configId=${this.workflow!.id}
           orgId=${this.orgId}
           .authState=${this.authState}
+          ?orgStorageQuotaReached=${this.orgStorageQuotaReached}
+          ?orgExecutionMinutesQuotaReached=${this
+            .orgExecutionMinutesQuotaReached}
           @reset=${(e: Event) =>
             this.navTo(
               `${this.orgBasePath}/workflows/crawl/${this.workflow!.id}`
             )}
@@ -578,14 +584,18 @@ export class WorkflowDetail extends LiteElement {
               `,
               () => html`
                 <sl-button
                   size="small"
                   variant="primary"
-                  ?disabled=${this.orgStorageQuotaReached}
+                  ?disabled=${this.orgStorageQuotaReached ||
+                  this.orgExecutionMinutesQuotaReached}
                   @click=${() => this.runNow()}
                 >
@@ -625,7 +635,8 @@ export class WorkflowDetail extends LiteElement {
             () => html`
               <sl-button
                 size="small"
-                ?disabled=${this.orgStorageQuotaReached}
+                ?disabled=${this.orgStorageQuotaReached ||
+                this.orgExecutionMinutesQuotaReached}
                 @click=${() => this.runNow()}
               >
@@ -1023,12 +1034,16 @@ export class WorkflowDetail extends LiteElement {
           )}
           <sl-button
             size="small"
             variant="primary"
-            ?disabled=${this.orgStorageQuotaReached}
+            ?disabled=${this.orgStorageQuotaReached ||
+            this.orgExecutionMinutesQuotaReached}
             @click=${() => this.runNow()}
           >
@@ -1107,13 +1122,17 @@ export class WorkflowDetail extends LiteElement {
           <sl-button
             size="small"
             variant="primary"
-            ?disabled=${this.orgStorageQuotaReached}
+            ?disabled=${this.orgStorageQuotaReached ||
+            this.orgExecutionMinutesQuotaReached}
             @click=${() => this.runNow()}
           >
@@ -1594,6 +1613,10 @@ export class WorkflowDetail extends LiteElement {
     if (e.isApiError && e.statusCode === 403) {
       if (e.details === "storage_quota_reached") {
         message = msg("Your org does not have enough storage to run crawls.");
+      } else if (e.details === "exec_minutes_quota_reached") {
+        message = msg(
+          "Your org has used all of its execution minutes for this month."
+        );
       } else {
         message = msg("You do not have permission to run crawls.");
       }
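Reviewer note: both list and detail views now map the 403 detail `exec_minutes_quota_reached` to a user-facing message. An equivalent sketch of the API interaction (the `started` response key is an assumption inferred from the `data.started` fallback in the workflow editor change below):

```python
from typing import Optional

import requests


def run_workflow_now(api: str, oid: str, cid: str, headers: dict) -> Optional[str]:
    """Start a crawl now, mapping the 403 quota details added in this diff."""
    r = requests.post(f"{api}/orgs/{oid}/crawlconfigs/{cid}/run", headers=headers)
    if r.status_code == 403:
        detail = r.json().get("detail")
        if detail == "exec_minutes_quota_reached":
            print("org has used all of its execution minutes this month")
        elif detail == "storage_quota_reached":
            print("org does not have enough storage to run crawls")
        else:
            print("not permitted to run crawls")
        return None
    r.raise_for_status()
    return r.json().get("started")  # assumed key; may be absent
```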
"#watch" + : "" }` ); } catch (e: any) { diff --git a/frontend/src/pages/org/workflows-list.ts b/frontend/src/pages/org/workflows-list.ts index d81d212852..14b3db710e 100644 --- a/frontend/src/pages/org/workflows-list.ts +++ b/frontend/src/pages/org/workflows-list.ts @@ -74,6 +74,9 @@ export class WorkflowsList extends LiteElement { @property({ type: Boolean }) orgStorageQuotaReached = false; + @property({ type: Boolean }) + orgExecutionMinutesQuotaReached = false; + @property({ type: String }) userId!: string; @@ -440,7 +443,8 @@ export class WorkflowsList extends LiteElement { () => html` this.runNow(workflow)} > @@ -798,6 +802,10 @@ export class WorkflowsList extends LiteElement { if (e.isApiError && e.statusCode === 403) { if (e.details === "storage_quota_reached") { message = msg("Your org does not have enough storage to run crawls."); + } else if (e.details === "exec_minutes_quota_reached") { + message = msg( + "Your org has used all of its execution minutes for this month." + ); } else { message = msg("You do not have permission to run crawls."); } diff --git a/frontend/src/pages/org/workflows-new.ts b/frontend/src/pages/org/workflows-new.ts index d32d9315df..8a28aaf3a0 100644 --- a/frontend/src/pages/org/workflows-new.ts +++ b/frontend/src/pages/org/workflows-new.ts @@ -56,6 +56,12 @@ export class WorkflowsNew extends LiteElement { @property({ type: String }) jobType?: JobType; + @property({ type: Boolean }) + orgStorageQuotaReached = false; + + @property({ type: Boolean }) + orgExecutionMinutesQuotaReached = false; + // Use custom property accessor to prevent // overriding default Workflow values @property({ type: Object }) @@ -116,6 +122,9 @@ export class WorkflowsNew extends LiteElement { jobType=${jobType} orgId=${this.orgId} .authState=${this.authState} + ?orgStorageQuotaReached=${this.orgStorageQuotaReached} + ?orgExecutionMinutesQuotaReached=${this + .orgExecutionMinutesQuotaReached} @reset=${async (e: Event) => { await (e.target as LitElement).updateComplete; this.dispatchEvent( diff --git a/frontend/src/routes.ts b/frontend/src/routes.ts index 0baf97ed5b..18f7bd4745 100644 --- a/frontend/src/routes.ts +++ b/frontend/src/routes.ts @@ -17,7 +17,7 @@ export const ROUTES = { "(/items(/:itemType(/:itemId)))", "(/collections(/new)(/view/:collectionId(/:collectionTab))(/edit/:collectionId))", "(/browser-profiles(/profile(/browser/:browserId)(/:browserProfileId)))", - "(/settings(/members))", + "(/settings(/:settingsTab))", ].join(""), users: "/users", usersInvite: "/users/invite", diff --git a/frontend/src/types/crawler.ts b/frontend/src/types/crawler.ts index 0a1b28210f..3fa472119e 100644 --- a/frontend/src/types/crawler.ts +++ b/frontend/src/types/crawler.ts @@ -135,6 +135,7 @@ export type Crawl = CrawlConfig & { collectionIds: string[]; collections: { id: string; name: string }[]; type?: "crawl" | "upload" | null; + crawlExecSeconds: number; }; export type Upload = Omit< @@ -146,6 +147,7 @@ export type Upload = Omit< | "stopping" | "firstSeed" | "seedCount" + | "crawlExecSeconds" > & { type: "upload"; }; diff --git a/frontend/src/types/org.ts b/frontend/src/types/org.ts index 0c2c6fd793..586ffb23cc 100644 --- a/frontend/src/types/org.ts +++ b/frontend/src/types/org.ts @@ -22,6 +22,8 @@ export type OrgData = { // Keyed by {4-digit year}-{2-digit month} [key: string]: number; } | null; + storageQuotaReached?: boolean; + execMinutesQuotaReached?: boolean; users?: { [id: string]: { role: (typeof AccessCode)[UserRole]; diff --git a/frontend/src/utils/LiteElement.ts 
diff --git a/frontend/src/utils/LiteElement.ts b/frontend/src/utils/LiteElement.ts
index 26adf1db92..1cc6535fbb 100644
--- a/frontend/src/utils/LiteElement.ts
+++ b/frontend/src/utils/LiteElement.ts
@@ -139,6 +139,7 @@ export default class LiteElement extends LitElement {
     if (resp.ok) {
       const body = await resp.json();
       const storageQuotaReached = body.storageQuotaReached;
+      const executionMinutesQuotaReached = body.execMinutesQuotaReached;
       if (typeof storageQuotaReached === "boolean") {
         this.dispatchEvent(
           new CustomEvent("storage-quota-update", {
@@ -147,6 +148,14 @@ export default class LiteElement extends LitElement {
           })
         );
       }
+      if (typeof executionMinutesQuotaReached === "boolean") {
+        this.dispatchEvent(
+          new CustomEvent("execution-minutes-quota-update", {
+            detail: { reached: executionMinutesQuotaReached },
+            bubbles: true,
+          })
+        );
+      }

       return body;
     }
@@ -175,6 +184,16 @@ export default class LiteElement extends LitElement {
           errorMessage = msg("Storage quota reached");
           break;
         }
+        if (errorDetail === "exec_minutes_quota_reached") {
+          this.dispatchEvent(
+            new CustomEvent("execution-minutes-quota-update", {
+              detail: { reached: true },
+              bubbles: true,
+            })
+          );
+          errorMessage = msg("Monthly execution minutes quota reached");
+          break;
+        }
       }
       case 404: {
         errorMessage = msg("Not found");