Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Support async put for vineyard client. #2018

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions python/vineyard/core/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ def put(
builder: Optional[BuilderContext] = None,
persist: bool = False,
name: Optional[str] = None,
as_async: bool = False,
**kwargs
):
"""Put python value to vineyard.
Expand Down Expand Up @@ -185,16 +186,22 @@ def put(
name: str, optional
If given, the name will be automatically associated with the resulted
object. Note that only take effect when the object is persisted.
as_async: bool, optional
If true, which means the object will be put to vineyard asynchronously.
Thus we need to use the passed builder.
kw:
User-specific argument that will be passed to the builder.

Returns:
ObjectID: The result object id will be returned.
"""
if builder is not None:
if builder is not None and not as_async:
return builder(client, value, **kwargs)

meta = get_current_builders().run(client, value, **kwargs)
if as_async:
meta = builder.run(client, value, **kwargs)
else:
meta = get_current_builders().run(client, value, **kwargs)

# the builders is expected to return an :class:`ObjectMeta`, or an
# :class:`Object` (in the `bytes_builder` and `memoryview` builder).
Expand Down
60 changes: 56 additions & 4 deletions python/vineyard/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from vineyard._C import VineyardException
from vineyard._C import _connect
from vineyard.core.builder import BuilderContext
from vineyard.core.builder import get_current_builders
from vineyard.core.builder import put
from vineyard.core.resolver import ResolverContext
from vineyard.core.resolver import get
Expand Down Expand Up @@ -168,6 +169,7 @@
session: int = None,
username: str = None,
password: str = None,
max_workers: int = 8,
config: str = None,
):
"""Connects to the vineyard IPC socket and RPC socket.
Expand Down Expand Up @@ -211,6 +213,8 @@
is enabled.
password: Optional, the required password of vineyardd when authentication
is enabled.
max_workers: Optional, the maximum number of threads that can be used to
asynchronously put objects to vineyard. Default is 8.
config: Optional, can either be a path to a YAML configuration file or
a path to a directory containing the default config file
`vineyard-config.yaml`. Also, the environment variable
Expand Down Expand Up @@ -290,6 +294,9 @@
except VineyardException:
continue

self._max_workers = max_workers
self._put_thread_pool = None

self._spread = False
self._compression = True
if self._ipc_client is None and self._rpc_client is None:
Expand Down Expand Up @@ -347,6 +354,13 @@
assert self._rpc_client is not None, "RPC client is not available."
return self._rpc_client

@property
def put_thread_pool(self) -> ThreadPoolExecutor:
"""Lazy initialization of the thread pool for asynchronous put."""
if self._put_thread_pool is None:
self._put_thread_pool = ThreadPoolExecutor(max_workers=self._max_workers)
return self._put_thread_pool

def has_ipc_client(self):
return self._ipc_client is not None

Expand Down Expand Up @@ -820,17 +834,17 @@
):
return get(self, object_id, name, resolver, fetch, **kwargs)

@_apply_docstring(put)
def put(
def _put_internal(

Check notice on line 837 in python/vineyard/core/client.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

python/vineyard/core/client.py#L837

Too many positional arguments (6/5)
self,
value: Any,
builder: Optional[BuilderContext] = None,
persist: bool = False,
name: Optional[str] = None,
as_async: bool = False,
**kwargs,
):
try:
return put(self, value, builder, persist, name, **kwargs)
return put(self, value, builder, persist, name, as_async, **kwargs)
except NotEnoughMemoryException as exec:
with envvars(
{'VINEYARD_RPC_SKIP_RETRY': '1', 'VINEYARD_IPC_SKIP_RETRY': '1'}
Expand All @@ -856,7 +870,45 @@
host, port = meta[instance_id]['rpc_endpoint'].split(':')
self._rpc_client = _connect(host, port)
self.compression = previous_compression_state
return put(self, value, builder, persist, name, **kwargs)
return put(self, value, builder, persist, name, as_async, **kwargs)

@_apply_docstring(put)
def put(

Check notice on line 876 in python/vineyard/core/client.py

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

python/vineyard/core/client.py#L876

Too many positional arguments (6/5)
self,
value: Any,
builder: Optional[BuilderContext] = None,
persist: bool = False,
name: Optional[str] = None,
as_async: bool = False,
**kwargs,
):
if as_async:

def _default_callback(future):
try:
result = future.result()
if isinstance(result, ObjectID):
print(f"Successfully put object {result}", flush=True)
elif isinstance(result, ObjectMeta):
print(f"Successfully put object {result.id}", flush=True)
except Exception as e:
print(f"Failed to put object: {e}", flush=True)

current_builder = builder or get_current_builders()

thread_pool = self.put_thread_pool
result = thread_pool.submit(
self._put_internal,
value,
current_builder,
persist,
name,
as_async=True,
**kwargs,
)
result.add_done_callback(_default_callback)
return result
return self._put_internal(value, builder, persist, name, **kwargs)

@contextlib.contextmanager
def with_compression(self, enabled: bool = True):
Expand Down
39 changes: 39 additions & 0 deletions python/vineyard/core/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import itertools
import multiprocessing
import random
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from threading import Thread

import numpy as np

Expand Down Expand Up @@ -317,3 +319,40 @@ def test_memory_trim(vineyard_client):

# there might be some fragmentation overhead
assert parse_shared_memory_usage() <= original_memory_usage + 2 * data_kbytes


def test_async_put_and_get(vineyard_client):
data = np.ones((100, 100, 16))
object_nums = 100

def producer(vineyard_client):
start_time = time.time()
client = vineyard_client.fork()
for i in range(object_nums):
client.put(data, name="test" + str(i), as_async=True, persist=True)
client.put(data)
end_time = time.time()
print("Producer time: ", end_time - start_time)

def consumer(vineyard_client):
start_time = time.time()
client = vineyard_client.fork()
for i in range(object_nums):
object_id = client.get_name(name="test" + str(i), wait=True)
client.get(object_id)
end_time = time.time()
print("Consumer time: ", end_time - start_time)

producer_thread = Thread(target=producer, args=(vineyard_client,))
consumer_thread = Thread(target=consumer, args=(vineyard_client,))

start_time = time.time()

producer_thread.start()
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

end_time = time.time()
print("Total time: ", end_time - start_time)
Loading