Skip to content

Commit

Permalink
feat: download file & file_bundle modules
Browse files Browse the repository at this point in the history
  • Loading branch information
makkus committed Mar 5, 2024
1 parent ced5fdc commit 44db071
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 5 deletions.
194 changes: 194 additions & 0 deletions src/kiara_plugin/onboarding/modules/download/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
# -*- coding: utf-8 -*-
import atexit
import os
import shutil
import tempfile
from typing import Any, Dict, Union

from pydantic import Field

from kiara.exceptions import KiaraProcessingException
from kiara.models.module import KiaraModuleConfig
from kiara.models.values.value import ValueMap
from kiara.modules import KiaraModule, ValueMapSchema


class DownloadFileConfig(KiaraModuleConfig):
    # Module configuration, referenced as `_config_cls` by the 'download.file' module.

    # Toggle for storing the download metadata on the resulting file; on by default.
    attach_metadata: bool = Field(
        default=True,
        description="Whether to attach the download metadata to the result file.",
    )


class DownloadFileModule(KiaraModule):
    """Download a single file from a remote location.
    The result of this operation is a single value of type 'file' (basically an array of raw bytes + some light metadata), which can then be used in other modules to create more meaningful data structures.
    """

    _module_type_name = "download.file"
    _config_cls = DownloadFileConfig

    def create_inputs_schema(self) -> ValueMapSchema:
        # Two string inputs: the mandatory 'url' and the optional 'file_name'.
        url_spec: Dict[str, Any] = {
            "type": "string",
            "doc": "The url of the file to download.",
        }
        file_name_spec: Dict[str, Any] = {
            "type": "string",
            "doc": "The file name to use for the downloaded file, if not provided it will be generated from the last token of the url.",
            "optional": True,
        }

        schema: Dict[str, Dict[str, Any]] = {
            "url": url_spec,
            "file_name": file_name_spec,
        }
        return schema

    def create_outputs_schema(self) -> ValueMapSchema:
        # A single output value named 'file', of type 'file'.
        schema: Dict[str, Dict[str, Any]] = {
            "file": {"type": "file", "doc": "The downloaded file."},
        }
        return schema

    def process(self, inputs: ValueMap, outputs: ValueMap):
        # The download helper is imported lazily, inside the processing step.
        from kiara_plugin.onboarding.utils.download import download_file

        # Gather the call arguments: both inputs first, then the module config flag.
        download_args: Dict[str, Any] = {
            "url": inputs.get_value_data("url"),
            "file_name": inputs.get_value_data("file_name"),
            "attach_metadata": self.get_config_value("attach_metadata"),
        }

        outputs.set_value("file", download_file(**download_args))


class DownloadFileBundleConfig(KiaraModuleConfig):
    # Module configuration, referenced as `_config_cls` by the 'download.file_bundle' module.

    # Toggle for storing the download metadata on the bundle itself; on by default.
    attach_metadata_to_bundle: bool = Field(
        default=True,
        description="Whether to attach the download metadata to the result file bundle instance.",
    )

    # Toggle for storing the download metadata on every file of the bundle; off by default.
    attach_metadata_to_files: bool = Field(
        default=False,
        description="Whether to attach the download metadata to each file in the resulting bundle.",
    )


