Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconciled upload methods and fixed tests. Fixes #50 #53

Merged
merged 1 commit into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 16 additions & 33 deletions meorg_client/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,13 @@ def list_endpoints():

@click.command("upload")
@click.argument("file_path", nargs=-1)
@click.option("-n", default=1, help="Number of threads for parallel uploads.")
@click.option(
"--attach_to",
default=None,
help="Supply a model output id to immediately attach the file to.",
)
def file_upload(file_path, attach_to=None):
def file_upload(file_path, n: int = 1, attach_to=None):
"""
Upload a file to the server.

Expand All @@ -125,41 +126,23 @@ def file_upload(file_path, attach_to=None):
client = _get_client()

# Upload the file, get the job ID
response = _call(client.upload_files, files=list(file_path), attach_to=attach_to)
responses = _call(
client.upload_files,
files=list(file_path),
n=n,
attach_to=attach_to,
progress=True,
)

for response in responses:

# For singular case
if n == 1:
response = response[0]

# Different logic if we are attaching to a model output immediately
if not attach_to:
files = response.get("data").get("files")
for f in files:
click.echo(f.get("file"))
else:
click.echo("SUCCESS")


@click.command("upload_parallel")
@click.argument("file_paths", nargs=-1)
@click.option(
"-n", default=2, help="Number of simultaneous parallel uploads (default=2)."
)
@click.option(
"--attach_to",
default=None,
help="Supply a model output id to immediately attach the file to.",
)
def file_upload_parallel(file_paths: tuple, n: int = 2, attach_to: str = None):
"""Upload files in parallel.

Parameters
----------
file_paths : tuple
Sequence of file paths.
n : int, optional
Number of parallel uploads, by default 2
"""
client = _get_client()
responses = _call(client.upload_files_parallel, files=list(file_paths), n=n)
for response in responses:
click.echo(response.get("data").get("files")[0].get("file"))


@click.command("list")
Expand Down Expand Up @@ -288,7 +271,7 @@ def cli_analysis():
# Add file commands
cli_file.add_command(file_list)
cli_file.add_command(file_upload)
cli_file.add_command(file_upload_parallel)
# cli_file.add_command(file_upload_parallel)
cli_file.add_command(file_attach)

# Add endpoint commands
Expand Down
82 changes: 54 additions & 28 deletions meorg_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import meorg_client.parallel as meop
import mimetypes as mt
from pathlib import Path
from tqdm import tqdm


class Client:
Expand Down Expand Up @@ -217,8 +218,12 @@ def logout(self):
self.headers.pop("X-User-Id", None)
self.headers.pop("X-Auth-Token", None)

def upload_files_parallel(
self, files: Union[str, Path, list], n: int = 2, attach_to: str = None
def _upload_files_parallel(
self,
files: Union[str, Path, list],
n: int = 2,
attach_to: str = None,
progress=True,
):
"""Upload files in parallel.

