Skip to content

Commit

Permalink
Merge pull request #17 from MO-RISE/dev
Browse files Browse the repository at this point in the history
Multiple updates
  • Loading branch information
freol35241 authored Dec 14, 2023
2 parents 4b161c2 + e87a171 commit 486db54
Show file tree
Hide file tree
Showing 37 changed files with 866 additions and 85 deletions.
25 changes: 13 additions & 12 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ COPY ./brefv /brefv

# And all requirements
COPY requirements.txt requirements.txt
COPY keelson-interface-mcap/requirements.txt keelson-mcap-requirements.txt
COPY keelson-interface-http/requirements.txt keelson-http-requirements.txt
COPY keelson-interface-video/requirements.txt keelson-video-requirements.txt
COPY keelson-interface-lidar/requirements.txt keelson-lidar-requirements.txt
COPY keelson-interface-mockups/requirements.txt keelson-mockups-requirements.txt
COPY keelson-interface-mediamtx/requirements.txt keelson-mediamtx-requirements.txt
COPY keelson-interface-ouster/requirements.txt keelson-ouster-requirements.txt
COPY keelson-interface-opendlv/requirements.txt keelson-opendlv-requirements.txt

# And build all wheels in one go to ensure proper dependency resolution
RUN pip3 wheel\
/brefv/python\
-r requirements.txt\
-r keelson-mcap-requirements.txt\
-r keelson-http-requirements.txt\
-r keelson-video-requirements.txt\
-r keelson-lidar-requirements.txt\
-r keelson-mockups-requirements.txt\
-r keelson-mediamtx-requirements.txt\
-r keelson-ouster-requirements.txt\
-r keelson-opendlv-requirements.txt\
--wheel-dir /wheelhouse


