Skip to content

Commit

Permalink
refactor!: use Socket.IO client instead of websockets for run-time da…
Browse files Browse the repository at this point in the history
…ta (#95)

* refactor!: use socketio client for runtime data instead of websockets

* Re-implement the read_once / read_until functions with socketio

* Add new method to wait for a component state

* delete old websocket client

* improve compatibility check and include client version metadata

* CHANGELOG
  • Loading branch information
eeberhard authored Feb 15, 2024
1 parent 9a8413f commit 069c33f
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 226 deletions.
4 changes: 4 additions & 0 deletions python/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ Release Versions:
- [1.0.1](#101)
- [1.0.0](#100)

## Upcoming changes

- refactor!: use Socket.IO client instead of websockets for run-time data (#95)

## 1.2.1

- Correct typehints for setting parameters (#96)
Expand Down
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ readme = "README.md"
requires-python = ">=3.7"
dependencies = [
"requests >= 2.28.1",
"websocket-client >= 1.4.2"
"python-socketio[client] >= 5.11.0"
]
classifiers = [
"Programming Language :: Python :: 3",
Expand Down
76 changes: 40 additions & 36 deletions python/src/aica_api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import requests
import yaml

from aica_api.ws_client import WebsocketSyncClient
import importlib.metadata

from aica_api.sio_client import read_until


class AICA:
Expand All @@ -29,7 +31,6 @@ def __init__(self, url: str = 'localhost', port: Union[str, int] = '5000'):
raise ValueError(f'Invalid URL format {url}')
else:
self._address = f'http://{url}:{port}'
self._ws_address = f'ws://{url}:{port}'

def _endpoint(self, endpoint=''):
"""
Expand All @@ -40,30 +41,37 @@ def _endpoint(self, endpoint=''):
"""
return f'{self._address}/v2/{endpoint}'

def _ws_endpoint(self, endpoint=''):
@staticmethod
def client_version() -> str:
"""
Build the connection address for a given websocket endpoint.
Get the version of this API client utility
:param endpoint: The websocket endpoint
:return: The constructed connection address
:return: The version of the API client
"""
return f'{self._ws_address}/{endpoint}'
return importlib.metadata.version('aica_api')

def check(self) -> bool:
"""
Check if the API version is v2 (any v2.x.x tag)
Check if this API client is compatible with the detected API server version
:return: True if the client is compatible with the API server version, False otherwise
"""
# TODO: come up with a compatibility table in the future
try:
api_version = requests.get(f'{self._address}/version').json()
except requests.exceptions.RequestException as e:
print(f'Error connecting to the API! {e}')
return False
new_version = api_version.startswith('2')
if not new_version:
print(f'The detected API version v{api_version} is older than the minimum API version v2.0.0 supported by '
f'this client')
return new_version

if api_version.startswith('3'):
return True
elif api_version.startswith('2'):
print(f'The detected API version v{api_version} is older than the minimum API version v3.0.0 supported by '
f'this client (v{self.client_version()}). Please install Python API client version v1.2.0 '
f'for API server versions v2.X.')
return False
else:
print(f'The detected API version v{api_version} is deprecated and not supported by this API client!')
return False

def component_descriptions(self) -> requests.Response:
"""
Expand Down Expand Up @@ -258,7 +266,20 @@ def get_application(self):
endpoint = "application"
return requests.get(self._endpoint(endpoint))

def wait_for_predicate(self, component, predicate, timeout: Union[None, int, float] = None):
def wait_for_component(self, component: str, state: str, timeout: Union[None, int, float] = None):
"""
Wait for a component to be in a particular state. Components can be in any of the following states:
['unloaded', 'loaded', 'unconfigured', 'inactive', 'active', 'finalized']
:param component: The name of the component
:param state: The state of the component
:param timeout: Timeout duration in seconds. If set to None, block indefinitely
:return: False if the connection times out before the component is in the intended state
"""
return read_until(lambda data: data[component]['state'] == state, url=self._address, namespace='/v2/components',
event='component_data', timeout=timeout)

def wait_for_predicate(self, component: str, predicate: str, timeout: Union[None, int, float] = None):
"""
Wait until a component predicate is true.
Expand All @@ -267,17 +288,8 @@ def wait_for_predicate(self, component, predicate, timeout: Union[None, int, flo
:param timeout: Timeout duration in seconds. If set to None, block indefinitely
:return: False if the connection times out before the predicate is true
"""
component = f'{component}'

def check_predicate(data):
try:
if data[component]["predicates"][predicate]:
return True
except KeyError:
return False

ws = WebsocketSyncClient(self._ws_endpoint('components'))
return ws.read_until(check_predicate, timeout=timeout)
return read_until(lambda data: data[component]["predicates"][predicate], url=self._address,
namespace='/v2/components', event='component_data', timeout=timeout)

def wait_for_condition(self, condition, timeout=None):
"""
Expand All @@ -287,13 +299,5 @@ def wait_for_condition(self, condition, timeout=None):
:param timeout: Timeout duration in seconds. If set to None, block indefinitely
:return: False if the connection times out before the condition is true
"""

def check_condition(data):
try:
if data[condition]:
return True
except KeyError:
return False

ws = WebsocketSyncClient(self._ws_endpoint('conditions'))
return ws.read_until(check_condition, timeout=timeout)
return read_until(lambda data: data[condition], url=self._address, namespace='/v2/conditions',
event='conditions', timeout=timeout)
66 changes: 66 additions & 0 deletions python/src/aica_api/sio_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import time
from typing import Callable, Union

import socketio
from socketio.exceptions import ConnectionError, TimeoutError


def read_once(url: str = 'http://0.0.0.0:5000', namespace: str = '/', event: str = '*',
timeout: Union[None, int, float] = 5) -> Union[None, dict]:
"""
Listen for and return the first Socket.IO event on a specified URL and namespace within a time limited period
:param url: The Socket.IO server URL
:param namespace: The Socket.IO namespace (channel)
:param event: The Socket.IO event name. By default, all events are accepted with a wildcard
:param timeout: The timeout in seconds to listen for an event. If set to None, block indefinitely
:return: The received event data, or None if the connection or event listener timed out
"""
return read_until(lambda data: True, url=url, namespace=namespace, event=event, timeout=timeout)


def read_until(callback: Callable[[dict], bool], url: str = 'http://0.0.0.0:5000', namespace: str = '/',
event: str = '*', timeout: Union[None, int, float] = 5) -> Union[None, dict]:
"""
Listen for and return the first Socket.IO event that validates against a callback function on a specified URL
and namespace within a time limited period
:param callback: A data callback function taking a single dict argument and returning true or false.
KeyErrors are automatically suppressed. For example:
def user_callback(data: dict) -> bool:
return data['foo'] == 'bar'
:param url: The Socket.IO server URL
:param namespace: The Socket.IO namespace (channel)
:param event: The Socket.IO event name. By default, all events are accepted with a wildcard
:param timeout: The timeout in seconds to listen for an event. If set to None, block indefinitely
:return: The received event data, or None if the connection or event listener timed out
"""

with socketio.SimpleClient() as sio:
try:
sio.connect(url, namespace=namespace, wait_timeout=timeout)
except ConnectionError:
print(f"Could not connect!")
return None

start_time = time.time()
while True:
try:
if timeout is None:
received = sio.receive()
else:
elapsed = time.time() - start_time
if elapsed >= timeout:
raise TimeoutError
received = sio.receive(timeout=timeout - elapsed)
except TimeoutError:
break
else:
if event == '*' or event == received[0]:
try:
data = received[1]
if callback(data):
return data
except KeyError:
# invalid key access in the callback function, ignore
pass
Loading

0 comments on commit 069c33f

Please sign in to comment.