Expand All @@ -240,21 +245,61 @@ def upload_files_parallel(
# Ensure the object is actually iterable
files = mu.ensure_list(files)

# Single file provided, don't bother starting the pool
if len(files) == 1:
return self.upload_files(files)

# Do the parallel upload
responses = None
responses = meop.parallelise(
self.upload_files, n, files=files, attach_to=attach_to
self._upload_file, n, files=files, attach_to=attach_to, progress=progress
)

return responses

def upload_files(
self,
files: Union[str, Path],
files: Union[str, Path, list],
n: int = 1,
attach_to: str = None,
progress=True,
) -> list:
"""Upload files.

Parameters
----------
files : Union[str, Path, list]
A filepath, or a list of filepaths.
n : int, optional
Number of threads to parallelise over, by default 1
attach_to : str, optional
Model output ID to immediately attach to, by default None

Returns
-------
list
List of dicts
"""

# Ensure the files are actually a list
files = mu.ensure_list(files)

# Just because someone will try to assign 0 threads...
if n >= 1 == False:
raise ValueError("Number of threads must be greater than or equal to 1.")

# Sequential upload
responses = list()
if n == 1:
for fp in tqdm(files, total=len(files)):
response = self._upload_file(fp, attach_to=attach_to)
responses.append(response)
else:
responses = self._upload_files_parallel(
files, n=n, attach_to=attach_to, progress=progress
)

return mu.ensure_list(responses)

def _upload_file(
self,
files: Union[str, Path, list],
attach_to: str = None,
) -> Union[dict, requests.Response]:
"""Upload a file.
Expand Down Expand Up @@ -324,7 +369,7 @@ def upload_files(
attach_to, files=mu.get_uploaded_file_ids(response)
)

return response
return mu.ensure_list(response)

def list_files(self, id: str) -> Union[dict, requests.Response]:
"""Get a list of model outputs.
Expand Down Expand Up @@ -434,22 +479,3 @@ def success(self) -> bool:
True if successful, False otherwise.
"""
return self.last_response.status_code in mcc.HTTP_STATUS_SUCCESS_RANGE

def is_initialised(self, dev=False) -> bool:
"""Check if the client is initialised.

NOTE: This does not check the login actually works.

Parameters
----------
dev : bool, optional
Use dev credentials, by default False

Returns
-------
bool
True if initialised, False otherwise.
"""
cred_filename = "credentials.json" if not dev else "credentials-dev.json"
cred_filepath = mu.get_user_data_filepath(cred_filename)
return cred_filepath.exists()
18 changes: 15 additions & 3 deletions meorg_client/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pandas as pd
import multiprocessing as mp
from tqdm import tqdm


def _execute(mp_args: tuple):
Expand Down Expand Up @@ -31,7 +32,7 @@ def _convert_kwargs(**kwargs):
return pd.DataFrame(kwargs).to_dict("records")


def parallelise(func: callable, num_threads: int, **kwargs):
def parallelise(func: callable, num_threads: int, progress=True, **kwargs):
"""Execute `func` in parallel over `num_threads`.

Parameters
Expand All @@ -56,11 +57,22 @@ def parallelise(func: callable, num_threads: int, **kwargs):
mp_args = [[func, mp_arg] for mp_arg in mp_args]

# Start with empty results
results = None
results = list()

# Establish a pool of workers (blocking)
with mp.Pool(processes=num_threads) as pool:
results = pool.map(_execute, mp_args)

if progress:

with tqdm(total=len(mp_args)) as pbar:
for result in pool.map(_execute, mp_args):
results.append(result[0])
pbar.update()

else:

for result in pool.map(_execute, mp_args):
results.append(result[0])

# Return the results
return results
24 changes: 11 additions & 13 deletions meorg_client/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def test_file_upload(runner: CliRunner, test_filepath: str):
assert result.exit_code == 0

# Add the job_id to the store for the next test
store.set("file_id", result.output.strip())
store.set("file_id", result.stdout.split()[-1].strip())

# Let it wait for a short while, allow the server to transfer to object store.
time.sleep(5)
Expand All @@ -65,16 +65,6 @@ def test_file_multiple(runner: CliRunner, test_filepath: str):
time.sleep(5)


def test_file_upload_parallel(runner: CliRunner, test_filepath: str):
"""Test file-upload via CLI."""

# Upload a tiny test file
result = runner.invoke(
cli.file_upload_parallel, [test_filepath, test_filepath, "-n 2"]
)
assert result.exit_code == 0


def test_file_list(runner):
"""Test file-list via CLI."""
result = runner.invoke(cli.file_list, [store.get("model_output_id")])
Expand All @@ -100,11 +90,19 @@ def test_file_upload_with_attach(runner, test_filepath):
assert result.exit_code == 0


def test_file_upload_parallel(runner: CliRunner, test_filepath: str):
"""Test file-upload via CLI."""

# Upload a tiny test file
result = runner.invoke(cli.file_upload, [test_filepath, test_filepath, "-n", "2"])
assert result.exit_code == 0


def test_file_upload_parallel_with_attach(runner, test_filepath):
"""Test file upload with attachment via CLI."""
model_output_id = store.get("model_output_id")
result = runner.invoke(
cli.file_upload_parallel,
[test_filepath, test_filepath, "--attach_to", model_output_id],
cli.file_upload,
[test_filepath, test_filepath, "-n", "2", "--attach_to", model_output_id],
)
assert result.exit_code == 0
19 changes: 15 additions & 4 deletions meorg_client/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def test_list_endpoints(client: Client):
def test_upload_file(client: Client, test_filepath: str):
"""Test the uploading of a file."""
# Upload the file
response = client.upload_files(test_filepath)
response = client.upload_files(test_filepath)[0]

# Make sure it worked
assert client.success()
Expand Down Expand Up @@ -108,7 +108,7 @@ def test_file_list(client: Client):

def test_attach_files_to_model_output(client: Client):
# Get the file id from the job id
file_id = store.get("file_upload").get("data").get("files")[0].get("file")
file_id = store.get("file_upload")[0].get("data").get("files")[0].get("file")

# Attach it to the model output
_ = client.attach_files_to_model_output(client._model_output_id, [file_id])
Expand Down Expand Up @@ -165,7 +165,18 @@ def test_upload_files_with_attach(client: Client):
def test_upload_file_parallel(client: Client, test_filepath: str):
"""Test the uploading of a file."""
# Upload the file
responses = client.upload_files_parallel([test_filepath, test_filepath], n=2)
responses = client.upload_files([test_filepath, test_filepath], n=2, progress=True)

# Make sure it worked
assert all(
[response.get("data").get("files")[0].get("file") for response in responses]
)


def test_upload_file_parallel_no_progress(client: Client, test_filepath: str):
"""Test the uploading of a file."""
# Upload the file
responses = client.upload_files([test_filepath, test_filepath], n=2, progress=False)

# Make sure it worked
assert all(
Expand All @@ -176,7 +187,7 @@ def test_upload_file_parallel(client: Client, test_filepath: str):
def test_upload_file_parallel_with_attach(client: Client, test_filepath: str):
"""Test the uploading of a file with a model output ID to attach."""
# Upload the file
responses = client.upload_files_parallel(
responses = client.upload_files(
[test_filepath, test_filepath], n=2, attach_to=client._model_output_id
)

Expand Down
Loading