From ff5cba79f448f74002e659a4a91b2f2224f5b7c4 Mon Sep 17 00:00:00 2001 From: Ben Schroeter Date: Thu, 8 Aug 2024 12:52:05 +1000 Subject: [PATCH 1/2] Parallel file upload implementation. Fixes #42 --- meorg_client/cli.py | 14 ++++++++++++++ meorg_client/client.py | 31 +++++++++++++++++++++++++++++++ meorg_client/tests/test_cli.py | 9 +++++++++ meorg_client/tests/test_client.py | 14 ++++++++++++++ 4 files changed, 68 insertions(+) diff --git a/meorg_client/cli.py b/meorg_client/cli.py index aff5fcc..e74cd89 100644 --- a/meorg_client/cli.py +++ b/meorg_client/cli.py @@ -124,6 +124,19 @@ def file_upload(file_path): click.echo(f.get("file")) +@click.command("upload_parallel") +@click.argument("file_paths", nargs=-1) +@click.option( + "-n", default=2, help="Number of simultaneous parallel uploads (default=2)." +) +def file_upload_parallel(file_paths, n=2): + client = _get_client() + responses = _call(client.upload_files_parallel, files=list(file_paths), n=n) + print(responses) + for response in responses: + click.echo(response.get("data").get("files")[0].get("file")) + + @click.command("list") @click.argument("id") def file_list(id): @@ -250,6 +263,7 @@ def cli_analysis(): # Add file commands cli_file.add_command(file_list) cli_file.add_command(file_upload) +cli_file.add_command(file_upload_parallel) cli_file.add_command(file_attach) # Add endpoint commands diff --git a/meorg_client/client.py b/meorg_client/client.py index 573d45e..45a7ee8 100644 --- a/meorg_client/client.py +++ b/meorg_client/client.py @@ -12,6 +12,7 @@ import meorg_client.utilities as mu import mimetypes as mt from pathlib import Path +from multiprocessing import Pool class Client: @@ -216,6 +217,36 @@ def logout(self): self.headers.pop("X-User-Id", None) self.headers.pop("X-Auth-Token", None) + def upload_files_parallel(self, files: list, n: int = 2): + """Upload files in parallel. + + Parameters + ---------- + files : list + List of file paths. + n : int, optional + Number of threads to use, by default 2 + + Returns + ------- + list + List of dicts or response objects from upload_files. + """ + + # Ensure the object is actually iterable + files = mu.ensure_list(files) + + # Sequential case, single file provided + if len(files) == 1: + return self.upload_files(files) + + # Do the parallel upload + responses = None + with Pool(processes=n) as pool: + responses = pool.map(self.upload_files, files) + + return responses + def upload_files( self, files: Union[str, Path], diff --git a/meorg_client/tests/test_cli.py b/meorg_client/tests/test_cli.py index bd7edc0..d8f604c 100644 --- a/meorg_client/tests/test_cli.py +++ b/meorg_client/tests/test_cli.py @@ -48,6 +48,15 @@ def test_file_multiple(runner): time.sleep(5) +def test_file_upload_parallel(runner): + """Test file-upload via CLI.""" + + # Upload a tiny test file + filepath = os.path.join(mu.get_installed_data_root(), "test/test.txt") + result = runner.invoke(cli.file_upload_parallel, [filepath, filepath, "-n 2"]) + assert result.exit_code == 0 + + def test_file_list(runner): """Test file-list via CLI.""" result = runner.invoke(cli.file_list, [store.get("model_output_id")]) diff --git a/meorg_client/tests/test_client.py b/meorg_client/tests/test_client.py index e412d73..1ab5ce0 100644 --- a/meorg_client/tests/test_client.py +++ b/meorg_client/tests/test_client.py @@ -144,6 +144,20 @@ def test_upload_file_large(client): assert client.success() +def test_upload_file_parallel(client): + """Test the uploading of a file.""" + # Upload the file. + filepath = os.path.join(mu.get_installed_data_root(), "test/test.txt") + + # Upload the file + responses = client.upload_files_parallel([filepath, filepath], n=2) + + # Make sure it worked + assert all( + [response.get("data").get("files")[0].get("file") for response in responses] + ) + + def test_logout(client): """Test logout.""" client.logout() From fc6ec579cd76d14f0f6ff9db357d210882375f95 Mon Sep 17 00:00:00 2001 From: Ben Schroeter Date: Mon, 12 Aug 2024 12:14:48 +1000 Subject: [PATCH 2/2] Review changes. Fixes #42. --- meorg_client/cli.py | 30 ++++++++++++------- meorg_client/client.py | 14 ++++----- meorg_client/tests/test_cli.py | 40 ++++++++++++++++++------- meorg_client/tests/test_client.py | 50 +++++++++++++++++-------------- 4 files changed, 82 insertions(+), 52 deletions(-) diff --git a/meorg_client/cli.py b/meorg_client/cli.py index e74cd89..ba74525 100644 --- a/meorg_client/cli.py +++ b/meorg_client/cli.py @@ -11,7 +11,7 @@ import json -def _get_client(): +def _get_client() -> Client: """Get an authenticated client. Returns @@ -45,7 +45,7 @@ def _get_client(): ) -def _call(func, **kwargs): +def _call(func: callable, **kwargs) -> dict: """Simple wrapper to handle exceptions. Exceptions are captured broadly and raw error message printed before non-zero exit. @@ -109,7 +109,7 @@ def list_endpoints(): @click.command("upload") @click.argument("file_path", nargs=-1) -def file_upload(file_path): +def file_upload(file_path: tuple): """ Upload a file to the server. @@ -129,17 +129,25 @@ def file_upload(file_path): @click.option( "-n", default=2, help="Number of simultaneous parallel uploads (default=2)." ) -def file_upload_parallel(file_paths, n=2): +def file_upload_parallel(file_paths: tuple, n: int = 2): + """Upload files in parallel. + + Parameters + ---------- + file_paths : tuple + Sequence of file paths. + n : int, optional + Number of parallel uploads, by default 2 + """ client = _get_client() responses = _call(client.upload_files_parallel, files=list(file_paths), n=n) - print(responses) for response in responses: click.echo(response.get("data").get("files")[0].get("file")) @click.command("list") @click.argument("id") -def file_list(id): +def file_list(id: str): """ List the files currently attached to a model output. @@ -155,7 +163,7 @@ def file_list(id): @click.command("attach") @click.argument("file_id") @click.argument("output_id") -def file_attach(file_id, output_id): +def file_attach(file_id: str, output_id: str): """ Attach a file to a model output. """ @@ -168,7 +176,7 @@ def file_attach(file_id, output_id): @click.command("start") @click.argument("id") -def analysis_start(id): +def analysis_start(id: str): """ Start the analysis for the model output id. @@ -185,7 +193,7 @@ def analysis_start(id): @click.command("status") @click.argument("id") -def analysis_status(id): +def analysis_status(id: str): """ Get the status of the analysis. @@ -212,7 +220,7 @@ def analysis_status(id): @click.option( "--dev", is_flag=True, default=False, help="Setup for the development server." ) -def initialise(dev=False): +def initialise(dev: bool = False): """ Initialise the client on the system. """ @@ -229,7 +237,7 @@ def initialise(dev=False): click.echo(ex.msg, err=True) sys.exit(1) - print("Connection established.") + click.echo("Connection established.") # Build out the dictionary and save it to the user home. credentials = dict(email=email, password=password) diff --git a/meorg_client/client.py b/meorg_client/client.py index 45a7ee8..70834fd 100644 --- a/meorg_client/client.py +++ b/meorg_client/client.py @@ -128,7 +128,7 @@ def _make_request( # For flexibility return self.last_response - def _get_url(self, endpoint, **kwargs): + def _get_url(self, endpoint: str, **kwargs): """Get the well-formed URL for the call. Parameters @@ -217,15 +217,15 @@ def logout(self): self.headers.pop("X-User-Id", None) self.headers.pop("X-Auth-Token", None) - def upload_files_parallel(self, files: list, n: int = 2): + def upload_files_parallel(self, files: Union[str, Path, list], n: int = 2): """Upload files in parallel. Parameters ---------- - files : list - List of file paths. + files : Union[str, Path, list] + A path to a file, or a list of paths. n : int, optional - Number of threads to use, by default 2 + Number of threads to use, by default 2. Returns ------- @@ -236,7 +236,7 @@ def upload_files_parallel(self, files: list, n: int = 2): # Ensure the object is actually iterable files = mu.ensure_list(files) - # Sequential case, single file provided + # Single file provided, don't bother starting the pool if len(files) == 1: return self.upload_files(files) @@ -410,7 +410,7 @@ def list_endpoints(self) -> Union[dict, requests.Response]: """ return self._make_request(method=mcc.HTTP_GET, endpoint=endpoints.ENDPOINT_LIST) - def success(self): + def success(self) -> bool: """Test if the last request was successful. Returns diff --git a/meorg_client/tests/test_cli.py b/meorg_client/tests/test_cli.py index d8f604c..fe2fb2d 100644 --- a/meorg_client/tests/test_cli.py +++ b/meorg_client/tests/test_cli.py @@ -8,22 +8,40 @@ @pytest.fixture -def runner(): +def runner() -> CliRunner: + """Get a runner object. + + Returns + ------- + click.testing.CliRunner + Runner object. + """ return CliRunner() -def test_list_endpoints(runner): +@pytest.fixture +def test_filepath() -> str: + """Get a test filepath from the installation. + + Returns + ------- + str + Path to the test filepath. + """ + return os.path.join(mu.get_installed_data_root(), "test/test.txt") + + +def test_list_endpoints(runner: CliRunner): """Test list-endpoints via CLI.""" result = runner.invoke(cli.list_endpoints) assert result.exit_code == 0 -def test_file_upload(runner): +def test_file_upload(runner: CliRunner, test_filepath: str): """Test file-upload via CLI.""" # Upload a tiny test file - filepath = os.path.join(mu.get_installed_data_root(), "test/test.txt") - result = runner.invoke(cli.file_upload, [filepath]) + result = runner.invoke(cli.file_upload, [test_filepath]) assert result.exit_code == 0 # Add the job_id to the store for the next test @@ -33,12 +51,11 @@ def test_file_upload(runner): time.sleep(5) -def test_file_multiple(runner): +def test_file_multiple(runner: CliRunner, test_filepath: str): """Test file-upload via CLI.""" # Upload a tiny test file - filepath = os.path.join(mu.get_installed_data_root(), "test/test.txt") - result = runner.invoke(cli.file_upload, [filepath, filepath]) + result = runner.invoke(cli.file_upload, [test_filepath, test_filepath]) assert result.exit_code == 0 # Add the job_id to the store for the next test @@ -48,12 +65,13 @@ def test_file_multiple(runner): time.sleep(5) -def test_file_upload_parallel(runner): +def test_file_upload_parallel(runner: CliRunner, test_filepath: str): """Test file-upload via CLI.""" # Upload a tiny test file - filepath = os.path.join(mu.get_installed_data_root(), "test/test.txt") - result = runner.invoke(cli.file_upload_parallel, [filepath, filepath, "-n 2"]) + result = runner.invoke( + cli.file_upload_parallel, [test_filepath, test_filepath, "-n 2"] + ) assert result.exit_code == 0 diff --git a/meorg_client/tests/test_client.py b/meorg_client/tests/test_client.py index 1ab5ce0..6146680 100644 --- a/meorg_client/tests/test_client.py +++ b/meorg_client/tests/test_client.py @@ -8,7 +8,7 @@ import tempfile as tf -def _get_authenticated_client(): +def _get_authenticated_client() -> Client: """Get an authenticated client for tests. Returns @@ -41,30 +41,39 @@ def _get_authenticated_client(): @pytest.fixture -def client(): +def client() -> Client: return _get_authenticated_client() +@pytest.fixture +def test_filepath() -> str: + """Get a test filepath from the installation. + + Returns + ------- + str + Path to the test filepath. + """ + return os.path.join(mu.get_installed_data_root(), "test/test.txt") + + def test_login(): """Test login.""" _client = _get_authenticated_client() assert "X-Auth-Token" in _client.headers.keys() -def test_list_endpoints(client): +def test_list_endpoints(client: Client): """Test list_endpoints.""" response = client.list_endpoints() assert client.success() assert isinstance(response, dict) -def test_upload_file(client): +def test_upload_file(client: Client, test_filepath: str): """Test the uploading of a file.""" - # Upload the file. - filepath = os.path.join(mu.get_installed_data_root(), "test/test.txt") - # Upload the file - response = client.upload_files(filepath) + response = client.upload_files(test_filepath) # Make sure it worked assert client.success() @@ -73,13 +82,11 @@ def test_upload_file(client): store.set("file_upload", response) -def test_upload_file_multiple(client): +def test_upload_file_multiple(client: Client, test_filepath: str): """Test the uploading of a file.""" - # Upload the file. - filepath = os.path.join(mu.get_installed_data_root(), "test/test.txt") # Upload the file - response = client.upload_files([filepath, filepath]) + response = client.upload_files([test_filepath, test_filepath]) # Make sure it worked assert client.success() @@ -88,14 +95,14 @@ def test_upload_file_multiple(client): store.set("file_upload_multiple", response) -def test_file_list(client): +def test_file_list(client: Client): """Test the listinf of files for a model output.""" response = client.list_files(client._model_output_id) assert client.success() assert isinstance(response.get("data").get("files"), list) -def test_attach_files_to_model_output(client): +def test_attach_files_to_model_output(client: Client): # Get the file id from the job id file_id = store.get("file_upload").get("data").get("files")[0].get("file") @@ -105,14 +112,14 @@ def test_attach_files_to_model_output(client): assert client.success() -def test_start_analysis(client): +def test_start_analysis(client: Client): """Test starting an analysis.""" response = client.start_analysis(client._model_output_id) assert client.success() store.set("start_analysis", response) -def test_get_analysis_status(client): +def test_get_analysis_status(client: Client): """Test getting the analysis status.""" # Get the analysis id from the store analysis_id = store.get("start_analysis").get("data").get("analysisId") @@ -121,7 +128,7 @@ def test_get_analysis_status(client): @pytest.mark.xfail(strict=False) -def test_upload_file_large(client): +def test_upload_file_large(client: Client): """Test the uploading of a large-ish file.""" # Create an in-memory 10mb file @@ -144,13 +151,10 @@ def test_upload_file_large(client): assert client.success() -def test_upload_file_parallel(client): +def test_upload_file_parallel(client: Client, test_filepath: str): """Test the uploading of a file.""" - # Upload the file. - filepath = os.path.join(mu.get_installed_data_root(), "test/test.txt") - # Upload the file - responses = client.upload_files_parallel([filepath, filepath], n=2) + responses = client.upload_files_parallel([test_filepath, test_filepath], n=2) # Make sure it worked assert all( @@ -158,7 +162,7 @@ def test_upload_file_parallel(client): ) -def test_logout(client): +def test_logout(client: Client): """Test logout.""" client.logout() assert "X-Auth-Token" not in client.headers.keys()