diff --git a/kernelci/api/__init__.py b/kernelci/api/__init__.py index 0ea21d5795..6402d75f07 100644 --- a/kernelci/api/__init__.py +++ b/kernelci/api/__init__.py @@ -83,6 +83,14 @@ def send_event(self, channel: str, data): def receive_event(self, sub_id: int) -> CloudEvent: """Listen and receive an event from a given subscription id""" + @abc.abstractmethod + def push_event(self, channel: str, data): + """Push an event to a given Redis List""" + + @abc.abstractmethod + def pop_event(self, sub_id: int) -> CloudEvent: + """Listen and pop an event from a given List of a subscription""" + # ----- # Nodes # ----- diff --git a/kernelci/api/helper.py b/kernelci/api/helper.py index db1f94d42a..810191ee24 100644 --- a/kernelci/api/helper.py +++ b/kernelci/api/helper.py @@ -43,6 +43,10 @@ def receive_event_data(self, sub_id): """Receive CloudEvent from Pub/Sub and return its data payload""" return self.api.receive_event(sub_id).data + def pop_event_data(self, sub_id): + """Receive CloudEvent from Redis list and return its data payload""" + return self.api.pop_event(sub_id).data + def get_node_from_event(self, event_data): """Listen for an event and get the matching node object from it""" return self.api.get_node(event_data['id']) diff --git a/kernelci/api/latest.py b/kernelci/api/latest.py index 1bce32da44..2d4d0bb649 100644 --- a/kernelci/api/latest.py +++ b/kernelci/api/latest.py @@ -6,6 +6,7 @@ """KernelCI API bindings for the latest version""" import enum +import json from typing import Optional, Sequence from cloudevents.http import from_json @@ -72,6 +73,21 @@ def receive_event(self, sub_id: int): continue return event + def push_event(self, channel: str, data): + self._post('/'.join(['push', channel]), data) + + def pop_event(self, sub_id: int): + path = '/'.join(['pop', str(sub_id)]) + while True: + resp = self._get(path) + data = json.dumps(resp.json()) + if not data: + continue + event = from_json(data) + if event.data == 'BEEP': + continue + return event + def _get_api_objs(self, params: dict, path: str, limit: Optional[int] = None, offset: Optional[int] = None) -> list: diff --git a/kernelci/cli/event.py b/kernelci/cli/event.py index 820f4bde41..46269253fd 100644 --- a/kernelci/cli/event.py +++ b/kernelci/cli/event.py @@ -81,3 +81,40 @@ def receive(config, api, indent, sub_id, secrets): click.echo(json.dumps(event, indent=indent)) else: click.echo(event) + + +@kci_event.command(secrets=True) +@click.option('--is-json', help="Parse input data as JSON", is_flag=True) +@Args.config +@Args.api +@click.argument('channel') +def push(config, api, is_json, channel, secrets): + """Read some data on stdin and push it as an event on a list""" + configs = kernelci.config.load(config) + api_config = configs['api'][api] + api = kernelci.api.get_api(api_config, secrets.api.token) + data = sys.stdin.read() + if is_json: + data = json.loads(data) + api.push_event(channel, {'data': data}) + + +@kci_event.command(secrets=True) +@click.argument('sub_id') +@Args.config +@Args.api +@Args.indent +def pop(config, api, indent, sub_id, secrets): + """Wait and pop an event from a List of a subscription and print on + stdout""" + configs = kernelci.config.load(config) + api_config = configs['api'][api] + api = kernelci.api.get_api(api_config, secrets.api.token) + helper = kernelci.api.helper.APIHelper(api) + event = helper.pop_event_data(sub_id) + if isinstance(event, str): + click.echo(event.strip()) + elif isinstance(event, dict): + click.echo(json.dumps(event, indent=indent)) + else: + click.echo(event)