diff --git a/python/vineyard/core/client.py b/python/vineyard/core/client.py
index 2bb0b1c2f..1c1f2b166 100644
--- a/python/vineyard/core/client.py
+++ b/python/vineyard/core/client.py
@@ -21,6 +21,8 @@ import warnings
 from concurrent.futures import ThreadPoolExecutor
 from concurrent.futures import as_completed
+from concurrent.futures import Future
+import threading
 from typing import Any
 from typing import Dict
 from typing import List
@@ -43,9 +45,94 @@ from vineyard._C import _connect
 from vineyard.core.builder import BuilderContext
 from vineyard.core.builder import put
+from vineyard.core.resolver import get_current_resolvers
 from vineyard.core.resolver import ResolverContext
 from vineyard.core.resolver import get
 
+
+class LazyObject:
+    """A helper class for lazily fetching vineyard objects."""
+
+    def __init__(
+        self,
+        client,
+        object_id: Optional[ObjectID] = None,
+        name: Optional[str] = None,
+        resolver: Optional[ResolverContext] = None,
+        fetch: bool = False,
+        **kwargs
+    ):
+        """
+        Initialize the LazyObject and start the asynchronous fetch.
+
+        Args:
+            client (Client): The vineyard client.
+            object_id (Optional[ObjectID]): The object id to fetch.
+            name (Optional[str]): The name of the object to fetch.
+            resolver (Optional[ResolverContext]): The resolver to use when
+                resolving the fetched object.
+            fetch (bool): Whether to trigger a migration when the target
+                object is located on remote instances.
+        """
+        self.client = client
+        self.object_id = object_id
+        self.name = name
+        self.resolver = resolver
+        self.fetch = fetch
+        self.kwargs = kwargs
+        self._result: Optional[Any] = None
+        self._exception: Optional[Exception] = None
+        self._ready_event = threading.Event()
+
+        # Mark the future as not started yet
+        self.future: Optional[Future] = None
+
+        # Start the asynchronous fetch
+        self._start_fetch()
+
+    def _start_fetch(self):
+        """Start the asynchronous fetch operation."""
+        if self.future is None:
+            self.future = self.client._lazy_get_thread_pool.submit(self._fetch)
+            self.future.add_done_callback(self._callback)
+
+    def _fetch(self) -> None:
+        """Fetch the object on a worker thread of the client's thread pool."""
+        self._result = self.client.get(
+            object_id=self.object_id,
+            name=self.name,
+            resolver=self.resolver,
+            fetch=self.fetch,
+            **self.kwargs
+        )
+
+    def _callback(self, fut: Future):
+        """Callback executed when the future completes."""
+        try:
+            fut.result()  # This will raise if the fetch failed
+        except Exception as e:
+            self._exception = e
+        finally:
+            self._ready_event.set()
+
+    def get(self) -> Any:
+        """
+        Retrieve the fetched object. If the object is not ready yet, the
+        pending fetch is canceled and an exception is raised.
+
+        Returns:
+            Any: The fetched object.
+
+        Raises:
+            RuntimeError: If the fetch operation was never started.
+            Exception: If an error occurred during fetching, or if the data
+                is not ready yet.
+        """
+        if self.future is None:
+            raise RuntimeError("Fetch operation was not started.")
+
+        if self.future.done():
+            if self._exception:
+                raise self._exception
+            return self._result
+        else:
+            self.cancel()
+            raise Exception("Data not ready, fetch operation has been canceled.")
+
+    def is_ready(self) -> bool:
+        """
+        Check if the object has been fetched.
+
+        Returns:
+            bool: True if the object has been fetched, False otherwise.
+        """
+        return self._ready_event.is_set()
+
+    def cancel(self):
+        """Cancel the fetch operation if it has not completed yet."""
+        if self.future and not self.future.done():
+            self.future.cancel()
+
+
 def _apply_docstring(func):
     def _apply(fn):
@@ -168,6 +255,7 @@ def __init__(
         session: int = None,
         username: str = None,
         password: str = None,
+        max_workers: int = 8,
         config: str = None,
     ):
         """Connects to the vineyard IPC socket and RPC socket.
@@ -211,6 +299,8 @@ def __init__(
                 is enabled.
             password: Optional, the required password of vineyardd when authentication
                 is enabled.
+            max_workers: Optional, the maximum number of threads that can be used to
+                asynchronously get/put objects from/to vineyard. Default is 8.
             config: Optional, can either be a path to a YAML configuration file or
                 a path to a directory containing the default config file
                 `vineyard-config.yaml`. Also, the environment variable
@@ -292,6 +382,12 @@ def __init__(
         self._spread = False
         self._compression = True
 
+        # Initialize the thread pool and bookkeeping used by lazy_get
+        self._lazy_get_thread_pool = ThreadPoolExecutor(max_workers=max_workers)
+        self._lazy_get_futures: Dict[Union[ObjectID, str], "LazyObject"] = {}
+        self._lazy_get_lock = threading.Lock()
+
         if self._ipc_client is None and self._rpc_client is None:
             raise ConnectionError(
                 "Failed to connect to vineyard via both IPC and RPC connection. "
@@ -874,5 +970,36 @@ def with_spread(self, enabled: bool = True):
         yield
         self.spread = tmp_spread
 
+    def lazy_get(
+        self,
+        object_id: Optional[ObjectID] = None,
+        name: Optional[str] = None,
+        resolver: Optional[ResolverContext] = None,
+        fetch: bool = False,
+        **kwargs
+    ) -> LazyObject:
+        """Asynchronously fetch the object with the given object id or name.
+
+        Args:
+            object_id (Optional[ObjectID]): The object id to fetch.
+            name (Optional[str]): The name of the object to fetch.
+            resolver (Optional[ResolverContext]): The resolver to use when
+                resolving the fetched object.
+            fetch (bool): Whether to trigger a migration when the target
+                object is located on remote instances.
+
+        Returns:
+            LazyObject: An object that represents the future result of the
+                fetch operation.
+        """
+        key = object_id if object_id is not None else name
+        with self._lazy_get_lock:
+            # Reuse the in-flight fetch if one already exists for this key
+            if key is not None and key in self._lazy_get_futures:
+                return self._lazy_get_futures[key]
+
+            resolver = resolver or get_current_resolvers()
+            lazy_object = LazyObject(
+                self, object_id=object_id, name=name, resolver=resolver,
+                fetch=fetch, **kwargs
+            )
+            if key is not None:
+                self._lazy_get_futures[key] = lazy_object
+
+        def _cleanup(fut):
+            with self._lazy_get_lock:
+                self._lazy_get_futures.pop(key, None)
+
+        # Drop the cached entry once the object is fetched or the fetch fails;
+        # attach the callback outside the lock in case the future is already done.
+        if lazy_object.future:
+            lazy_object.future.add_done_callback(_cleanup)
+
+        return lazy_object
+
 
 __all__ = ['Client']
diff --git a/python/vineyard/core/tests/test_lazy_get.py b/python/vineyard/core/tests/test_lazy_get.py
new file mode 100644
index 000000000..a7fd9cdb1
--- /dev/null
+++ b/python/vineyard/core/tests/test_lazy_get.py
@@ -0,0 +1,36 @@
+import time
+
+import torch
+
+import vineyard
+from vineyard.contrib.ml.torch import torch_context
+
+client = vineyard.connect('/var/run/vineyard.sock1')
+
+# Create a random tensor and persist it into vineyard under a well-known name.
+x = torch.rand(100, 100, 100)
+with torch_context(client):
+    client.put(x, name="test_lazy_tensor", persist=True)
+
+# Start an asynchronous fetch: lazy_get returns a LazyObject handle
+# immediately instead of blocking on the actual data.
+with torch_context(client):
+    lazy_tensor = client.lazy_get(name="test_lazy_tensor")
+
+# At a later point, attempt to access the tensor.
+if not lazy_tensor.is_ready():
+    print("Tensor is not ready yet, waiting for 5 seconds")
+    time.sleep(5)
+
+try:
+    if lazy_tensor.is_ready():
+        print("Tensor is ready")
+        tensor = lazy_tensor.get()
+        print(tensor)
+    else:
+        # Calling get() now would cancel the pending fetch and raise.
+        print("Tensor is not ready")
+except Exception as e:
+    print(e)
+
+print("Done")
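
Note: a minimal sketch of how the in-flight de-duplication in lazy_get above is
expected to behave; the socket path and object name are placeholders, and it
assumes a running vineyardd with an object already stored under that name:

    import vineyard

    client = vineyard.connect('/var/run/vineyard.sock')

    # While a fetch is still in flight, lazy_get caches the LazyObject under
    # its name (or object id), so a repeated call returns the same handle;
    # once the fetch completes, the cache entry is dropped by the cleanup
    # callback and a later call starts a fresh fetch.
    first = client.lazy_get(name="my_object")
    second = client.lazy_get(name="my_object")
    assert first is second or first.is_ready()

    # Once the object is ready, get() returns the resolved value.
    if first.is_ready():
        print(first.get())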