class DownloadFileBundleModule(KiaraModule):
    """Download a file bundle from a remote location.

    This is basically just a convenience module that incorporates unpacking of the downloaded file into a folder structure, and then wrapping it into a *kiara* `file_bundle` data type.

    If the `sub_path` input is set, the whole data is downloaded anyway, but before wrapping into a `file_bundle` value, the files not in the sub-path are ignored (and thus not available later on). Make sure you
    decide whether this is ok for your use-case, if not, rather filter the `file_bundle` later in an
    extra step (for example using the `file_bundle.pick.sub_folder` operation).
    """

    _module_type_name = "download.file_bundle"
    _config_cls = DownloadFileBundleConfig

    def create_inputs_schema(self) -> ValueMapSchema:
        """Declare the inputs: a required 'url' and an optional 'sub_path'."""

        result: Dict[str, Dict[str, Any]] = {
            "url": {
                "type": "string",
                "doc": "The url of an archive/zip file to download.",
            },
            "sub_path": {
                "type": "string",
                "doc": "A relative path to select only a sub-folder from the archive.",
                "optional": True,
            },
        }

        return result

    def create_outputs_schema(
        self,
    ) -> ValueMapSchema:
        """Declare the single output: the resulting 'file_bundle' value."""

        result: Dict[str, Dict[str, Any]] = {
            "file_bundle": {
                "type": "file_bundle",
                "doc": "The downloaded file bundle.",
            }
        }

        return result

    def process(self, inputs: ValueMap, outputs: ValueMap):
        """Download the archive at 'url', unpack it and emit it as a file bundle.

        The archive is downloaded into a temporary file and unpacked into a
        temporary folder; both are removed when the interpreter exits. If the
        'sub_path' input is set, only that sub-folder of the unpacked archive
        is imported into the bundle.

        Raises:
            KiaraProcessingException: if the archive can be extracted neither
                with `shutil.unpack_archive` nor with `patoolib`.
        """

        from urllib.parse import urlparse

        from kiara.models.filesystem import KiaraFile, KiaraFileBundle
        from kiara_plugin.onboarding.utils.download import download_file

        url = inputs.get_value_data("url")
        sub_path: Union[None, str] = inputs.get_value_data("sub_path")

        # Carry the url's file extension over to the temporary file name.
        # This is best-effort only: any failure to parse the url just means
        # no suffix is used.
        try:
            suffix = os.path.splitext(urlparse(url).path)[1]
        except Exception:
            suffix = ""
        if not suffix:
            suffix = ""

        # Only the path of the temporary file is needed (the download is
        # written to it by name), so the handle is closed right away instead
        # of being kept open until interpreter exit.
        tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
        tmp_file.close()
        archive_path = tmp_file.name

        def del_tmp_file():
            # 'delete=False' means nobody else removes the archive; cleanup is
            # best-effort, mirroring the 'ignore_errors' used for the out dir.
            try:
                os.unlink(archive_path)
            except OSError:
                pass

        atexit.register(del_tmp_file)

        kiara_file: KiaraFile
        # The md5 hash that is returned alongside the file is not used here.
        kiara_file, _ = download_file(
            url, target=archive_path, attach_metadata=True, return_md5_hash=True
        )

        assert kiara_file.path == archive_path

        out_dir = tempfile.mkdtemp()

        def del_out_dir():
            shutil.rmtree(out_dir, ignore_errors=True)

        atexit.register(del_out_dir)

        try:
            shutil.unpack_archive(archive_path, out_dir)
        except Exception as unpack_error:
            # try patool, maybe we're lucky
            try:
                import patoolib
            except ImportError:
                # Without the optional fallback, report the actual extraction
                # failure rather than the missing module.
                raise KiaraProcessingException(
                    f"Could not extract archive: {unpack_error}."
                ) from unpack_error

            try:
                patoolib.extract_archive(archive_path, outdir=out_dir)
            except Exception as patool_error:
                raise KiaraProcessingException(
                    f"Could not extract archive: {patool_error}."
                ) from patool_error

        path = out_dir
        if sub_path:
            # NOTE(review): 'sub_path' is joined unchecked; an absolute path or
            # one containing '..' would point outside of 'out_dir' -- consider
            # validating it.
            path = os.path.join(out_dir, sub_path)
        bundle = KiaraFileBundle.import_folder(path)

        # 'download_info' is present because the download above is always
        # done with 'attach_metadata=True'.
        download_info = kiara_file.metadata["download_info"]

        if self.get_config_value("attach_metadata_to_bundle"):
            bundle.metadata["download_info"] = download_info

        # Honour the config switch (it was previously short-circuited by a
        # stray 'or True', so the per-file metadata was always attached).
        if self.get_config_value("attach_metadata_to_files"):
            for included_file in bundle.included_files.values():
                included_file.metadata["download_info"] = download_info

        outputs.set_value("file_bundle", bundle)
17 changes: 12 additions & 5 deletions src/kiara_plugin/onboarding/utils/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from kiara.exceptions import KiaraException
from kiara.models.filesystem import FolderImportConfig, KiaraFile, KiaraFileBundle
from kiara.utils.dates import get_current_time_incl_timezone
from kiara.utils.files import unpack_archive
from kiara.utils.json import orjson_dumps
from kiara_plugin.onboarding.models import OnboardDataModel
Expand All @@ -21,7 +22,10 @@ class DownloadMetadata(BaseModel):
response_headers: List[Dict[str, str]] = Field(
description="The response headers of the download request."
)
request_time: str = Field(description="The time the request was made.")
request_time: datetime = Field(description="The time the request was made.")
download_time_in_seconds: float = Field(
description="How long the download took in seconds."
)


class DownloadBundleMetadata(DownloadMetadata):
Expand Down Expand Up @@ -56,7 +60,6 @@ def download_file(
import hashlib

import httpx
import pytz

if not file_name:
# TODO: make this smarter, using content-disposition headers if available
Expand All @@ -79,7 +82,9 @@ def rm_tmp_file():
hash_md5 = hashlib.md5() # noqa

history = []
datetime.utcnow().replace(tzinfo=pytz.utc)

request_time = get_current_time_incl_timezone()

with open(_target, "wb") as f:
with httpx.stream("GET", url, follow_redirects=True) as r:
if r.status_code < 200 or r.status_code >= 399:
Expand All @@ -95,12 +100,14 @@ def rm_tmp_file():
f.write(data)

result_file = KiaraFile.load_file(_target.as_posix(), file_name)

now_time = get_current_time_incl_timezone()
delta = (now_time - request_time).total_seconds()
if attach_metadata:
metadata = {
"url": url,
"response_headers": history,
"request_time": datetime.utcnow().replace(tzinfo=pytz.utc).isoformat(),
"request_time": request_time,
"download_time_in_seconds": delta,
}
_metadata: DownloadMetadata = DownloadMetadata(**metadata)
result_file.metadata["download_info"] = _metadata.model_dump()
Expand Down

0 comments on commit 44db071

Please sign in to comment.