feat: Public collections #2271

Open · wants to merge 12 commits into main
67 changes: 62 additions & 5 deletions backend/btrixcloud/background_jobs.py
@@ -22,6 +22,7 @@
DeleteReplicaJob,
DeleteOrgJob,
RecalculateOrgStatsJob,
ReAddOrgPagesJob,
PaginatedBackgroundJobResponse,
AnyJob,
StorageRef,
@@ -301,8 +302,6 @@ async def create_delete_org_job(
try:
job_id = await self.crawl_manager.run_delete_org_job(
oid=str(org.id),
backend_image=os.environ.get("BACKEND_IMAGE", ""),
pull_policy=os.environ.get("BACKEND_IMAGE_PULL_POLICY", ""),
existing_job_id=existing_job_id,
)
if existing_job_id:
@@ -346,8 +345,6 @@ async def create_recalculate_org_stats_job(
try:
job_id = await self.crawl_manager.run_recalculate_org_stats_job(
oid=str(org.id),
backend_image=os.environ.get("BACKEND_IMAGE", ""),
pull_policy=os.environ.get("BACKEND_IMAGE_PULL_POLICY", ""),
existing_job_id=existing_job_id,
)
if existing_job_id:
@@ -381,6 +378,52 @@ async def create_recalculate_org_stats_job(
print(f"warning: recalculate org stats job could not be started: {exc}")
return None

async def create_re_add_org_pages_job(
self,
oid: UUID,
crawl_type: Optional[str] = None,
existing_job_id: Optional[str] = None,
):
"""Create job to (re)add all pages in an org, optionally filtered by crawl type"""

try:
job_id = await self.crawl_manager.run_re_add_org_pages_job(
oid=str(oid),
crawl_type=crawl_type,
existing_job_id=existing_job_id,
)
if existing_job_id:
readd_pages_job = await self.get_background_job(existing_job_id, oid)
previous_attempt = {
"started": readd_pages_job.started,
"finished": readd_pages_job.finished,
}
if readd_pages_job.previousAttempts:
readd_pages_job.previousAttempts.append(previous_attempt)
else:
readd_pages_job.previousAttempts = [previous_attempt]
readd_pages_job.started = dt_now()
readd_pages_job.finished = None
readd_pages_job.success = None
else:
readd_pages_job = ReAddOrgPagesJob(
id=job_id,
oid=oid,
crawl_type=crawl_type,
started=dt_now(),
)

await self.jobs.find_one_and_update(
{"_id": job_id}, {"$set": readd_pages_job.to_dict()}, upsert=True
)

return job_id
# pylint: disable=broad-exception-caught
except Exception as exc:
# pylint: disable=raise-missing-from
print(f"warning: re-add org pages job could not be started: {exc}")
return None

async def job_finished(
self,
job_id: str,
Expand Down Expand Up @@ -430,7 +473,11 @@ async def job_finished(
async def get_background_job(
self, job_id: str, oid: Optional[UUID] = None
) -> Union[
CreateReplicaJob, DeleteReplicaJob, DeleteOrgJob, RecalculateOrgStatsJob
CreateReplicaJob,
DeleteReplicaJob,
DeleteOrgJob,
RecalculateOrgStatsJob,
ReAddOrgPagesJob,
]:
"""Get background job"""
query: dict[str, object] = {"_id": job_id}
@@ -454,6 +501,9 @@ def _get_job_by_type_from_data(self, data: dict[str, object]):
if data["type"] == BgJobType.RECALCULATE_ORG_STATS:
return RecalculateOrgStatsJob.from_dict(data)

if data["type"] == BgJobType.READD_ORG_PAGES:
return ReAddOrgPagesJob.from_dict(data)

return DeleteOrgJob.from_dict(data)

async def list_background_jobs(
Expand Down Expand Up @@ -595,6 +645,13 @@ async def retry_background_job(
existing_job_id=job_id,
)

if job.type == BgJobType.READD_ORG_PAGES:
await self.create_re_add_org_pages_job(
org.id,
job.crawl_type,
existing_job_id=job_id,
)

return {"success": True}

async def retry_failed_background_jobs(
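For readers skimming the diff, here is a rough sketch of the shape `ReAddOrgPagesJob` presumably has, inferred only from the fields touched above (`id`, `oid`, `crawl_type`, `started`, `finished`, `success`, `previousAttempts`) and from its use alongside `BgJobType.READD_ORG_PAGES`. The real definition lives in `models.py` in this PR and subclasses the project's base job model (which supplies `from_dict`/`to_dict`); the plain Pydantic version below is illustrative only.

```python
# Illustrative sketch only: the actual ReAddOrgPagesJob is defined in models.py.
from datetime import datetime
from typing import Dict, List, Optional
from uuid import UUID

from pydantic import BaseModel


class ReAddOrgPagesJob(BaseModel):
    """Background job that (re)adds all pages in an org, optionally by crawl type"""

    id: str
    oid: UUID
    type: str = "readd-org-pages"  # placeholder; real value comes from BgJobType.READD_ORG_PAGES
    crawl_type: Optional[str] = None
    started: datetime
    finished: Optional[datetime] = None
    success: Optional[bool] = None
    previousAttempts: Optional[List[Dict[str, Optional[datetime]]]] = None
```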
24 changes: 6 additions & 18 deletions backend/btrixcloud/basecrawls.py
@@ -1,6 +1,5 @@
""" base crawl type """

import os
from datetime import timedelta
from typing import Optional, List, Union, Dict, Any, Type, TYPE_CHECKING, cast, Tuple
from uuid import UUID
@@ -29,6 +28,7 @@
UpdatedResponse,
DeletedResponseQuota,
CrawlSearchValuesResponse,
PRESIGN_DURATION_SECONDS,
)
from .pagination import paginated_format, DEFAULT_PAGE_SIZE
from .utils import dt_now, date_to_str
@@ -47,11 +47,6 @@
CrawlConfigOps = UserManager = OrgOps = CollectionOps = PageOps = object
StorageOps = EventWebhookOps = BackgroundJobOps = object

# Presign duration must be less than 604800 seconds (one week),
# so set this one minute short of a week.
PRESIGN_MINUTES_MAX = 10079
PRESIGN_MINUTES_DEFAULT = PRESIGN_MINUTES_MAX


# ============================================================================
# pylint: disable=too-many-instance-attributes, too-many-public-methods, too-many-lines
@@ -93,16 +88,8 @@ def __init__(
self.background_job_ops = background_job_ops
self.page_ops = cast(PageOps, None)

presign_duration_minutes = int(
os.environ.get("PRESIGN_DURATION_MINUTES") or PRESIGN_MINUTES_DEFAULT
)

self.presign_duration_seconds = (
min(presign_duration_minutes, PRESIGN_MINUTES_MAX) * 60
)

# renew when <25% of time remaining
self.expire_at_duration_seconds = int(self.presign_duration_seconds * 0.75)
self.expire_at_duration_seconds = int(PRESIGN_DURATION_SECONDS * 0.75)

def set_page_ops(self, page_ops):
"""set page ops reference"""
@@ -336,8 +323,9 @@ async def delete_crawls(
status_code=400, detail=f"Error Stopping Crawl: {exc}"
)

await self.page_ops.delete_crawl_pages(crawl_id, org.id)

if type_ == "crawl":
await self.page_ops.delete_crawl_pages(crawl_id, org.id)
await self.delete_all_crawl_qa_files(crawl_id, org)

crawl_size = await self._delete_crawl_files(crawl, org)
@@ -382,7 +370,7 @@ async def _delete_crawl_files(
size = 0
for file_ in crawl.files:
size += file_.size
if not await self.storage_ops.delete_crawl_file_object(org, file_):
if not await self.storage_ops.delete_file_object(org, file_):
raise HTTPException(status_code=400, detail="file_deletion_error")
# Not replicating QA run WACZs yet
if not isinstance(crawl, QARun):
@@ -474,7 +462,7 @@ async def resolve_signed_urls(
):
exp = now + delta
presigned_url = await self.storage_ops.get_presigned_url(
org, file_, self.presign_duration_seconds
org, file_, PRESIGN_DURATION_SECONDS
)

prefix = "files"
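The basecrawls.py changes drop the per-instance `PRESIGN_DURATION_MINUTES` handling in favor of a shared `PRESIGN_DURATION_SECONDS` constant imported from `models`. The renewal logic is unchanged: a presigned URL is refreshed once 75% of its lifetime has elapsed, i.e. while at least 25% of the window still remains. A minimal sketch of that arithmetic, assuming the constant keeps the previous near-one-week value (one minute short of the 604800-second limit noted in the removed comment):

```python
# Assumed value; the actual constant is defined in models.py and may differ.
PRESIGN_DURATION_SECONDS = 604_800 - 60  # one minute short of one week

# Mirrors the expression in BaseCrawlOps.__init__ above: renew when <25% of time remains.
expire_at_duration_seconds = int(PRESIGN_DURATION_SECONDS * 0.75)

remaining = 1 - expire_at_duration_seconds / PRESIGN_DURATION_SECONDS
print(
    f"re-sign presigned URLs after {expire_at_duration_seconds}s, "
    f"with ~{remaining:.0%} of the window still left"
)
```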