From 221c4680230755e229944bb3d1de18e18ac5344c Mon Sep 17 00:00:00 2001 From: Leann Hsu Date: Tue, 31 Dec 2024 15:15:10 +0800 Subject: [PATCH 1/4] Fix streaming jitter --- zenoh_app/camera_autoware.py | 36 +++++++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/zenoh_app/camera_autoware.py b/zenoh_app/camera_autoware.py index c8143ef..3e8e6e0 100644 --- a/zenoh_app/camera_autoware.py +++ b/zenoh_app/camera_autoware.py @@ -1,13 +1,15 @@ import threading import cv2 +import numpy as np import zenoh from cv_bridge import CvBridge from flask import Flask, Response from werkzeug.serving import make_server -from zenoh_ros_type.common_interfaces import Image +from zenoh_ros_type.common_interfaces import CameraInfo IMAGE_RAW_KEY_EXPR = '/sensing/camera/traffic_light/image_raw' +CAMERA_INFO_KEY_EXPR = '/sensing/camera/traffic_light/camera_info' # At 20 FPS, 10 frames represent 0.5 second of video data RING_CHANNEL_SIZE = 10 @@ -24,12 +26,16 @@ def __init__(self, zenoh_session, scope, use_bridge_ros2dds=True): self.server = None self.use_bridge_ros2dds = use_bridge_ros2dds self.prefix = scope if use_bridge_ros2dds else scope + '/rt' - + self.height = None + self.width = None + + self.sub_info = self.session.declare_subscriber(self.prefix + CAMERA_INFO_KEY_EXPR) + self.sub_video = self.session.declare_subscriber(self.prefix + IMAGE_RAW_KEY_EXPR, zenoh.handlers.RingChannel(RING_CHANNEL_SIZE)) + # Start processing thread self.frame_thread = threading.Thread(target=self.process_frame, daemon=True) self.frame_thread.start() - self.sub_video = self.session.declare_subscriber(self.prefix + IMAGE_RAW_KEY_EXPR, zenoh.handlers.RingChannel(RING_CHANNEL_SIZE)) @self.app.route('/') def index(): @@ -40,20 +46,36 @@ def video_feed(): return Response(self.generate_frame(), mimetype='multipart/x-mixed-replace; boundary=frame') def change_scope(self, new_scope): + self.sub_info.undeclare() self.sub_video.undeclare() + self.prefix = new_scope if self.use_bridge_ros2dds else new_scope + '/rt' + self.sub_info = self.session.declare_subscriber(self.prefix + CAMERA_INFO_KEY_EXPR) self.sub_video = self.session.declare_subscriber(self.prefix + IMAGE_RAW_KEY_EXPR, zenoh.handlers.RingChannel(RING_CHANNEL_SIZE)) self.scope = new_scope - + + self.width = None + self.height = None + def process_frame(self): while True: try: + if self.width is None or self.height is None: + sample = self.sub_video.try_recv() + if sample is None: + continue + camera_info = CameraInfo.deserialize(sample.payload.to_bytes()) + self.height = camera_info.height + self.width = camera_info.width + sample = self.sub_video.try_recv() if sample is None: continue - - data = Image.deserialize(sample.payload.to_bytes()) - self.camera_image = self.bridge.imgmsg_to_cv2(data, desired_encoding='passthrough') + + data = sample.payload.to_bytes() + np_data = np.frombuffer(data[-(self.height * self.width * 4):], dtype=np.uint8) + self.camera_image = np_data.reshape((self.height, self.width, 4)) + except Exception as e: print(f"Error processing frame: {e}") From 58f6b74c1b936599fa86aa142d9c1dc2a92c44f3 Mon Sep 17 00:00:00 2001 From: Leann Hsu Date: Tue, 31 Dec 2024 15:20:12 +0800 Subject: [PATCH 2/4] Remove unused functions Format mjpeg server --- api_server.py | 15 ---------- zenoh_app/camera_autoware.py | 57 +++--------------------------------- 2 files changed, 4 insertions(+), 68 deletions(-) diff --git a/api_server.py b/api_server.py index d87690f..b484886 100644 --- a/api_server.py +++ b/api_server.py @@ -71,8 +71,6 @@ async def manage_teleop_startup(scope): mjpeg_server.change_scope(scope) else: mjpeg_server = MJPEG_server(session, scope, use_bridge_ros2dds) - # mjpeg_server_thread = threading.Thread(target = mjpeg_server.run) - # mjpeg_server_thread.start() return { 'text': f'Startup manual control on {scope}.', 'mjpeg_host': 'localhost' if MJPEG_HOST == '0.0.0.0' else MJPEG_HOST, @@ -123,19 +121,6 @@ async def manage_teleop_status(): return {'velocity': '---', 'gear': '---', 'steering': '---'} -# @app.get("/map/startup") -# async def manage_map_startup(scope): -# global pose_service -# if pose_service is None: -# pose_service = PoseServer(session, scope) -# else: -# pose_service.change_scope(scope) - -# return { -# "text": f"Startup manual control on {scope}." -# } - - @app.get('/map/list') async def get_vehilcle_list(): global pose_service diff --git a/zenoh_app/camera_autoware.py b/zenoh_app/camera_autoware.py index 3e8e6e0..1e441e4 100644 --- a/zenoh_app/camera_autoware.py +++ b/zenoh_app/camera_autoware.py @@ -1,11 +1,7 @@ import threading -import cv2 import numpy as np import zenoh -from cv_bridge import CvBridge -from flask import Flask, Response -from werkzeug.serving import make_server from zenoh_ros_type.common_interfaces import CameraInfo IMAGE_RAW_KEY_EXPR = '/sensing/camera/traffic_light/image_raw' @@ -14,16 +10,12 @@ # At 20 FPS, 10 frames represent 0.5 second of video data RING_CHANNEL_SIZE = 10 + class MJPEG_server: def __init__(self, zenoh_session, scope, use_bridge_ros2dds=True): - self.app = Flask(__name__) - self.bridge = CvBridge() self.camera_image = None self.session = zenoh_session self.scope = scope - self.host = None - self.port = None - self.server = None self.use_bridge_ros2dds = use_bridge_ros2dds self.prefix = scope if use_bridge_ros2dds else scope + '/rt' self.height = None @@ -36,15 +28,6 @@ def __init__(self, zenoh_session, scope, use_bridge_ros2dds=True): self.frame_thread = threading.Thread(target=self.process_frame, daemon=True) self.frame_thread.start() - - @self.app.route('/') - def index(): - return 'Motion JPEG Server' - - @self.app.route('/video') - def video_feed(): - return Response(self.generate_frame(), mimetype='multipart/x-mixed-replace; boundary=frame') - def change_scope(self, new_scope): self.sub_info.undeclare() self.sub_video.undeclare() @@ -73,45 +56,13 @@ def process_frame(self): continue data = sample.payload.to_bytes() - np_data = np.frombuffer(data[-(self.height * self.width * 4):], dtype=np.uint8) + np_data = np.frombuffer(data[-(self.height * self.width * 4) :], dtype=np.uint8) self.camera_image = np_data.reshape((self.height, self.width, 4)) except Exception as e: - print(f"Error processing frame: {e}") - - def shutdown(self): - self.sub_video.undeclare() - self.frame_thread.join() - - if self.host is not None and self.port is not None: - self.server.shutdown() - self.server = None - self.host = None - self.port = None - - def generate_frame(self): - while self.camera_image is None: - pass - while self.camera_image is not None: - # Encode the frame as JPEG - ret, buffer = cv2.imencode('.jpg', self.camera_image) - frame = buffer.tobytes() - yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n' b'Cache-Control: no-cache\r\n\r\n' + frame + b'\r\n') - - def run(self, host='0.0.0.0', port=5000): - if self.host is None and self.port is None: - if host == '0.0.0.0': - self.host = 'localhost' - else: - self.host = host - - self.port = port - - # self.app.run(host=host, port=port) - self.server = make_server(host, port, self.app) - self.server.serve_forever() + print(f'Error processing frame: {e}') if __name__ == '__main__': s = zenoh.open() - server = MJPEG_server(s, 'v1') \ No newline at end of file + server = MJPEG_server(s, 'v1') From 8d5dda9d3228f7e3c7621bfe70439269d36172bf Mon Sep 17 00:00:00 2001 From: Leann Hsu Date: Tue, 31 Dec 2024 16:11:42 +0800 Subject: [PATCH 3/4] Ensure proper thread synchronization --- zenoh_app/camera_autoware.py | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/zenoh_app/camera_autoware.py b/zenoh_app/camera_autoware.py index 1e441e4..e6456b9 100644 --- a/zenoh_app/camera_autoware.py +++ b/zenoh_app/camera_autoware.py @@ -2,10 +2,9 @@ import numpy as np import zenoh -from zenoh_ros_type.common_interfaces import CameraInfo +from zenoh_ros_type.common_interfaces import Image IMAGE_RAW_KEY_EXPR = '/sensing/camera/traffic_light/image_raw' -CAMERA_INFO_KEY_EXPR = '/sensing/camera/traffic_light/camera_info' # At 20 FPS, 10 frames represent 0.5 second of video data RING_CHANNEL_SIZE = 10 @@ -20,8 +19,8 @@ def __init__(self, zenoh_session, scope, use_bridge_ros2dds=True): self.prefix = scope if use_bridge_ros2dds else scope + '/rt' self.height = None self.width = None + self.processing = True - self.sub_info = self.session.declare_subscriber(self.prefix + CAMERA_INFO_KEY_EXPR) self.sub_video = self.session.declare_subscriber(self.prefix + IMAGE_RAW_KEY_EXPR, zenoh.handlers.RingChannel(RING_CHANNEL_SIZE)) # Start processing thread @@ -29,35 +28,43 @@ def __init__(self, zenoh_session, scope, use_bridge_ros2dds=True): self.frame_thread.start() def change_scope(self, new_scope): - self.sub_info.undeclare() + self.processing = False + self.frame_thread.join() + self.sub_video.undeclare() self.prefix = new_scope if self.use_bridge_ros2dds else new_scope + '/rt' - self.sub_info = self.session.declare_subscriber(self.prefix + CAMERA_INFO_KEY_EXPR) self.sub_video = self.session.declare_subscriber(self.prefix + IMAGE_RAW_KEY_EXPR, zenoh.handlers.RingChannel(RING_CHANNEL_SIZE)) self.scope = new_scope self.width = None self.height = None + + self.processing = True + self.frame_thread = threading.Thread(target=self.process_frame, daemon=True) + self.frame_thread.start() def process_frame(self): - while True: + while self.processing: try: if self.width is None or self.height is None: sample = self.sub_video.try_recv() if sample is None: continue - camera_info = CameraInfo.deserialize(sample.payload.to_bytes()) - self.height = camera_info.height - self.width = camera_info.width + image = Image.deserialize(sample.payload.to_bytes()) + self.height = image.height + self.width = image.width sample = self.sub_video.try_recv() if sample is None: continue data = sample.payload.to_bytes() - np_data = np.frombuffer(data[-(self.height * self.width * 4) :], dtype=np.uint8) - self.camera_image = np_data.reshape((self.height, self.width, 4)) + + # Each pixel is 4 bytes (RGBA), total bytes = Height x Width x 4. + # Extract the last part of the ROS message as image data. + np_image = np.frombuffer(data[-(self.height * self.width * 4) :], dtype=np.uint8) + self.camera_image = np_image.reshape((self.height, self.width, 4)) except Exception as e: print(f'Error processing frame: {e}') From 43f7ca41f79a06360bb272cb319b21998539d586 Mon Sep 17 00:00:00 2001 From: Leann Hsu Date: Tue, 31 Dec 2024 16:14:21 +0800 Subject: [PATCH 4/4] Rename change_scope to change_vehicle --- api_server.py | 2 +- zenoh_app/camera_autoware.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/api_server.py b/api_server.py index b484886..6edebaf 100644 --- a/api_server.py +++ b/api_server.py @@ -68,7 +68,7 @@ async def manage_teleop_startup(scope): manual_controller = ManualController(session, scope, use_bridge_ros2dds) if mjpeg_server is not None: - mjpeg_server.change_scope(scope) + mjpeg_server.change_vehicle(scope) else: mjpeg_server = MJPEG_server(session, scope, use_bridge_ros2dds) return { diff --git a/zenoh_app/camera_autoware.py b/zenoh_app/camera_autoware.py index e6456b9..7f02a31 100644 --- a/zenoh_app/camera_autoware.py +++ b/zenoh_app/camera_autoware.py @@ -27,7 +27,7 @@ def __init__(self, zenoh_session, scope, use_bridge_ros2dds=True): self.frame_thread = threading.Thread(target=self.process_frame, daemon=True) self.frame_thread.start() - def change_scope(self, new_scope): + def change_vehicle(self, new_scope): self.processing = False self.frame_thread.join() @@ -39,7 +39,7 @@ def change_scope(self, new_scope): self.width = None self.height = None - + self.processing = True self.frame_thread = threading.Thread(target=self.process_frame, daemon=True) self.frame_thread.start() @@ -60,7 +60,7 @@ def process_frame(self): continue data = sample.payload.to_bytes() - + # Each pixel is 4 bytes (RGBA), total bytes = Height x Width x 4. # Extract the last part of the ROS message as image data. np_image = np.frombuffer(data[-(self.height * self.width * 4) :], dtype=np.uint8)