From 436ff8f5d6494ae07b7373900201c35acc048376 Mon Sep 17 00:00:00 2001 From: Leann Hsu Date: Wed, 1 Jan 2025 09:15:18 +0800 Subject: [PATCH] Fix streaming jitter, ensure proper thread synchronization (#42) * Fix streaming jitter * Remove unused functions Format mjpeg server * Ensure proper thread synchronization * Rename change_scope to change_vehicle --- api_server.py | 17 +------ zenoh_app/camera_autoware.py | 98 ++++++++++++++---------------------- 2 files changed, 40 insertions(+), 75 deletions(-) diff --git a/api_server.py b/api_server.py index d87690f..6edebaf 100644 --- a/api_server.py +++ b/api_server.py @@ -68,11 +68,9 @@ async def manage_teleop_startup(scope): manual_controller = ManualController(session, scope, use_bridge_ros2dds) if mjpeg_server is not None: - mjpeg_server.change_scope(scope) + mjpeg_server.change_vehicle(scope) else: mjpeg_server = MJPEG_server(session, scope, use_bridge_ros2dds) - # mjpeg_server_thread = threading.Thread(target = mjpeg_server.run) - # mjpeg_server_thread.start() return { 'text': f'Startup manual control on {scope}.', 'mjpeg_host': 'localhost' if MJPEG_HOST == '0.0.0.0' else MJPEG_HOST, @@ -123,19 +121,6 @@ async def manage_teleop_status(): return {'velocity': '---', 'gear': '---', 'steering': '---'} -# @app.get("/map/startup") -# async def manage_map_startup(scope): -# global pose_service -# if pose_service is None: -# pose_service = PoseServer(session, scope) -# else: -# pose_service.change_scope(scope) - -# return { -# "text": f"Startup manual control on {scope}." -# } - - @app.get('/map/list') async def get_vehilcle_list(): global pose_service diff --git a/zenoh_app/camera_autoware.py b/zenoh_app/camera_autoware.py index c8143ef..7f02a31 100644 --- a/zenoh_app/camera_autoware.py +++ b/zenoh_app/camera_autoware.py @@ -1,10 +1,7 @@ import threading -import cv2 +import numpy as np import zenoh -from cv_bridge import CvBridge -from flask import Flask, Response -from werkzeug.serving import make_server from zenoh_ros_type.common_interfaces import Image IMAGE_RAW_KEY_EXPR = '/sensing/camera/traffic_light/image_raw' @@ -12,84 +9,67 @@ # At 20 FPS, 10 frames represent 0.5 second of video data RING_CHANNEL_SIZE = 10 + class MJPEG_server: def __init__(self, zenoh_session, scope, use_bridge_ros2dds=True): - self.app = Flask(__name__) - self.bridge = CvBridge() self.camera_image = None self.session = zenoh_session self.scope = scope - self.host = None - self.port = None - self.server = None self.use_bridge_ros2dds = use_bridge_ros2dds self.prefix = scope if use_bridge_ros2dds else scope + '/rt' - - # Start processing thread - self.frame_thread = threading.Thread(target=self.process_frame, daemon=True) - self.frame_thread.start() + self.height = None + self.width = None + self.processing = True self.sub_video = self.session.declare_subscriber(self.prefix + IMAGE_RAW_KEY_EXPR, zenoh.handlers.RingChannel(RING_CHANNEL_SIZE)) - @self.app.route('/') - def index(): - return 'Motion JPEG Server' + # Start processing thread + self.frame_thread = threading.Thread(target=self.process_frame, daemon=True) + self.frame_thread.start() - @self.app.route('/video') - def video_feed(): - return Response(self.generate_frame(), mimetype='multipart/x-mixed-replace; boundary=frame') + def change_vehicle(self, new_scope): + self.processing = False + self.frame_thread.join() - def change_scope(self, new_scope): self.sub_video.undeclare() + self.prefix = new_scope if self.use_bridge_ros2dds else new_scope + '/rt' self.sub_video = self.session.declare_subscriber(self.prefix + IMAGE_RAW_KEY_EXPR, zenoh.handlers.RingChannel(RING_CHANNEL_SIZE)) self.scope = new_scope - + + self.width = None + self.height = None + + self.processing = True + self.frame_thread = threading.Thread(target=self.process_frame, daemon=True) + self.frame_thread.start() + def process_frame(self): - while True: + while self.processing: try: + if self.width is None or self.height is None: + sample = self.sub_video.try_recv() + if sample is None: + continue + image = Image.deserialize(sample.payload.to_bytes()) + self.height = image.height + self.width = image.width + sample = self.sub_video.try_recv() if sample is None: continue - - data = Image.deserialize(sample.payload.to_bytes()) - self.camera_image = self.bridge.imgmsg_to_cv2(data, desired_encoding='passthrough') - except Exception as e: - print(f"Error processing frame: {e}") - def shutdown(self): - self.sub_video.undeclare() - self.frame_thread.join() - - if self.host is not None and self.port is not None: - self.server.shutdown() - self.server = None - self.host = None - self.port = None - - def generate_frame(self): - while self.camera_image is None: - pass - while self.camera_image is not None: - # Encode the frame as JPEG - ret, buffer = cv2.imencode('.jpg', self.camera_image) - frame = buffer.tobytes() - yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n' b'Cache-Control: no-cache\r\n\r\n' + frame + b'\r\n') - - def run(self, host='0.0.0.0', port=5000): - if self.host is None and self.port is None: - if host == '0.0.0.0': - self.host = 'localhost' - else: - self.host = host - - self.port = port - - # self.app.run(host=host, port=port) - self.server = make_server(host, port, self.app) - self.server.serve_forever() + data = sample.payload.to_bytes() + + # Each pixel is 4 bytes (RGBA), total bytes = Height x Width x 4. + # Extract the last part of the ROS message as image data. + np_image = np.frombuffer(data[-(self.height * self.width * 4) :], dtype=np.uint8) + self.camera_image = np_image.reshape((self.height, self.width, 4)) + + except Exception as e: + print(f'Error processing frame: {e}') if __name__ == '__main__': s = zenoh.open() - server = MJPEG_server(s, 'v1') \ No newline at end of file + server = MJPEG_server(s, 'v1')