Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix streaming jitter, ensure proper thread synchronization #42

Merged
merged 4 commits into from
Jan 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 1 addition & 16 deletions api_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,9 @@ async def manage_teleop_startup(scope):
manual_controller = ManualController(session, scope, use_bridge_ros2dds)

if mjpeg_server is not None:
mjpeg_server.change_scope(scope)
mjpeg_server.change_vehicle(scope)
else:
mjpeg_server = MJPEG_server(session, scope, use_bridge_ros2dds)
# mjpeg_server_thread = threading.Thread(target = mjpeg_server.run)
# mjpeg_server_thread.start()
return {
'text': f'Startup manual control on {scope}.',
'mjpeg_host': 'localhost' if MJPEG_HOST == '0.0.0.0' else MJPEG_HOST,
Expand Down Expand Up @@ -123,19 +121,6 @@ async def manage_teleop_status():
return {'velocity': '---', 'gear': '---', 'steering': '---'}


# @app.get("/map/startup")
# async def manage_map_startup(scope):
# global pose_service
# if pose_service is None:
# pose_service = PoseServer(session, scope)
# else:
# pose_service.change_scope(scope)

# return {
# "text": f"Startup manual control on {scope}."
# }


@app.get('/map/list')
async def get_vehilcle_list():
global pose_service
Expand Down
98 changes: 39 additions & 59 deletions zenoh_app/camera_autoware.py
Original file line number Diff line number Diff line change
@@ -1,95 +1,75 @@
import threading

import cv2
import numpy as np
import zenoh
from cv_bridge import CvBridge
from flask import Flask, Response
from werkzeug.serving import make_server
from zenoh_ros_type.common_interfaces import Image

IMAGE_RAW_KEY_EXPR = '/sensing/camera/traffic_light/image_raw'

# At 20 FPS, 10 frames represent 0.5 second of video data
RING_CHANNEL_SIZE = 10


class MJPEG_server:
def __init__(self, zenoh_session, scope, use_bridge_ros2dds=True):
self.app = Flask(__name__)
self.bridge = CvBridge()
self.camera_image = None
self.session = zenoh_session
self.scope = scope
self.host = None
self.port = None
self.server = None
self.use_bridge_ros2dds = use_bridge_ros2dds
self.prefix = scope if use_bridge_ros2dds else scope + '/rt'

# Start processing thread
self.frame_thread = threading.Thread(target=self.process_frame, daemon=True)
self.frame_thread.start()
self.height = None
self.width = None
self.processing = True

self.sub_video = self.session.declare_subscriber(self.prefix + IMAGE_RAW_KEY_EXPR, zenoh.handlers.RingChannel(RING_CHANNEL_SIZE))

@self.app.route('/')
def index():
return 'Motion JPEG Server'
# Start processing thread
self.frame_thread = threading.Thread(target=self.process_frame, daemon=True)
self.frame_thread.start()

@self.app.route('/video')
def video_feed():
return Response(self.generate_frame(), mimetype='multipart/x-mixed-replace; boundary=frame')
def change_vehicle(self, new_scope):
self.processing = False
self.frame_thread.join()

def change_scope(self, new_scope):
self.sub_video.undeclare()

self.prefix = new_scope if self.use_bridge_ros2dds else new_scope + '/rt'
self.sub_video = self.session.declare_subscriber(self.prefix + IMAGE_RAW_KEY_EXPR, zenoh.handlers.RingChannel(RING_CHANNEL_SIZE))
self.scope = new_scope


self.width = None
self.height = None

self.processing = True
self.frame_thread = threading.Thread(target=self.process_frame, daemon=True)
self.frame_thread.start()

def process_frame(self):
while True:
while self.processing:
try:
if self.width is None or self.height is None:
sample = self.sub_video.try_recv()
if sample is None:
continue
image = Image.deserialize(sample.payload.to_bytes())
self.height = image.height
self.width = image.width

sample = self.sub_video.try_recv()
if sample is None:
continue

data = Image.deserialize(sample.payload.to_bytes())
self.camera_image = self.bridge.imgmsg_to_cv2(data, desired_encoding='passthrough')
except Exception as e:
print(f"Error processing frame: {e}")

def shutdown(self):
self.sub_video.undeclare()
self.frame_thread.join()

if self.host is not None and self.port is not None:
self.server.shutdown()
self.server = None
self.host = None
self.port = None

def generate_frame(self):
while self.camera_image is None:
pass
while self.camera_image is not None:
# Encode the frame as JPEG
ret, buffer = cv2.imencode('.jpg', self.camera_image)
frame = buffer.tobytes()
yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n' b'Cache-Control: no-cache\r\n\r\n' + frame + b'\r\n')

def run(self, host='0.0.0.0', port=5000):
if self.host is None and self.port is None:
if host == '0.0.0.0':
self.host = 'localhost'
else:
self.host = host

self.port = port

# self.app.run(host=host, port=port)
self.server = make_server(host, port, self.app)
self.server.serve_forever()
data = sample.payload.to_bytes()

# Each pixel is 4 bytes (RGBA), total bytes = Height x Width x 4.
# Extract the last part of the ROS message as image data.
np_image = np.frombuffer(data[-(self.height * self.width * 4) :], dtype=np.uint8)
self.camera_image = np_image.reshape((self.height, self.width, 4))

except Exception as e:
print(f'Error processing frame: {e}')


if __name__ == '__main__':
s = zenoh.open()
server = MJPEG_server(s, 'v1')
server = MJPEG_server(s, 'v1')
Loading