Skip to content

Commit

Permalink
Merge pull request #48 from CABLE-LSM/45-automatically-attach-uploade…
Browse files Browse the repository at this point in the history
…d-files-to-model-output

45 automatically attach uploaded files to model output
  • Loading branch information
bschroeter authored Aug 14, 2024
2 parents 3afb9d7 + f85fe81 commit 8347b8a
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 11 deletions.
1 change: 1 addition & 0 deletions .conda/meorg_client_dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ dependencies:
- pytest
- black
- ruff
- pandas>=2.2.2
- pip:
- -r mkdocs-requirements.txt
1 change: 1 addition & 0 deletions .conda/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ requirements:
- requests >=2.31.0
- click >=8.1.7
- PyYAML >=6.0.1
- pandas >=2.2.2

test:
imports:
Expand Down
29 changes: 23 additions & 6 deletions meorg_client/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,27 +109,44 @@ def list_endpoints():

@click.command("upload")
@click.argument("file_path", nargs=-1)
def file_upload(file_path: tuple):
@click.option(
"--attach_to",
default=None,
help="Supply a model output id to immediately attach the file to.",
)
def file_upload(file_path, attach_to=None):
"""
Upload a file to the server.
Prints Job ID on success, which is used by file-status to check transfer status.
If attach_to is used then no ID is returned.
"""
client = _get_client()

# Upload the file, get the job ID
response = _call(client.upload_files, files=list(file_path))
files = response.get("data").get("files")
for f in files:
click.echo(f.get("file"))
response = _call(client.upload_files, files=list(file_path), attach_to=attach_to)

# Different logic if we are attaching to a model output immediately
if not attach_to:
files = response.get("data").get("files")
for f in files:
click.echo(f.get("file"))
else:
click.echo("SUCCESS")


@click.command("upload_parallel")
@click.argument("file_paths", nargs=-1)
@click.option(
"-n", default=2, help="Number of simultaneous parallel uploads (default=2)."
)
def file_upload_parallel(file_paths: tuple, n: int = 2):
@click.option(
"--attach_to",
default=None,
help="Supply a model output id to immediately attach the file to.",
)
def file_upload_parallel(file_paths: tuple, n: int = 2, attach_to: str = None):
"""Upload files in parallel.
Parameters
Expand Down
23 changes: 19 additions & 4 deletions meorg_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import meorg_client.endpoints as endpoints
import meorg_client.exceptions as mx
import meorg_client.utilities as mu
import meorg_client.parallel as meop
import mimetypes as mt
from pathlib import Path
from multiprocessing import Pool


class Client:
Expand Down Expand Up @@ -217,7 +217,9 @@ def logout(self):
self.headers.pop("X-User-Id", None)
self.headers.pop("X-Auth-Token", None)

def upload_files_parallel(self, files: Union[str, Path, list], n: int = 2):
def upload_files_parallel(
self, files: Union[str, Path, list], n: int = 2, attach_to: str = None
):
"""Upload files in parallel.
Parameters
Expand All @@ -226,6 +228,8 @@ def upload_files_parallel(self, files: Union[str, Path, list], n: int = 2):
A path to a file, or a list of paths.
n : int, optional
Number of threads to use, by default 2.
attach_to : str, optional
Model output id to attach to, by default None.
Returns
-------
Expand All @@ -242,21 +246,25 @@ def upload_files_parallel(self, files: Union[str, Path, list], n: int = 2):

# Do the parallel upload
responses = None
with Pool(processes=n) as pool:
responses = pool.map(self.upload_files, files)
responses = meop.parallelise(
self.upload_files, n, files=files, attach_to=attach_to
)

return responses

def upload_files(
self,
files: Union[str, Path],
attach_to: str = None,
) -> Union[dict, requests.Response]:
"""Upload a file.
Parameters
----------
files : path-like, list
Path to the file, or a list containing paths.
attach_to : str, optional
Optional model_output_id to attach the files to, by default None
Returns
-------
Expand Down Expand Up @@ -309,6 +317,13 @@ def upload_files(
for fd in payload:
fd[1][1].close()

# Automatically attach to a model output
if attach_to:

_ = self.attach_files_to_model_output(
attach_to, files=mu.get_uploaded_file_ids(response)
)

return response

def list_files(self, id: str) -> Union[dict, requests.Response]:
Expand Down
66 changes: 66 additions & 0 deletions meorg_client/parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""Methods for parallel execution."""

import pandas as pd
import multiprocessing as mp


def _execute(mp_args: tuple):
"""Execute an instance of the parallel function.
Parameters
----------
mp_args : tuple
2-tuple consisting of a callable and an arguments dictionary.
Returns
-------
mixed
Returning value of the callable.
"""
return mp_args[0](**mp_args[1])


