From 36be3c517a39e0ddb92f3e6c7e5a70fd44829681 Mon Sep 17 00:00:00 2001 From: Ben Schroeter Date: Thu, 8 Aug 2024 15:50:23 +1000 Subject: [PATCH 1/2] Attach with upload. Fixes #45 --- meorg_client/cli.py | 22 +++++++++++++---- meorg_client/client.py | 40 +++++++++++++++++++++++++++++++ meorg_client/tests/test_cli.py | 10 ++++++++ meorg_client/tests/test_client.py | 15 ++++++++++-- meorg_client/utilities.py | 17 +++++++++++++ 5 files changed, 97 insertions(+), 7 deletions(-) diff --git a/meorg_client/cli.py b/meorg_client/cli.py index aff5fcc..d214c00 100644 --- a/meorg_client/cli.py +++ b/meorg_client/cli.py @@ -109,19 +109,31 @@ def list_endpoints(): @click.command("upload") @click.argument("file_path", nargs=-1) -def file_upload(file_path): +@click.option( + "--attach_to", + default=None, + help="Supply a model output id to immediately attach the file to.", +) +def file_upload(file_path, attach_to=None): """ Upload a file to the server. Prints Job ID on success, which is used by file-status to check transfer status. + + If attach_to is used then no ID is returned. """ client = _get_client() # Upload the file, get the job ID - response = _call(client.upload_files, files=list(file_path)) - files = response.get("data").get("files") - for f in files: - click.echo(f.get("file")) + response = _call(client.upload_files, files=list(file_path), attach_to=attach_to) + + # Different logic if we are attaching to a model output immediately + if not attach_to: + files = response.get("data").get("files") + for f in files: + click.echo(f.get("file")) + else: + click.echo("SUCCESS") @click.command("list") diff --git a/meorg_client/client.py b/meorg_client/client.py index 573d45e..718d75a 100644 --- a/meorg_client/client.py +++ b/meorg_client/client.py @@ -12,6 +12,7 @@ import meorg_client.utilities as mu import mimetypes as mt from pathlib import Path +from multiprocessing import Pool class Client: @@ -216,9 +217,40 @@ def logout(self): self.headers.pop("X-User-Id", None) self.headers.pop("X-Auth-Token", None) + def upload_files_parallel(self, files: list, n: int = 2): + """Upload files in parallel. + + Parameters + ---------- + files : list + List of file paths. + n : int, optional + Number of threads to use, by default 2 + + Returns + ------- + list + List of dicts or response objects from upload_files. + """ + + # Ensure the object is actually iterable + files = mu.ensure_list(files) + + # Sequential case, single file provided + if len(files) == 1: + return self.upload_files(files) + + # Do the parallel upload + responses = None + with Pool(processes=n) as pool: + responses = pool.map(self.upload_files, files) + + return responses + def upload_files( self, files: Union[str, Path], + attach_to: str = None, ) -> Union[dict, requests.Response]: """Upload a file. @@ -226,6 +258,8 @@ def upload_files( ---------- files : path-like, list Path to the file, or a list containing paths. + attach_to : str, optional + Optional model_output_id to attach the files to, by default None Returns ------- @@ -278,6 +312,12 @@ def upload_files( for fd in payload: fd[1][1].close() + # Automatically attach to a model output + if attach_to: + response = self.attach_files_to_model_output( + attach_to, files=mu.get_uploaded_file_ids(response) + ) + return response def list_files(self, id: str) -> Union[dict, requests.Response]: diff --git a/meorg_client/tests/test_cli.py b/meorg_client/tests/test_cli.py index bd7edc0..7a11f8c 100644 --- a/meorg_client/tests/test_cli.py +++ b/meorg_client/tests/test_cli.py @@ -62,3 +62,13 @@ def test_file_attach(runner): ) assert result.exit_code == 0 + + +def test_file_upload_with_attach(runner): + """Test file upload with attachment via CLI.""" + filepath = os.path.join(mu.get_installed_data_root(), "test/test.txt") + model_output_id = store.get("model_output_id") + result = runner.invoke( + cli.file_upload, [filepath, filepath, "--attach_to", model_output_id] + ) + assert result.exit_code == 0 diff --git a/meorg_client/tests/test_client.py b/meorg_client/tests/test_client.py index e412d73..ab08b39 100644 --- a/meorg_client/tests/test_client.py +++ b/meorg_client/tests/test_client.py @@ -40,6 +40,10 @@ def _get_authenticated_client(): return client +def _get_test_file(): + return os.path.join(mu.get_installed_data_root(), "test/test.txt") + + @pytest.fixture def client(): return _get_authenticated_client() @@ -61,7 +65,7 @@ def test_list_endpoints(client): def test_upload_file(client): """Test the uploading of a file.""" # Upload the file. - filepath = os.path.join(mu.get_installed_data_root(), "test/test.txt") + filepath = _get_test_file() # Upload the file response = client.upload_files(filepath) @@ -89,7 +93,7 @@ def test_upload_file_multiple(client): def test_file_list(client): - """Test the listinf of files for a model output.""" + """Test the list of files for a model output.""" response = client.list_files(client._model_output_id) assert client.success() assert isinstance(response.get("data").get("files"), list) @@ -144,6 +148,13 @@ def test_upload_file_large(client): assert client.success() +def test_upload_files_with_attach(client): + """Test that the upload can also attach in the same method call.""" + filepath = _get_test_file() + _ = client.upload_files([filepath, filepath], attach_to=client._model_output_id) + assert client.success() + + def test_logout(client): """Test logout.""" client.logout() diff --git a/meorg_client/utilities.py b/meorg_client/utilities.py index 9bd1336..302803d 100644 --- a/meorg_client/utilities.py +++ b/meorg_client/utilities.py @@ -84,3 +84,20 @@ def ensure_list(obj): The object as a list, if it is not already. """ return obj if isinstance(obj, list) else [obj] + + +def get_uploaded_file_ids(response): + """Get the file ids out of the response object. + + Parameters + ---------- + response : dict + Response dictionary from a upload call. + + Returns + ------- + list + List of file ids. + """ + file_ids = [f.get("file") for f in response.get("data").get("files")] + return file_ids From f85fe810e880e36bc746c50a2135e3b3f92de1f7 Mon Sep 17 00:00:00 2001 From: Ben Schroeter Date: Wed, 14 Aug 2024 13:40:33 +1000 Subject: [PATCH 2/2] Outputting file ids with attachment, parallel upload now takes an attach_to option. Fixes #45. --- .conda/meorg_client_dev.yaml | 1 + .conda/meta.yaml | 1 + meorg_client/cli.py | 7 +++- meorg_client/client.py | 16 +++++--- meorg_client/parallel.py | 66 +++++++++++++++++++++++++++++++ meorg_client/tests/test_cli.py | 15 +++++-- meorg_client/tests/test_client.py | 13 ++++++ pyproject.toml | 3 +- 8 files changed, 112 insertions(+), 10 deletions(-) create mode 100644 meorg_client/parallel.py diff --git a/.conda/meorg_client_dev.yaml b/.conda/meorg_client_dev.yaml index 04c31b5..1990255 100644 --- a/.conda/meorg_client_dev.yaml +++ b/.conda/meorg_client_dev.yaml @@ -15,5 +15,6 @@ dependencies: - pytest - black - ruff + - pandas>=2.2.2 - pip: - -r mkdocs-requirements.txt \ No newline at end of file diff --git a/.conda/meta.yaml b/.conda/meta.yaml index ae7a048..7e95724 100644 --- a/.conda/meta.yaml +++ b/.conda/meta.yaml @@ -26,6 +26,7 @@ requirements: - requests >=2.31.0 - click >=8.1.7 - PyYAML >=6.0.1 + - pandas >=2.2.2 test: imports: diff --git a/meorg_client/cli.py b/meorg_client/cli.py index 298cd4d..6ae9267 100644 --- a/meorg_client/cli.py +++ b/meorg_client/cli.py @@ -141,7 +141,12 @@ def file_upload(file_path, attach_to=None): @click.option( "-n", default=2, help="Number of simultaneous parallel uploads (default=2)." ) -def file_upload_parallel(file_paths: tuple, n: int = 2): +@click.option( + "--attach_to", + default=None, + help="Supply a model output id to immediately attach the file to.", +) +def file_upload_parallel(file_paths: tuple, n: int = 2, attach_to: str = None): """Upload files in parallel. Parameters diff --git a/meorg_client/client.py b/meorg_client/client.py index 6ef7fd9..85a7687 100644 --- a/meorg_client/client.py +++ b/meorg_client/client.py @@ -10,9 +10,9 @@ import meorg_client.endpoints as endpoints import meorg_client.exceptions as mx import meorg_client.utilities as mu +import meorg_client.parallel as meop import mimetypes as mt from pathlib import Path -from multiprocessing import Pool class Client: @@ -217,7 +217,9 @@ def logout(self): self.headers.pop("X-User-Id", None) self.headers.pop("X-Auth-Token", None) - def upload_files_parallel(self, files: Union[str, Path, list], n: int = 2): + def upload_files_parallel( + self, files: Union[str, Path, list], n: int = 2, attach_to: str = None + ): """Upload files in parallel. Parameters @@ -226,6 +228,8 @@ def upload_files_parallel(self, files: Union[str, Path, list], n: int = 2): A path to a file, or a list of paths. n : int, optional Number of threads to use, by default 2. + attach_to : str, optional + Module output id to attach to, by default None. Returns ------- @@ -242,8 +246,9 @@ def upload_files_parallel(self, files: Union[str, Path, list], n: int = 2): # Do the parallel upload responses = None - with Pool(processes=n) as pool: - responses = pool.map(self.upload_files, files) + responses = meop.parallelise( + self.upload_files, n, files=files, attach_to=attach_to + ) return responses @@ -314,7 +319,8 @@ def upload_files( # Automatically attach to a model output if attach_to: - response = self.attach_files_to_model_output( + + _ = self.attach_files_to_model_output( attach_to, files=mu.get_uploaded_file_ids(response) ) diff --git a/meorg_client/parallel.py b/meorg_client/parallel.py new file mode 100644 index 0000000..1962e2d --- /dev/null +++ b/meorg_client/parallel.py @@ -0,0 +1,66 @@ +"""Methods for parallel execution.""" + +import pandas as pd +import multiprocessing as mp + + +def _execute(mp_args: tuple): + """Execute an instance of the parallel function. + + Parameters + ---------- + mp_args : tuple + 2-tuple consisting of a callable and an arguments dictionary. + + Returns + ------- + mixed + Returning value of the callable. + """ + return mp_args[0](**mp_args[1]) + + +def _convert_kwargs(**kwargs): + """Convert a dict of lists and scalars into even lists for parallel execution. + + Returns + ------- + dict + A dictionary of lists of arguments. + """ + return pd.DataFrame(kwargs).to_dict("records") + + +def parallelise(func: callable, num_threads: int, **kwargs): + """Execute `func` in parallel over `num_threads`. + + Parameters + ---------- + func : callable + Function to parallelise. + num_threads : int + Number of threads. + **kwargs : + Keyword arguments for `func` all lists must have equal length, scalars will be converted to lists. + + Returns + ------- + mixed + Returning value of `func`. + """ + + # Convert the kwargs to argument list of dicts + mp_args = _convert_kwargs(**kwargs) + + # Attach the function pointer as the first argument + mp_args = [[func, mp_arg] for mp_arg in mp_args] + + # Start with empty results + results = None + + # Establish a pool of workers (blocking) + with mp.Pool(processes=num_threads) as pool: + results = pool.map(_execute, mp_args) + + # Return the results + return results diff --git a/meorg_client/tests/test_cli.py b/meorg_client/tests/test_cli.py index 774e401..35423c0 100644 --- a/meorg_client/tests/test_cli.py +++ b/meorg_client/tests/test_cli.py @@ -91,11 +91,20 @@ def test_file_attach(runner): assert result.exit_code == 0 -def test_file_upload_with_attach(runner): +def test_file_upload_with_attach(runner, test_filepath): """Test file upload with attachment via CLI.""" - filepath = os.path.join(mu.get_installed_data_root(), "test/test.txt") model_output_id = store.get("model_output_id") result = runner.invoke( - cli.file_upload, [filepath, filepath, "--attach_to", model_output_id] + cli.file_upload, [test_filepath, test_filepath, "--attach_to", model_output_id] + ) + assert result.exit_code == 0 + + +def test_file_upload_parallel_with_attach(runner, test_filepath): + """Test file upload with attachment via CLI.""" + model_output_id = store.get("model_output_id") + result = runner.invoke( + cli.file_upload_parallel, + [test_filepath, test_filepath, "--attach_to", model_output_id], ) assert result.exit_code == 0 diff --git a/meorg_client/tests/test_client.py b/meorg_client/tests/test_client.py index 7f41615..b2a964a 100644 --- a/meorg_client/tests/test_client.py +++ b/meorg_client/tests/test_client.py @@ -173,6 +173,19 @@ def test_upload_file_parallel(client: Client, test_filepath: str): ) +def test_upload_file_parallel_with_attach(client: Client, test_filepath: str): + """Test the uploading of a file with a model output ID to attach.""" + # Upload the file + responses = client.upload_files_parallel( + [test_filepath, test_filepath], n=2, attach_to=client._model_output_id + ) + + # Make sure it worked + assert all( + [response.get("data").get("files")[0].get("file") for response in responses] + ) + + def test_logout(client: Client): """Test logout.""" client.logout() diff --git a/pyproject.toml b/pyproject.toml index 9678059..d6fc847 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,7 +23,8 @@ dependencies = [ "requests>=2.31.0", "requests-mock>=1.11.0", "PyYAML>=6.0.1", - "click>=8.1.7" + "click>=8.1.7", + "pandas>=2.2.2" ] authors = [