diff --git a/meorg_client/cli.py b/meorg_client/cli.py index 6ae9267..6a21ea5 100644 --- a/meorg_client/cli.py +++ b/meorg_client/cli.py @@ -109,12 +109,13 @@ def list_endpoints(): @click.command("upload") @click.argument("file_path", nargs=-1) +@click.option("-n", default=1, help="Number of threads for parallel uploads.") @click.option( "--attach_to", default=None, help="Supply a model output id to immediately attach the file to.", ) -def file_upload(file_path, attach_to=None): +def file_upload(file_path, n: int = 1, attach_to=None): """ Upload a file to the server. @@ -125,41 +126,23 @@ def file_upload(file_path, attach_to=None): client = _get_client() # Upload the file, get the job ID - response = _call(client.upload_files, files=list(file_path), attach_to=attach_to) + responses = _call( + client.upload_files, + files=list(file_path), + n=n, + attach_to=attach_to, + progress=True, + ) + + for response in responses: + + # For singular case + if n == 1: + response = response[0] - # Different logic if we are attaching to a model output immediately - if not attach_to: files = response.get("data").get("files") for f in files: click.echo(f.get("file")) - else: - click.echo("SUCCESS") - - -@click.command("upload_parallel") -@click.argument("file_paths", nargs=-1) -@click.option( - "-n", default=2, help="Number of simultaneous parallel uploads (default=2)." -) -@click.option( - "--attach_to", - default=None, - help="Supply a model output id to immediately attach the file to.", -) -def file_upload_parallel(file_paths: tuple, n: int = 2, attach_to: str = None): - """Upload files in parallel. - - Parameters - ---------- - file_paths : tuple - Sequence of file paths. - n : int, optional - Number of parallel uploads, by default 2 - """ - client = _get_client() - responses = _call(client.upload_files_parallel, files=list(file_paths), n=n) - for response in responses: - click.echo(response.get("data").get("files")[0].get("file")) @click.command("list") @@ -288,7 +271,7 @@ def cli_analysis(): # Add file commands cli_file.add_command(file_list) cli_file.add_command(file_upload) -cli_file.add_command(file_upload_parallel) +# cli_file.add_command(file_upload_parallel) cli_file.add_command(file_attach) # Add endpoint commands diff --git a/meorg_client/client.py b/meorg_client/client.py index edd082a..6bc4b3a 100644 --- a/meorg_client/client.py +++ b/meorg_client/client.py @@ -13,6 +13,7 @@ import meorg_client.parallel as meop import mimetypes as mt from pathlib import Path +from tqdm import tqdm class Client: @@ -217,8 +218,12 @@ def logout(self): self.headers.pop("X-User-Id", None) self.headers.pop("X-Auth-Token", None) - def upload_files_parallel( - self, files: Union[str, Path, list], n: int = 2, attach_to: str = None + def _upload_files_parallel( + self, + files: Union[str, Path, list], + n: int = 2, + attach_to: str = None, + progress=True, ): """Upload files in parallel. @@ -240,21 +245,61 @@ def upload_files_parallel( # Ensure the object is actually iterable files = mu.ensure_list(files) - # Single file provided, don't bother starting the pool - if len(files) == 1: - return self.upload_files(files) - # Do the parallel upload responses = None responses = meop.parallelise( - self.upload_files, n, files=files, attach_to=attach_to + self._upload_file, n, files=files, attach_to=attach_to, progress=progress ) return responses def upload_files( self, - files: Union[str, Path], + files: Union[str, Path, list], + n: int = 1, + attach_to: str = None, + progress=True, + ) -> list: + """Upload files. + + Parameters + ---------- + files : Union[str, Path, list] + A filepath, or a list of filepaths. + n : int, optional + Number of threads to parallelise over, by default 1 + attach_to : str, optional + Model output ID to immediately attach to, by default None + + Returns + ------- + list + List of dicts + """ + + # Ensure the files are actually a list + files = mu.ensure_list(files) + + # Just because someone will try to assign 0 threads... + if n >= 1 == False: + raise ValueError("Number of threads must be greater than or equal to 1.") + + # Sequential upload + responses = list() + if n == 1: + for fp in tqdm(files, total=len(files)): + response = self._upload_file(fp, attach_to=attach_to) + responses.append(response) + else: + responses = self._upload_files_parallel( + files, n=n, attach_to=attach_to, progress=progress + ) + + return mu.ensure_list(responses) + + def _upload_file( + self, + files: Union[str, Path, list], attach_to: str = None, ) -> Union[dict, requests.Response]: """Upload a file. @@ -324,7 +369,7 @@ def upload_files( attach_to, files=mu.get_uploaded_file_ids(response) ) - return response + return mu.ensure_list(response) def list_files(self, id: str) -> Union[dict, requests.Response]: """Get a list of model outputs. @@ -434,22 +479,3 @@ def success(self) -> bool: True if successful, False otherwise. """ return self.last_response.status_code in mcc.HTTP_STATUS_SUCCESS_RANGE - - def is_initialised(self, dev=False) -> bool: - """Check if the client is initialised. - - NOTE: This does not check the login actually works. - - Parameters - ---------- - dev : bool, optional - Use dev credentials, by default False - - Returns - ------- - bool - True if initialised, False otherwise. - """ - cred_filename = "credentials.json" if not dev else "credentials-dev.json" - cred_filepath = mu.get_user_data_filepath(cred_filename) - return cred_filepath.exists() diff --git a/meorg_client/parallel.py b/meorg_client/parallel.py index 1962e2d..b9b62a6 100644 --- a/meorg_client/parallel.py +++ b/meorg_client/parallel.py @@ -2,6 +2,7 @@ import pandas as pd import multiprocessing as mp +from tqdm import tqdm def _execute(mp_args: tuple): @@ -31,7 +32,7 @@ def _convert_kwargs(**kwargs): return pd.DataFrame(kwargs).to_dict("records") -def parallelise(func: callable, num_threads: int, **kwargs): +def parallelise(func: callable, num_threads: int, progress=True, **kwargs): """Execute `func` in parallel over `num_threads`. Parameters @@ -56,11 +57,22 @@ def parallelise(func: callable, num_threads: int, **kwargs): mp_args = [[func, mp_arg] for mp_arg in mp_args] # Start with empty results - results = None + results = list() # Establish a pool of workers (blocking) with mp.Pool(processes=num_threads) as pool: - results = pool.map(_execute, mp_args) + + if progress: + + with tqdm(total=len(mp_args)) as pbar: + for result in pool.map(_execute, mp_args): + results.append(result[0]) + pbar.update() + + else: + + for result in pool.map(_execute, mp_args): + results.append(result[0]) # Return the results return results diff --git a/meorg_client/tests/test_cli.py b/meorg_client/tests/test_cli.py index 35423c0..8141520 100644 --- a/meorg_client/tests/test_cli.py +++ b/meorg_client/tests/test_cli.py @@ -45,7 +45,7 @@ def test_file_upload(runner: CliRunner, test_filepath: str): assert result.exit_code == 0 # Add the job_id to the store for the next test - store.set("file_id", result.output.strip()) + store.set("file_id", result.stdout.split()[-1].strip()) # Let it wait for a short while, allow the server to transfer to object store. time.sleep(5) @@ -65,16 +65,6 @@ def test_file_multiple(runner: CliRunner, test_filepath: str): time.sleep(5) -def test_file_upload_parallel(runner: CliRunner, test_filepath: str): - """Test file-upload via CLI.""" - - # Upload a tiny test file - result = runner.invoke( - cli.file_upload_parallel, [test_filepath, test_filepath, "-n 2"] - ) - assert result.exit_code == 0 - - def test_file_list(runner): """Test file-list via CLI.""" result = runner.invoke(cli.file_list, [store.get("model_output_id")]) @@ -100,11 +90,19 @@ def test_file_upload_with_attach(runner, test_filepath): assert result.exit_code == 0 +def test_file_upload_parallel(runner: CliRunner, test_filepath: str): + """Test file-upload via CLI.""" + + # Upload a tiny test file + result = runner.invoke(cli.file_upload, [test_filepath, test_filepath, "-n", "2"]) + assert result.exit_code == 0 + + def test_file_upload_parallel_with_attach(runner, test_filepath): """Test file upload with attachment via CLI.""" model_output_id = store.get("model_output_id") result = runner.invoke( - cli.file_upload_parallel, - [test_filepath, test_filepath, "--attach_to", model_output_id], + cli.file_upload, + [test_filepath, test_filepath, "-n", "2", "--attach_to", model_output_id], ) assert result.exit_code == 0 diff --git a/meorg_client/tests/test_client.py b/meorg_client/tests/test_client.py index b2a964a..6618ad1 100644 --- a/meorg_client/tests/test_client.py +++ b/meorg_client/tests/test_client.py @@ -77,7 +77,7 @@ def test_list_endpoints(client: Client): def test_upload_file(client: Client, test_filepath: str): """Test the uploading of a file.""" # Upload the file - response = client.upload_files(test_filepath) + response = client.upload_files(test_filepath)[0] # Make sure it worked assert client.success() @@ -108,7 +108,7 @@ def test_file_list(client: Client): def test_attach_files_to_model_output(client: Client): # Get the file id from the job id - file_id = store.get("file_upload").get("data").get("files")[0].get("file") + file_id = store.get("file_upload")[0].get("data").get("files")[0].get("file") # Attach it to the model output _ = client.attach_files_to_model_output(client._model_output_id, [file_id]) @@ -165,7 +165,18 @@ def test_upload_files_with_attach(client: Client): def test_upload_file_parallel(client: Client, test_filepath: str): """Test the uploading of a file.""" # Upload the file - responses = client.upload_files_parallel([test_filepath, test_filepath], n=2) + responses = client.upload_files([test_filepath, test_filepath], n=2, progress=True) + + # Make sure it worked + assert all( + [response.get("data").get("files")[0].get("file") for response in responses] + ) + + +def test_upload_file_parallel_no_progress(client: Client, test_filepath: str): + """Test the uploading of a file.""" + # Upload the file + responses = client.upload_files([test_filepath, test_filepath], n=2, progress=False) # Make sure it worked assert all( @@ -176,7 +187,7 @@ def test_upload_file_parallel(client: Client, test_filepath: str): def test_upload_file_parallel_with_attach(client: Client, test_filepath: str): """Test the uploading of a file with a model output ID to attach.""" # Upload the file - responses = client.upload_files_parallel( + responses = client.upload_files( [test_filepath, test_filepath], n=2, attach_to=client._model_output_id )