Commit
switch to simpler streaming download + multiwacz metadata improvements: (#1982)

- download via presigned URLs using `requests` instead of the boto APIs; remove boto3
- follow-up to #1933 for streaming download improvements
- fixes datapackage.json in multi-WACZ downloads to contain the same resource
  objects as a single WACZ, with `name`, `path`, `hash`, and `bytes`
- adds additional metadata to the multi-WACZ datapackage.json, including `type`
  (`crawl`, `upload`, `collection`, or `qaRun`), `id` (unique id for the
  object), `title` / `description` if available (for crawl/upload/collection),
  and `crawlId` for `qaRun` (see the example sketch below)
ikreymer authored Oct 3, 2024
1 parent 2429bb6 commit 104ea09
Showing 6 changed files with 69 additions and 56 deletions.
11 changes: 9 additions & 2 deletions backend/btrixcloud/basecrawls.py
@@ -54,7 +54,7 @@


 # ============================================================================
-# pylint: disable=too-many-instance-attributes, too-many-public-methods
+# pylint: disable=too-many-instance-attributes, too-many-public-methods, too-many-lines
 class BaseCrawlOps:
     """operations that apply to all crawls"""

@@ -823,7 +823,14 @@ async def download_crawl_as_single_wacz(self, crawl_id: str, org: Organization):
         if not crawl.resources:
             raise HTTPException(status_code=400, detail="no_crawl_resources")
 
-        resp = await self.storage_ops.download_streaming_wacz(org, crawl.resources)
+        metadata = {"type": crawl.type, "id": crawl_id, "organization": org.slug}
+        if crawl.name:
+            metadata["title"] = crawl.name
+
+        if crawl.description:
+            metadata["description"] = crawl.description
+
+        resp = await self.storage_ops.download_streaming_wacz(metadata, crawl.resources)
 
         headers = {"Content-Disposition": f'attachment; filename="{crawl_id}.wacz"'}
         return StreamingResponse(
11 changes: 10 additions & 1 deletion backend/btrixcloud/colls.py
@@ -323,7 +323,16 @@ async def download_collection(self, coll_id: UUID, org: Organization):
"""Download all WACZs in collection as streaming nested WACZ"""
coll = await self.get_collection(coll_id, org, resources=True)

resp = await self.storage_ops.download_streaming_wacz(org, coll.resources)
metadata = {
"type": "collection",
"id": str(coll_id),
"title": coll.name,
"organization": org.slug,
}
if coll.description:
metadata["description"] = coll.description

resp = await self.storage_ops.download_streaming_wacz(metadata, coll.resources)

headers = {"Content-Disposition": f'attachment; filename="{coll.name}.wacz"'}
return StreamingResponse(
11 changes: 10 additions & 1 deletion backend/btrixcloud/crawls.py
@@ -1034,7 +1034,16 @@ async def download_qa_run_as_single_wacz(
         if not qa_run.resources:
             raise HTTPException(status_code=400, detail="qa_run_no_resources")
 
-        resp = await self.storage_ops.download_streaming_wacz(org, qa_run.resources)
+        metadata = {
+            "type": "qaRun",
+            "id": qa_run_id,
+            "crawlId": crawl_id,
+            "organization": org.slug,
+        }
+
+        resp = await self.storage_ops.download_streaming_wacz(
+            metadata, qa_run.resources
+        )
 
         finished = qa_run.finished.isoformat()

79 changes: 29 additions & 50 deletions backend/btrixcloud/storages.py
@@ -31,11 +31,10 @@
 from remotezip import RemoteZip
 
 import aiobotocore.session
-import boto3
+import requests
 
-from mypy_boto3_s3.client import S3Client
-from mypy_boto3_s3.type_defs import CompletedPartTypeDef
 from types_aiobotocore_s3 import S3Client as AIOS3Client
+from types_aiobotocore_s3.type_defs import CompletedPartTypeDef
 
 from .models import (
     BaseFile,
@@ -52,6 +51,7 @@
 )
 
 from .utils import is_bool, slug_from_name
+from .version import __version__
 
 
 if TYPE_CHECKING:
@@ -289,35 +289,6 @@ async def get_s3_client(
         ) as client:
             yield client, bucket, key
 
-    @asynccontextmanager
-    async def get_sync_client(
-        self, org: Organization
-    ) -> AsyncIterator[tuple[S3Client, str, str]]:
-        """context manager for s3 client"""
-        storage = self.get_org_primary_storage(org)
-
-        endpoint_url = storage.endpoint_url
-
-        if not endpoint_url.endswith("/"):
-            endpoint_url += "/"
-
-        parts = urlsplit(endpoint_url)
-        bucket, key = parts.path[1:].split("/", 1)
-
-        endpoint_url = parts.scheme + "://" + parts.netloc
-
-        try:
-            client = boto3.client(
-                "s3",
-                region_name=storage.region,
-                endpoint_url=endpoint_url,
-                aws_access_key_id=storage.access_key,
-                aws_secret_access_key=storage.secret_key,
-            )
-            yield client, bucket, key
-        finally:
-            client.close()
-
     async def verify_storage_upload(self, storage: S3Storage, filename: str) -> None:
         """Test credentials and storage endpoint by uploading an empty test file"""
 
@@ -683,21 +654,32 @@ def _sync_get_filestream(self, wacz_url: str, filename: str) -> Iterator[bytes]:
             yield from file_stream
 
     def _sync_dl(
-        self, all_files: List[CrawlFileOut], client: S3Client, bucket: str, key: str
+        self, metadata: dict[str, str], all_files: List[CrawlFileOut]
     ) -> Iterator[bytes]:
         """generate streaming zip as sync"""
-        for file_ in all_files:
-            file_.path = file_.name
-
         datapackage = {
             "profile": "multi-wacz-package",
-            "resources": [file_.dict() for file_ in all_files],
+            "resources": [
+                {
+                    "name": file_.name,
+                    "path": file_.name,
+                    "hash": "sha256:" + file_.hash,
+                    "bytes": file_.size,
+                }
+                for file_ in all_files
+            ],
+            "software": f"Browsertrix v{__version__}",
+            **metadata,
         }
-        datapackage_bytes = json.dumps(datapackage).encode("utf-8")
+        datapackage_bytes = json.dumps(datapackage, indent=2).encode("utf-8")
 
+        def get_datapackage() -> Iterable[bytes]:
+            yield datapackage_bytes
+
-        def get_file(name) -> Iterator[bytes]:
-            response = client.get_object(Bucket=bucket, Key=key + name)
-            return response["Body"].iter_chunks(chunk_size=CHUNK_SIZE)
+        def get_file(path: str) -> Iterable[bytes]:
+            path = self.resolve_internal_access_path(path)
+            r = requests.get(path, stream=True, timeout=None)
+            yield from r.iter_content(CHUNK_SIZE)
 
         def member_files() -> (
             Iterable[tuple[str, datetime, int, Method, Iterable[bytes]]]
@@ -710,7 +692,7 @@ def member_files() -> (
                     modified_at,
                     perms,
                     NO_COMPRESSION_64(file_.size, 0),
-                    get_file(file_.name),
+                    get_file(file_.path),
                 )
 
             yield (
@@ -720,25 +702,22 @@
                 NO_COMPRESSION_64(
                     len(datapackage_bytes), zlib.crc32(datapackage_bytes)
                 ),
-                (datapackage_bytes,),
+                get_datapackage(),
             )
 
         # stream_zip() is an Iterator but defined as an Iterable, can cast
         return cast(Iterator[bytes], stream_zip(member_files(), chunk_size=CHUNK_SIZE))
 
     async def download_streaming_wacz(
-        self, org: Organization, files: List[CrawlFileOut]
+        self, metadata: dict[str, str], files: List[CrawlFileOut]
     ) -> Iterator[bytes]:
         """return an iter for downloading a stream nested wacz file
         from list of files"""
-        async with self.get_sync_client(org) as (client, bucket, key):
-            loop = asyncio.get_event_loop()
+        loop = asyncio.get_event_loop()
 
-            resp = await loop.run_in_executor(
-                None, self._sync_dl, files, client, bucket, key
-            )
+        resp = await loop.run_in_executor(None, self._sync_dl, metadata, files)
 
-            return resp
+        return resp
 
 
 # ============================================================================
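To make the new download path above concrete, here is a minimal, self-contained sketch of the same technique: streaming each WACZ from a presigned URL with `requests` and packing it, uncompressed, into a zip generated on the fly by stream-zip. The URL, filename, and size are hypothetical, and passing a crc32 of 0 to `NO_COMPRESSION_64` assumes the pinned crc32-optional fork of stream-zip listed in requirements.txt:

    # Sketch: stream presigned-URL downloads into an uncompressed zip,
    # mirroring the approach in storages.py above. Values are hypothetical.
    from datetime import datetime, timezone
    from typing import Iterable, Iterator

    import requests
    from stream_zip import NO_COMPRESSION_64, stream_zip

    CHUNK_SIZE = 1024 * 256  # assumed chunk size

    # (presigned_url, filename, size_in_bytes) -- hypothetical values
    FILES = [
        ("https://storage.example.com/part-0.wacz?X-Amz-Signature=abc", "part-0.wacz", 1048576),
    ]

    def get_file(url: str) -> Iterable[bytes]:
        # stream the body instead of loading it into memory
        r = requests.get(url, stream=True, timeout=None)
        yield from r.iter_content(CHUNK_SIZE)

    def member_files():
        # stream-zip member tuples: (name, mtime, mode, method, chunks)
        now = datetime.now(timezone.utc)
        for url, name, size in FILES:
            # crc32 of 0 relies on the crc32-optional stream-zip fork
            yield (name, now, 0o644, NO_COMPRESSION_64(size, 0), get_file(url))

    def streamed_zip() -> Iterator[bytes]:
        return stream_zip(member_files(), chunk_size=CHUNK_SIZE)

    if __name__ == "__main__":
        with open("download.wacz", "wb") as out:
            for chunk in streamed_zip():
                out.write(chunk)

Storing members uncompressed means sizes and headers are known up front, so the zip can be emitted chunk by chunk without staging any file on disk or in memory.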
3 changes: 1 addition & 2 deletions backend/requirements.txt
@@ -11,17 +11,16 @@ aiofiles
 kubernetes-asyncio==29.0.0
 kubernetes
 aiobotocore
+requests
 redis>=5.0.0
 pyyaml
 jinja2
 humanize
 python-multipart
 pathvalidate
 https://github.com/ikreymer/stream-zip/archive/refs/heads/crc32-optional.zip
-boto3
 backoff>=2.2.1
 python-slugify>=8.0.1
-mypy_boto3_s3
 types_aiobotocore_s3
 types-redis
 types-python-slugify
10 changes: 10 additions & 0 deletions backend/test/test_run_crawl.py
@@ -6,6 +6,7 @@
 import re
 import csv
 import codecs
+import json
 from tempfile import TemporaryFile
 from zipfile import ZipFile, ZIP_STORED
 
@@ -406,6 +407,15 @@ def test_download_wacz_crawls(
         assert filename.endswith(".wacz") or filename == "datapackage.json"
         assert zip_file.getinfo(filename).compress_type == ZIP_STORED
 
+        if filename == "datapackage.json":
+            data = zip_file.read(filename).decode("utf-8")
+            datapackage = json.loads(data)
+            assert len(datapackage["resources"]) == 1
+            for resource in datapackage["resources"]:
+                assert resource["name"] == resource["path"]
+                assert resource["hash"]
+                assert resource["bytes"]
+
 
 def test_update_crawl(
     admin_auth_headers,
