diff --git a/.conda/meorg_client_dev.yaml b/.conda/meorg_client_dev.yaml index 04c31b5..1990255 100644 --- a/.conda/meorg_client_dev.yaml +++ b/.conda/meorg_client_dev.yaml @@ -15,5 +15,6 @@ dependencies: - pytest - black - ruff + - pandas>=2.2.2 - pip: - -r mkdocs-requirements.txt \ No newline at end of file diff --git a/.conda/meta.yaml b/.conda/meta.yaml index ae7a048..7e95724 100644 --- a/.conda/meta.yaml +++ b/.conda/meta.yaml @@ -26,6 +26,7 @@ requirements: - requests >=2.31.0 - click >=8.1.7 - PyYAML >=6.0.1 + - pandas >=2.2.2 test: imports: diff --git a/meorg_client/cli.py b/meorg_client/cli.py index ba74525..6ae9267 100644 --- a/meorg_client/cli.py +++ b/meorg_client/cli.py @@ -109,19 +109,31 @@ def list_endpoints(): @click.command("upload") @click.argument("file_path", nargs=-1) -def file_upload(file_path: tuple): +@click.option( + "--attach_to", + default=None, + help="Supply a model output id to immediately attach the file to.", +) +def file_upload(file_path, attach_to=None): """ Upload a file to the server. Prints Job ID on success, which is used by file-status to check transfer status. + + If attach_to is used then no ID is returned. """ client = _get_client() # Upload the file, get the job ID - response = _call(client.upload_files, files=list(file_path)) - files = response.get("data").get("files") - for f in files: - click.echo(f.get("file")) + response = _call(client.upload_files, files=list(file_path), attach_to=attach_to) + + # Different logic if we are attaching to a model output immediately + if not attach_to: + files = response.get("data").get("files") + for f in files: + click.echo(f.get("file")) + else: + click.echo("SUCCESS") @click.command("upload_parallel") @@ -129,7 +141,12 @@ def file_upload(file_path: tuple): @click.option( "-n", default=2, help="Number of simultaneous parallel uploads (default=2)." ) -def file_upload_parallel(file_paths: tuple, n: int = 2): +@click.option( + "--attach_to", + default=None, + help="Supply a model output id to immediately attach the file to.", +) +def file_upload_parallel(file_paths: tuple, n: int = 2, attach_to: str = None): """Upload files in parallel. Parameters diff --git a/meorg_client/client.py b/meorg_client/client.py index 70834fd..85a7687 100644 --- a/meorg_client/client.py +++ b/meorg_client/client.py @@ -10,9 +10,9 @@ import meorg_client.endpoints as endpoints import meorg_client.exceptions as mx import meorg_client.utilities as mu +import meorg_client.parallel as meop import mimetypes as mt from pathlib import Path -from multiprocessing import Pool class Client: @@ -217,7 +217,9 @@ def logout(self): self.headers.pop("X-User-Id", None) self.headers.pop("X-Auth-Token", None) - def upload_files_parallel(self, files: Union[str, Path, list], n: int = 2): + def upload_files_parallel( + self, files: Union[str, Path, list], n: int = 2, attach_to: str = None + ): """Upload files in parallel. Parameters @@ -226,6 +228,8 @@ def upload_files_parallel(self, files: Union[str, Path, list], n: int = 2): A path to a file, or a list of paths. n : int, optional Number of threads to use, by default 2. + attach_to : str, optional + Module output id to attach to, by default None. Returns ------- @@ -242,14 +246,16 @@ def upload_files_parallel(self, files: Union[str, Path, list], n: int = 2): # Do the parallel upload responses = None - with Pool(processes=n) as pool: - responses = pool.map(self.upload_files, files) + responses = meop.parallelise( + self.upload_files, n, files=files, attach_to=attach_to + ) return responses def upload_files( self, files: Union[str, Path], + attach_to: str = None, ) -> Union[dict, requests.Response]: """Upload a file. @@ -257,6 +263,8 @@ def upload_files( ---------- files : path-like, list Path to the file, or a list containing paths. + attach_to : str, optional + Optional model_output_id to attach the files to, by default None Returns ------- @@ -309,6 +317,13 @@ def upload_files( for fd in payload: fd[1][1].close() + # Automatically attach to a model output + if attach_to: + + _ = self.attach_files_to_model_output( + attach_to, files=mu.get_uploaded_file_ids(response) + ) + return response def list_files(self, id: str) -> Union[dict, requests.Response]: diff --git a/meorg_client/parallel.py b/meorg_client/parallel.py new file mode 100644 index 0000000..1962e2d --- /dev/null +++ b/meorg_client/parallel.py @@ -0,0 +1,66 @@ +"""Methods for parallel execution.""" + +import pandas as pd +import multiprocessing as mp + + +def _execute(mp_args: tuple): + """Execute an instance of the parallel function. + + Parameters + ---------- + mp_args : tuple + 2-tuple consisting of a callable and an arguments dictionary. + + Returns + ------- + mixed + Returning value of the callable. + """ + return mp_args[0](**mp_args[1]) + + +def _convert_kwargs(**kwargs): + """Convert a dict of lists and scalars into even lists for parallel execution. + + Returns + ------- + dict + A dictionary of lists of arguments. + """ + return pd.DataFrame(kwargs).to_dict("records") + + +def parallelise(func: callable, num_threads: int, **kwargs): + """Execute `func` in parallel over `num_threads`. + + Parameters + ---------- + func : callable + Function to parallelise. + num_threads : int + Number of threads. + **kwargs : + Keyword arguments for `func` all lists must have equal length, scalars will be converted to lists. + + Returns + ------- + mixed + Returning value of `func`. + """ + + # Convert the kwargs to argument list of dicts + mp_args = _convert_kwargs(**kwargs) + + # Attach the function pointer as the first argument + mp_args = [[func, mp_arg] for mp_arg in mp_args] + + # Start with empty results + results = None + + # Establish a pool of workers (blocking) + with mp.Pool(processes=num_threads) as pool: + results = pool.map(_execute, mp_args) + + # Return the results + return results diff --git a/meorg_client/tests/test_cli.py b/meorg_client/tests/test_cli.py index fe2fb2d..35423c0 100644 --- a/meorg_client/tests/test_cli.py +++ b/meorg_client/tests/test_cli.py @@ -89,3 +89,22 @@ def test_file_attach(runner): ) assert result.exit_code == 0 + + +def test_file_upload_with_attach(runner, test_filepath): + """Test file upload with attachment via CLI.""" + model_output_id = store.get("model_output_id") + result = runner.invoke( + cli.file_upload, [test_filepath, test_filepath, "--attach_to", model_output_id] + ) + assert result.exit_code == 0 + + +def test_file_upload_parallel_with_attach(runner, test_filepath): + """Test file upload with attachment via CLI.""" + model_output_id = store.get("model_output_id") + result = runner.invoke( + cli.file_upload_parallel, + [test_filepath, test_filepath, "--attach_to", model_output_id], + ) + assert result.exit_code == 0 diff --git a/meorg_client/tests/test_client.py b/meorg_client/tests/test_client.py index 6146680..b2a964a 100644 --- a/meorg_client/tests/test_client.py +++ b/meorg_client/tests/test_client.py @@ -40,6 +40,10 @@ def _get_authenticated_client() -> Client: return client +def _get_test_file(): + return os.path.join(mu.get_installed_data_root(), "test/test.txt") + + @pytest.fixture def client() -> Client: return _get_authenticated_client() @@ -151,6 +155,13 @@ def test_upload_file_large(client: Client): assert client.success() +def test_upload_files_with_attach(client: Client): + """Test that the upload can also attach in the same method call.""" + filepath = _get_test_file() + _ = client.upload_files([filepath, filepath], attach_to=client._model_output_id) + assert client.success() + + def test_upload_file_parallel(client: Client, test_filepath: str): """Test the uploading of a file.""" # Upload the file @@ -162,6 +173,19 @@ def test_upload_file_parallel(client: Client, test_filepath: str): ) +def test_upload_file_parallel_with_attach(client: Client, test_filepath: str): + """Test the uploading of a file with a model output ID to attach.""" + # Upload the file + responses = client.upload_files_parallel( + [test_filepath, test_filepath], n=2, attach_to=client._model_output_id + ) + + # Make sure it worked + assert all( + [response.get("data").get("files")[0].get("file") for response in responses] + ) + + def test_logout(client: Client): """Test logout.""" client.logout() diff --git a/meorg_client/utilities.py b/meorg_client/utilities.py index 9bd1336..302803d 100644 --- a/meorg_client/utilities.py +++ b/meorg_client/utilities.py @@ -84,3 +84,20 @@ def ensure_list(obj): The object as a list, if it is not already. """ return obj if isinstance(obj, list) else [obj] + + +def get_uploaded_file_ids(response): + """Get the file ids out of the response object. + + Parameters + ---------- + response : dict + Response dictionary from a upload call. + + Returns + ------- + list + List of file ids. + """ + file_ids = [f.get("file") for f in response.get("data").get("files")] + return file_ids diff --git a/pyproject.toml b/pyproject.toml index 9678059..d6fc847 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,7 +23,8 @@ dependencies = [ "requests>=2.31.0", "requests-mock>=1.11.0", "PyYAML>=6.0.1", - "click>=8.1.7" + "click>=8.1.7", + "pandas>=2.2.2" ] authors = [