Skip to content

Commit

Permalink
Fix streaming jitter, ensure proper thread synchronization (#42)
Browse files Browse the repository at this point in the history
* Fix streaming jitter

* Remove unused functions

Format mjpeg server

* Ensure proper thread synchronization

* Rename change_scope to change_vehicle
  • Loading branch information
hsule authored Jan 1, 2025
1 parent 862b6b9 commit 436ff8f
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 75 deletions.
17 changes: 1 addition & 16 deletions api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,9 @@ async def manage_teleop_startup(scope):
manual_controller = ManualController(session, scope, use_bridge_ros2dds)

if mjpeg_server is not None:
mjpeg_server.change_scope(scope)
mjpeg_server.change_vehicle(scope)
else:
mjpeg_server = MJPEG_server(session, scope, use_bridge_ros2dds)
# mjpeg_server_thread = threading.Thread(target = mjpeg_server.run)
# mjpeg_server_thread.start()
return {
'text': f'Startup manual control on {scope}.',
'mjpeg_host': 'localhost' if MJPEG_HOST == '0.0.0.0' else MJPEG_HOST,
Expand Down Expand Up @@ -123,19 +121,6 @@ async def manage_teleop_status():
return {'velocity': '---', 'gear': '---', 'steering': '---'}


# @app.get("/map/startup")
# async def manage_map_startup(scope):
# global pose_service
# if pose_service is None:
# pose_service = PoseServer(session, scope)
# else:
# pose_service.change_scope(scope)

# return {
# "text": f"Startup manual control on {scope}."
# }


@app.get('/map/list')
async def get_vehilcle_list():
global pose_service
Expand Down
98 changes: 39 additions & 59 deletions zenoh_app/camera_autoware.py
Original file line number Diff line number Diff line change
@@ -1,95 +1,75 @@
import threading

import cv2
import numpy as np
import zenoh
from cv_bridge import CvBridge
from flask import Flask, Response
from werkzeug.serving import make_server
from zenoh_ros_type.common_interfaces import Image

IMAGE_RAW_KEY_EXPR = '/sensing/camera/traffic_light/image_raw'

# At 20 FPS, 10 frames represent 0.5 second of video data
RING_CHANNEL_SIZE = 10


class MJPEG_server:
def __init__(self, zenoh_session, scope, use_bridge_ros2dds=True):
self.app = Flask(__name__)
self.bridge = CvBridge()
self.camera_image = None
self.session = zenoh_session
self.scope = scope
self.host = None
self.port = None
self.server = None
self.use_bridge_ros2dds = use_bridge_ros2dds
self.prefix = scope if use_bridge_ros2dds else scope + '/rt'

# Start processing thread
self.frame_thread = threading.Thread(target=self.process_frame, daemon=True)
self.frame_thread.start()
self.height = None
self.width = None
self.processing = True

self.sub_video = self.session.declare_subscriber(self.prefix + IMAGE_RAW_KEY_EXPR, zenoh.handlers.RingChannel(RING_CHANNEL_SIZE))

@self.app.route('/')
def index():
return 'Motion JPEG Server'
# Start processing thread
self.frame_thread = threading.Thread(target=self.process_frame, daemon=True)
self.frame_thread.start()

@self.app.route('/video')
def video_feed():
return Response(self.generate_frame(), mimetype='multipart/x-mixed-replace; boundary=frame')
def change_vehicle(self, new_scope):
self.processing = False
self.frame_thread.join()

def change_scope(self, new_scope):
self.sub_video.undeclare()

self.prefix = new_scope if self.use_bridge_ros2dds else new_scope + '/rt'
self.sub_video = self.session.declare_subscriber(self.prefix + IMAGE_RAW_KEY_EXPR, zenoh.handlers.RingChannel(RING_CHANNEL_SIZE))
self.scope = new_scope


self.width = None
self.height = None

self.processing = True
self.frame_thread = threading.Thread(target=self.process_frame, daemon=True)
self.frame_thread.start()

def process_frame(self):
while True:
while self.processing:
try:
if self.width is None or self.height is None:
sample = self.sub_video.try_recv()
if sample is None:
continue
image = Image.deserialize(sample.payload.to_bytes())
self.height = image.height
self.width = image.width

sample = self.sub_video.try_recv()
if sample is None:
continue

data = Image.deserialize(sample.payload.to_bytes())
self.camera_image = self.bridge.imgmsg_to_cv2(data, desired_encoding='passthrough')
except Exception as e:
print(f"Error processing frame: {e}")

def shutdown(self):
self.sub_video.undeclare()
self.frame_thread.join()

if self.host is not None and self.port is not None:
self.server.shutdown()
self.server = None
self.host = None
self.port = None

def generate_frame(self):
while self.camera_image is None:
pass
while self.camera_image is not None:
# Encode the frame as JPEG
ret, buffer = cv2.imencode('.jpg', self.camera_image)
frame = buffer.tobytes()
yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n' b'Cache-Control: no-cache\r\n\r\n' + frame + b'\r\n')

def run(self, host='0.0.0.0', port=5000):
if self.host is None and self.port is None:
if host == '0.0.0.0':
self.host = 'localhost'
else:
self.host = host

self.port = port

# self.app.run(host=host, port=port)
self.server = make_server(host, port, self.app)
self.server.serve_forever()
data = sample.payload.to_bytes()

# Each pixel is 4 bytes (RGBA), total bytes = Height x Width x 4.
# Extract the last part of the ROS message as image data.
np_image = np.frombuffer(data[-(self.height * self.width * 4) :], dtype=np.uint8)
self.camera_image = np_image.reshape((self.height, self.width, 4))

except Exception as e:
print(f'Error processing frame: {e}')


if __name__ == '__main__':
s = zenoh.open()
server = MJPEG_server(s, 'v1')
server = MJPEG_server(s, 'v1')

0 comments on commit 436ff8f

Please sign in to comment.