Expand All @@ -30,10 +30,11 @@ COPY --from=wheelhouse /wheelhouse /wheelhouse
RUN pip3 install /wheelhouse/*

# Copy "binaries" to image
COPY --chmod=555 ./keelson-interface-mcap/bin/* /usr/local/bin
COPY --chmod=555 ./keelson-interface-http/bin/* /usr/local/bin
COPY --chmod=555 ./keelson-interface-video/bin/* /usr/local/bin
COPY --chmod=555 ./keelson-interface-lidar/bin/* /usr/local/bin
COPY --chmod=555 ./core/bin/* /usr/local/bin
COPY --chmod=555 ./keelson-interface-mockups/bin/* /usr/local/bin
COPY --chmod=555 ./keelson-interface-mediamtx/bin/* /usr/local/bin
COPY --chmod=555 ./keelson-interface-ouster/bin/* /usr/local/bin
COPY --chmod=555 ./keelson-interface-opendlv/bin/* /usr/local/bin

ENTRYPOINT ["/bin/bash", "-l", "-c"]

11 changes: 5 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,20 @@

**NOTE**: Work in progress...

`keelson` is a flexible, fast and resource-friendly communication backbone enabling edge-to-edge, machine-to-machine communication. It leverages [zenoh](https://github.com/eclipse-zenoh/zenoh) for message based communication (PUB/SUB and REQ/REP) and [mediamtx](https://github.com/bluenviron/mediamtx) for streaming of audio/video.
`keelson` is a flexible, fast and resource-friendly communication backbone enabling edge-to-edge, machine-to-machine communication. It leverages [zenoh](https://github.com/eclipse-zenoh/zenoh) for message based communication (PUB/SUB and REQ/REP) and adds an opinionated key-space design and message format on top.

**TODO**: Image/sketch

If you are new to zenoh, read here: https://zenoh.io/docs/overview/what-is-zenoh/

## Repository structure
The core parts of `keelson` are maintained and developed inside a monorepo (this repo) to ensure consistency and interoperability within versions during early development. At some point in the future, this monorepo may (or may not) be split into separate repositories.

Parts:

* [**Brefv**](./brefv/README.md) is the messaging protocol in use by keelson.
* [**Brefv**](./brefv/README.md) defines the key-space design and message formats used by `keelson`.
* [**Infrastructure guidelines**](./infrastructure/README.md) contains bits and pieces to set up a working zenoh network infrastructure suitable for keelson.
* [**keelson-interface-mcap**](./keelson-interface-mcap/) contains recording and replaying functionality for the mcap file format.
* [**keelson-interface-http**](./keelson-interface-http/) contains a temporary extension to the http rest api offered by zenohd.
* [**keelson-interface-video**](./keelson-interface-mcap/) contains functionality to interface with video streaming hardware and software. In particular, MediaMTX.
* [**keelson-interface-lidar**](./keelson-interface-mcap/) contains functionality to interface Lidar hardware.



Versions:
Expand Down
9 changes: 6 additions & 3 deletions brefv/core.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@ message Envelope {
bytes payload = 2;
}

message TopicEnvelopePair {
message TimestampedTopicEnvelopePair {

// Timestamp when this pair of topic and envelope was serialized
google.protobuf.Timestamp timestamp = 1;

// Topic associated with the envelope
string topic = 1;
string topic = 2;

// The envelope
bytes envelope = 2;
bytes envelope = 3;
}
35 changes: 35 additions & 0 deletions brefv/payloads/compound/RadarReading.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
syntax = "proto3";

import "foxglove/Pose.proto";
import "foxglove/PackedElementField.proto";
import "google/protobuf/timestamp.proto";

package brefv.compound;

message RadarSpoke {
// Timestamp of radar spoke
google.protobuf.Timestamp timestamp = 1;

// Frame of reference
string frame_id = 2;

// The origin of the radar spoke relative to the frame of reference
foxglove.Pose pose = 3;

// Azimuth angle [rad] of this spoke
float azimuth = 4;

// Range of radar spoke
float range = 5;

// Fields in `data`. Generally just one field with the ´intensity´.
repeated foxglove.PackedElementField fields = 6;

// Intensities
bytes data = 7;

}

message RadarSweep {
repeated brefv.compound.RadarSpoke spokes = 1;
}
4 changes: 2 additions & 2 deletions brefv/python/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ A python version of brefv

Not yet available on PyPi.

Install as: `pip install "git+https://github.com/MO-RISE/keelson.git@{TAG}#subdirectory=brefv/python"`
substituting `TAG` for whatever you want to install.
Install as: `pip install "git+https://github.com/MO-RISE/keelson.git@<TAG>#subdirectory=brefv/python"`
substituting `<TAG>` for whatever you want to install.

## Basic usage
See [test](./tests/)
2 changes: 1 addition & 1 deletion brefv/python/brefv/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from google.protobuf.descriptor_pb2 import FileDescriptorSet
from google.protobuf.descriptor import Descriptor, FileDescriptor

from .core_pb2 import Envelope
from .core_pb2 import Envelope, TimestampedTopicEnvelopePair
from . import payloads

_PACKAGE_ROOT = Path(__file__).parent
Expand Down
2 changes: 1 addition & 1 deletion brefv/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def read(fname):

setup(
name="brefv",
version="0.1.0-pre.14",
version="0.1.0-pre.16",
license="Apache License 2.0",
description="brefv",
long_description=read("README.md"),
Expand Down
3 changes: 3 additions & 0 deletions brefv/tags.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ compressed_image:
laser_scan:
encoding: protobuf
description: foxglove.LaserScan
radar_spoke:
encoding: protobuf
description: brefv.compound.RadarSpoke
point_cloud:
encoding: protobuf
description: foxglove.PointCloud
Expand Down
Empty file added core/README.md
Empty file.
161 changes: 161 additions & 0 deletions core/bin/klog-record
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
#!/usr/bin/env python3

import json
import time
import atexit
import logging
import pathlib
import warnings
import argparse
from io import BufferedWriter
from queue import Queue, Empty
from threading import Thread, Event
from contextlib import contextmanager

import zenoh

from brefv import TimestampedTopicEnvelopePair

logger = logging.getLogger("klog-record")


@contextmanager
def ignore(*exceptions):
try:
yield
except exceptions:
logger.exception("Something went wrong in the listener!")


def write_message(writer: BufferedWriter, received_at: float, topic: str, envelope: bytes):
logger.debug("Writing to file: topic=%s, log_time=%s", topic, received_at)

data = TimestampedTopicEnvelopePair()
data.timestamp.FromNanoseconds(received_at)
data.topic = topic
data.envelope = envelope

serialized_data = data.SerializeToString()

serialized_length = len(serialized_data).to_bytes(4, "big", signed=False)

writer.write(serialized_length + serialized_data)


def run(session: zenoh.Session, args: argparse.Namespace):
queue = Queue()

close_down = Event()

def _recorder():

with args.output.open("wb") as fh:

while not close_down.is_set():
try:
received_at, sample = queue.get(timeout=0.01)
except Empty:
continue

with ignore(Exception):
topic = str(sample.key_expr)
logger.debug("Received sample on topic: %s", topic)

write_message(fh, received_at, topic, sample.value.payload)


t = Thread(target=_recorder)
t.daemon = True
t.start()


# And start subscribing
subscribers = [
session.declare_subscriber(key, lambda s: queue.put((time.time_ns(), s))) for key in args.key
]

while True:
try:
qsize = queue.qsize()
logger.debug("Approximate queue size is: %s", qsize)

if qsize > 100:
warnings.warn("Queue size is %s", qsize)
elif qsize > 1000:
raise RuntimeError(
f"Recorder is not capable of keeping up with data flow. Current queue size is {qsize}. Exiting!"
)

time.sleep(1.0)
except KeyboardInterrupt:
logger.info("Closing down on user request!")
logger.debug("Undeclaring subscribers...")
for sub in subscribers:
sub.undeclare()

logger.debug("Waiting for all items in queue to be processed...")
while not queue.empty():
time.sleep(0.1)

logger.debug("Joining recorder thread...")
close_down.set()
t.join()

logger.debug("Done! Good bye :)")
break


def main():
parser = argparse.ArgumentParser(
prog="klog-record",
description="A pure python klog recorder for keelson",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)

parser.add_argument("--log-level", type=int, default=logging.INFO)

parser.add_argument(
"-k",
"--key",
type=str,
action="append",
required=True,
help="Key expressions to subscribe to from the Zenoh session",
)

parser.add_argument(
"-o",
"--output",
type=pathlib.Path,
required=True,
help="File path to write recording to",
)

## Parse arguments and start doing our thing
args = parser.parse_args()

# Setup logger
logging.basicConfig(
format="%(asctime)s %(levelname)s %(name)s %(message)s", level=args.log_level
)
logging.captureWarnings(True)
zenoh.init_logger()

# Put together zenoh session configuration
conf = zenoh.Config()
conf.insert_json5(zenoh.config.MODE_KEY, json.dumps("peer"))

## Construct session
logger.info("Opening Zenoh session...")
session = zenoh.open(conf)

def _on_exit():
session.close()

atexit.register(_on_exit)

run(session, args)


if __name__ == "__main__":
main()
Loading

0 comments on commit 486db54

Please sign in to comment.