Skip to content

Commit

Permalink
Parallel file upload implementation. Fixes #42
Browse files Browse the repository at this point in the history
  • Loading branch information
bschroeter committed Aug 8, 2024
1 parent 4b36be0 commit ff5cba7
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 0 deletions.
14 changes: 14 additions & 0 deletions meorg_client/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,19 @@ def file_upload(file_path):
click.echo(f.get("file"))


@click.command("upload_parallel")
@click.argument("file_paths", nargs=-1)
@click.option(
    "-n", default=2, help="Number of simultaneous parallel uploads (default=2)."
)
def file_upload_parallel(file_paths, n=2):
    """Upload several files in parallel and print the file value of each upload.

    Parameters
    ----------
    file_paths : tuple
        Paths of the files to upload (variadic CLI argument).
    n : int, optional
        Number of simultaneous parallel uploads, by default 2
    """
    client = _get_client()
    responses = _call(client.upload_files_parallel, files=list(file_paths), n=n)

    # A single upload may come back as one bare response rather than a list of
    # responses; normalise so the loop below always walks whole responses.
    if not isinstance(responses, (list, tuple)):
        responses = [responses]

    # Only the uploaded file values go to stdout (no debug dump of the raw
    # responses), so the output stays usable by scripts.
    for response in responses:
        click.echo(response.get("data").get("files")[0].get("file"))


@click.command("list")
@click.argument("id")
def file_list(id):
Expand Down Expand Up @@ -250,6 +263,7 @@ def cli_analysis():
# Add file commands: register each file-related command object on cli_file
# (list, single upload, parallel upload and attach).
cli_file.add_command(file_list)
cli_file.add_command(file_upload)
cli_file.add_command(file_upload_parallel)
cli_file.add_command(file_attach)

# Add endpoint commands
Expand Down
31 changes: 31 additions & 0 deletions meorg_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import meorg_client.utilities as mu
import mimetypes as mt
from pathlib import Path
from multiprocessing import Pool


class Client:
Expand Down Expand Up @@ -216,6 +217,36 @@ def logout(self):
self.headers.pop("X-User-Id", None)
self.headers.pop("X-Auth-Token", None)

def upload_files_parallel(self, files: list, n: int = 2):
    """Upload files in parallel.

    Each path is handed to ``upload_files`` individually; when more than one
    path is given the calls are distributed over a ``multiprocessing.Pool``.

    Parameters
    ----------
    files : list
        List of file paths.
    n : int, optional
        Number of worker processes to use, by default 2

    Returns
    -------
    list
        One entry per input file, in input order. Each entry is whatever
        ``upload_files`` returns for that file (dict or response object).
    """

    # Ensure the object is actually iterable
    files = mu.ensure_list(files)

    # Nothing to upload: skip starting a worker pool altogether
    if not files:
        return []

    # Sequential case, single file provided. The result is wrapped in a list
    # so callers can iterate the return value regardless of the file count.
    if len(files) == 1:
        return [self.upload_files(files[0])]

    # Do the parallel upload; Pool.map keeps results in input order
    with Pool(processes=n) as pool:
        return pool.map(self.upload_files, files)

def upload_files(
self,
files: Union[str, Path],
Expand Down
9 changes: 9 additions & 0 deletions meorg_client/tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ def test_file_multiple(runner):
time.sleep(5)


def test_file_upload_parallel(runner):
    """Test parallel file upload via CLI."""

    # Pass the same tiny test file twice and request two parallel uploads.
    filepath = os.path.join(mu.get_installed_data_root(), "test/test.txt")

    # The option name and its value are separate argv tokens, exactly as a
    # shell would deliver `-n 2`; a fused "-n 2" token only parses because
    # int() tolerates the leading space.
    result = runner.invoke(
        cli.file_upload_parallel, [filepath, filepath, "-n", "2"]
    )
    assert result.exit_code == 0


def test_file_list(runner):
"""Test file-list via CLI."""
result = runner.invoke(cli.file_list, [store.get("model_output_id")])
Expand Down
14 changes: 14 additions & 0 deletions meorg_client/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,20 @@ def test_upload_file_large(client):
assert client.success()


def test_upload_file_parallel(client):
    """Check that a parallel upload reports a file value for every response."""
    # The test file under the installed data root is submitted twice.
    test_file = os.path.join(mu.get_installed_data_root(), "test/test.txt")
    paths = [test_file, test_file]

    # Run the upload with two workers.
    results = client.upload_files_parallel(paths, n=2)

    # Every response must carry a truthy "file" value on its first file entry.
    uploaded = [
        result.get("data").get("files")[0].get("file") for result in results
    ]
    assert all(uploaded)


def test_logout(client):
"""Test logout."""
client.logout()
Expand Down

0 comments on commit ff5cba7

Please sign in to comment.