def _convert_kwargs(**kwargs):
"""Convert a dict of lists and scalars into even lists for parallel execution.
Returns
-------
dict
A dictionary of lists of arguments.
"""
return pd.DataFrame(kwargs).to_dict("records")


def parallelise(func: callable, num_threads: int, **kwargs):
    """Run `func` once per argument set using a pool of workers.

    Parameters
    ----------
    func : callable
        Function to parallelise.
    num_threads : int
        Number of workers in the pool (passed to `multiprocessing.Pool` as
        `processes`).
    **kwargs :
        Keyword arguments for `func`; all lists must have equal length,
        scalars will be converted to lists.

    Returns
    -------
    list
        The return values of `func`, one per call.
    """
    # Pair the function with each per-call keyword dictionary
    jobs = [[func, call_kwargs] for call_kwargs in _convert_kwargs(**kwargs)]

    # Pool.map blocks until every job has completed
    with mp.Pool(processes=num_threads) as workers:
        return workers.map(_execute, jobs)
19 changes: 19 additions & 0 deletions meorg_client/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,22 @@ def test_file_attach(runner):
)

assert result.exit_code == 0


def test_file_upload_with_attach(runner, test_filepath):
    """Upload files through the CLI and attach them to a model output."""
    # Model output id is looked up from `store`
    attach_target = store.get("model_output_id")
    cli_args = [test_filepath, test_filepath, "--attach_to", attach_target]
    outcome = runner.invoke(cli.file_upload, cli_args)
    assert outcome.exit_code == 0


def test_file_upload_parallel_with_attach(runner, test_filepath):
    """Upload files in parallel through the CLI and attach them to a model output."""
    # Model output id is looked up from `store`
    attach_target = store.get("model_output_id")
    cli_args = [test_filepath, test_filepath, "--attach_to", attach_target]
    outcome = runner.invoke(cli.file_upload_parallel, cli_args)
    assert outcome.exit_code == 0
24 changes: 24 additions & 0 deletions meorg_client/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ def _get_authenticated_client() -> Client:
return client


def _get_test_file():
    """Return the path to ``test/test.txt`` under the installed data root."""
    data_root = mu.get_installed_data_root()
    return os.path.join(data_root, "test/test.txt")


@pytest.fixture
def client() -> Client:
    """Provide tests with the client returned by `_get_authenticated_client`."""
    authenticated = _get_authenticated_client()
    return authenticated
Expand Down Expand Up @@ -151,6 +155,13 @@ def test_upload_file_large(client: Client):
assert client.success()


def test_upload_files_with_attach(client: Client):
    """Upload files and attach them to a model output in a single call."""
    test_file = _get_test_file()
    target_id = client._model_output_id
    client.upload_files([test_file, test_file], attach_to=target_id)
    assert client.success()


def test_upload_file_parallel(client: Client, test_filepath: str):
"""Test the uploading of a file."""
# Upload the file
Expand All @@ -162,6 +173,19 @@ def test_upload_file_parallel(client: Client, test_filepath: str):
)


def test_upload_file_parallel_with_attach(client: Client, test_filepath: str):
    """Test the parallel upload of files with a model output ID to attach to."""
    # Upload the same file twice, using two workers
    filepaths = [test_filepath, test_filepath]
    responses = client.upload_files_parallel(
        filepaths, n=2, attach_to=client._model_output_id
    )

    # all() is True for an empty iterable, so make sure something came back
    assert responses

    # Every response must report a file id for its first uploaded file
    assert all(
        response.get("data").get("files")[0].get("file") for response in responses
    )


def test_logout(client: Client):
"""Test logout."""
client.logout()
Expand Down
17 changes: 17 additions & 0 deletions meorg_client/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,20 @@ def ensure_list(obj):
The object as a list, if it is not already.
"""
return obj if isinstance(obj, list) else [obj]


def get_uploaded_file_ids(response):
    """Extract the uploaded file ids from an upload response.

    Parameters
    ----------
    response : dict
        Response dictionary from an upload call. The entries under
        ``data`` -> ``files`` are read, each giving its id under the
        ``file`` key.

    Returns
    -------
    list
        List of file ids, in the order they appear in the response.
    """
    uploaded = response.get("data").get("files")
    file_ids = []
    for entry in uploaded:
        file_ids.append(entry.get("file"))
    return file_ids
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ dependencies = [
"requests>=2.31.0",
"requests-mock>=1.11.0",
"PyYAML>=6.0.1",
"click>=8.1.7"
"click>=8.1.7",
"pandas>=2.2.2"
]

authors = [
Expand Down

0 comments on commit 8347b8a

Please sign in to comment.