Skip to content

Commit

Permalink
update MJPEGCamera
Browse files Browse the repository at this point in the history
  • Loading branch information
walesch-yan authored and marcus-oscarsson committed Jan 17, 2025
1 parent abc9915 commit 52fe98f
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 45 deletions.
21 changes: 20 additions & 1 deletion docs/usage/cameras.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,25 @@ The `MJPEGCamera` provides specialized support for *MJPEG* video streams. It is

> **Note**: Currently the `MJPEGCamera` is the only camera that does not support conversion to a `Redis` Pub/Sub channel (more about streaming on a [redis channel](setup.md#dual-streaming-seamlessly-serve-mjpeg-and-redis-pubsub-video-feeds))
#### Authentication for MJPEG Streams

Some MJPEG streams may require authentication to access. To support such scenarios, the `MJPEGCamera` class includes built-in authentication support. Currently, both `Basic` and `Digest` authentication methods are supported.

Below is an example of how to use the video-streamer to access a stream requiring `Basic` authentication:

```bash
video-streamer -of MPEG1 -uri <stream_url> -auth Basic -user <username> -pass <password>
```

##### Explanation of the Parameters:
- `-of`: Specifies the ouput format, here `MPEG1` is used.
- `-uri`: The URL of the MJPEG stream.
- `-auth`: Specifies the authentication method (`Basic` or `Digest`)
- `-user`: The username for authentication
- `-pass`: The password required for authentication

Replace `<stream_url>`, `<username>` and `<password>` with the appropriate values for your stream. Ensure you handle credentials securely and avoid exposing them in public or shared scripts!

---

## RedisCamera
Expand All @@ -48,7 +67,7 @@ Instead of using a real camera, the `video-streamer` allows to use said `Redis`
To use the `RedisCamera`, one can use the following command:

```bash
video-streamer -d -of MJPEG1 -uri redis://[host]:[port] -irc ExampleStream
video-streamer -d -of MPEG1 -uri redis://[host]:[port] -irc ExampleStream
```

where `host` and `port` are the respective host and port of the `Redis` server and `ExampleStream` would be the Pub/Sub channel to use for generating the stream.
Expand Down
163 changes: 127 additions & 36 deletions video_streamer/core/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
except ImportError:
logging.warning("PyTango not available.")

from requests.auth import HTTPBasicAuth, HTTPDigestAuth
from video_streamer.core.config import AuthenticationConfiguration

class Camera:
def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis: str = None, redis_channel: str = None):
def __init__(self, device_uri: str, sleep_time: float, debug: bool = False, redis: str = None, redis_channel: str = None):
self._device_uri = device_uri
self._sleep_time = sleep_time
self._debug = debug
Expand All @@ -48,7 +50,7 @@ def poll_image(self, output: Union[IO, multiprocessing.queues.Queue]) -> None:
self._output = output
if self._redis:
host, port = self._redis.split(':')
self._redis_client = redis.StrictRedis(host=host, port=port)
self._redis_client = redis.StrictRedis(host=host, port=int(port))

while True:
try:
Expand All @@ -63,7 +65,7 @@ def poll_image(self, output: Union[IO, multiprocessing.queues.Queue]) -> None:
pass

@property
def size(self) -> Tuple[float, float]:
def size(self) -> Tuple[int, int]:
return (self._width, self._height)

def get_jpeg(self, data, size=(0, 0)) -> bytearray:
Expand All @@ -76,37 +78,131 @@ def get_jpeg(self, data, size=(0, 0)) -> bytearray:
image.save(jpeg_data, format="JPEG")
jpeg_data = jpeg_data.getvalue()

return jpeg_data
return bytearray(jpeg_data)

def _image_to_rgb24(self, image: bytes) -> bytearray:
"""
Convert binary image data into raw RGB24-encoded byte array
Supported image types include JPEG, PNG, BMP, TIFF, GIF, ...
"""
image_array = np.frombuffer(image, dtype=np.uint8)
frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
return bytearray(rgb_frame.tobytes())


class MJPEGCamera(Camera):
def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis: str = None, redis_channel: str = None):
def __init__(self, device_uri: str, sleep_time: float, auth_config: AuthenticationConfiguration, debug: bool = False, redis: str = None, redis_channel: str = None):
super().__init__(device_uri, sleep_time, debug, redis, redis_channel)
self._authentication=self._createAuthenticationHeader(auth_config)
self._set_size()

def _set_size(self) -> None:
buffer = bytearray()
# To set the size, extract the first image from the MJPEG stream
try:
response = requests.get(self._device_uri, stream=True, verify=False, auth=self._authentication)
if response.status_code == 200:
boundary = self._extract_boundary(response.headers)
if not boundary:
logging.error("Boundary not found in Content-Type header.")
return

for chunk in response.iter_content(chunk_size=8192):
buffer.extend(chunk)

while True:
frame, buffer = self._extract_frame(buffer, boundary)
if frame is None:
break
image = Image.open(io.BytesIO(frame))
self._width, self._height = image.size
return
else:
logging.error(f"Received unexpected status code {response.status_code}")
return
except requests.RequestException as e:
logging.exception(f"Exception occured during stream request")
return

def _createAuthenticationHeader(self, auth_config:AuthenticationConfiguration) -> Union[None, HTTPBasicAuth, HTTPDigestAuth]:
type = auth_config.type
if type == "Basic":
return HTTPBasicAuth(username=auth_config.username, password=auth_config.password)
elif type == "Digest":
return HTTPDigestAuth(username=auth_config.username, password=auth_config.password)
elif type:
logging.warning("Unknown authentication Type {type}")
return None

def _extract_boundary(self, headers):
"""
Extract the boundary marker from the Content-Type header.
"""
content_type = headers.get("Content-Type", "")
if "boundary=" in content_type:
return content_type.split("boundary=")[-1]
return None

def _extract_frame(self, buffer: bytearray, boundary: str):
"""
Extract a single JPEG frame from the buffer if a complete frame exists.
Returns a tuple of (frame_data, remaining_buffer).
"""
boundary_bytes = f"--{boundary}".encode()
start_index = buffer.find(boundary_bytes)
if start_index == -1:
return None, buffer # Boundary not found

# Find the next boundary after the current one
next_index = buffer.find(boundary_bytes, start_index + len(boundary_bytes))
if next_index == -1:
return None, buffer # Complete frame not yet available

# Extract the data between boundaries
frame_section = buffer[start_index + len(boundary_bytes):next_index]

# Separate headers and JPEG data
header_end = frame_section.find(b"\r\n\r\n") # End of headers
if header_end == -1:
return None, buffer # Headers not fully received

# Extract the JPEG data
frame_data = frame_section[header_end + 4:] # Skip past the headers
remaining_buffer = buffer[next_index:] # Data after the next boundary
return frame_data.strip(), remaining_buffer # Strip any extra whitespace

def poll_image(self, output: Union[IO, multiprocessing.queues.Queue]) -> None:
# auth=("user", "password")
r = requests.get(self._device_uri, stream=True)
buffer = bytearray()
self._output = output

buffer = bytes()
while True:
try:
if r.status_code == 200:
for chunk in r.iter_content(chunk_size=1024):
buffer += chunk

response = requests.get(self._device_uri, stream=True, verify=False, auth=self._authentication)
if response.status_code == 200:
boundary = self._extract_boundary(response.headers)
if not boundary:
logging.error("Boundary not found in Content-Type header.")
break

for chunk in response.iter_content(chunk_size=8192):
buffer.extend(chunk)

while True:
frame, buffer = self._extract_frame(buffer, boundary)
if frame is None:
break
self._write_data(self._image_to_rgb24(bytes(frame)))
else:
print("Received unexpected status code {}".format(r.status_code))
except requests.exceptions.StreamConsumedError:
output.put(buffer)
r = requests.get(self._device_uri, stream=True)
buffer = bytes()

def get_jpeg(self, data, size=None) -> bytearray:
return data
logging.error(f"Received unexpected status code {response.status_code}")
break
except requests.RequestException as e:
logging.exception(f"Exception occured during stream request")
break


class LimaCamera(Camera):
def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis: str = None, redis_channel: str = None):
def __init__(self, device_uri: str, sleep_time: float, debug: bool = False, redis: str = None, redis_channel: str = None):
super().__init__(device_uri, sleep_time, debug, redis, redis_channel)

self._lima_tango_device = self._connect(self._device_uri)
Expand All @@ -131,7 +227,7 @@ def _get_image(self) -> Tuple[bytearray, float, float, int]:

hfmt = ">IHHqiiHHHH"
hsize = struct.calcsize(hfmt)
_, _, img_mode, frame_number, width, height, _, _, _, _ = struct.unpack(
_, _, _, frame_number, width, height, _, _, _, _ = struct.unpack(
hfmt, img_data[1][:hsize]
)

Expand Down Expand Up @@ -162,7 +258,7 @@ def _poll_once(self) -> None:


class RedisCamera(Camera):
def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, out_redis: str = None, out_redis_channel: str = None, in_redis_channel: str = 'frames'):
def __init__(self, device_uri: str, sleep_time: float, debug: bool = False, out_redis: str = None, out_redis_channel: str = None, in_redis_channel: str = 'frames'):
super().__init__(device_uri, sleep_time, debug, out_redis, out_redis_channel)
# for this camera in_redis_... is for the input and redis_... as usual for output
self._in_redis_client = self._connect(self._device_uri)
Expand Down Expand Up @@ -203,17 +299,12 @@ def poll_image(self, output: Union[IO, multiprocessing.queues.Queue]) -> None:
"frame_number": self._last_frame_number
}
self._redis_client.publish(self._redis_channel, json.dumps(frame_dict))
raw_image_data = base64.b64decode(frame["data"])
# ffmpeg needs an rgb encoded image, since we cannot be sure if the image was in rgb or
# bgr(common for cv2 image manipulation) we need these transformations
image_array = np.frombuffer(raw_image_data, dtype=np.uint8)
frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

self._write_data(rgb_frame.tobytes())
raw_image_data = base64.b64decode(frame["data"])
self._write_data(self._image_to_rgb24(raw_image_data))

class TestCamera(Camera):
def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis: str = None, redis_channel: str = None):
def __init__(self, device_uri: str, sleep_time: float, debug: bool = False, redis: str = None, redis_channel: str = None):
super().__init__(device_uri, sleep_time, debug, redis, redis_channel)
self._sleep_time = 0.05
testimg_fpath = os.path.join(os.path.dirname(__file__), "fakeimg.jpg")
Expand All @@ -224,7 +315,7 @@ def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis:
self._last_frame_number = -1

def _poll_once(self) -> None:
self._write_data(self._raw_data)
self._write_data(bytearray(self._raw_data))

self._last_frame_number += 1
if self._redis:
Expand All @@ -239,7 +330,7 @@ def _poll_once(self) -> None:
time.sleep(self._sleep_time)

class VideoTestCamera(Camera):
def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis: str = None, redis_channel: str = None):
def __init__(self, device_uri: str, sleep_time: float, debug: bool = False, redis: str = None, redis_channel: str = None):
super().__init__(device_uri, sleep_time, debug, redis, redis_channel)
self._sleep_time = 0.04
# for your testvideo, please use an uncompressed video or mjpeg codec,
Expand All @@ -252,7 +343,7 @@ def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis:

def _poll_once(self) -> None:
if not self._video_capture.isOpened():
print("Video capture is not opened.")
logging.error("Video capture is not opened.")
return

ret, frame = self._video_capture.read()
Expand All @@ -262,7 +353,7 @@ def _poll_once(self) -> None:
self._video_capture = cv2.VideoCapture(self._testvideo_fpath)
ret, frame = self._video_capture.read()
if not ret:
print("Failed to restart video capture.")
logging.error("Failed to restart video capture.")
return

frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
Expand All @@ -283,7 +374,7 @@ def _poll_once(self) -> None:

def _set_video_dimensions(self):
if not self._video_capture.isOpened():
print("Video capture is not opened.")
logging.error("Video capture is not opened.")
return
self._width = int(self._video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
self._height = int(self._video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
43 changes: 41 additions & 2 deletions video_streamer/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,32 @@
from typing_extensions import Self

__all__ = (
"AuthenticationConfiguration",
"SourceConfiguration",
"ServerConfiguration",
"get_config_from_file",
"get_config_from_dict",
"get_auth_config_from_dict",
)

class AuthenticationConfiguration(BaseModel):
"""Authentication Configuration"""

type: Union[str, None] = Field(
title="Authentication Type",
description="Type of authentication, supported types are 'Basic', 'Digest', None",
default= None,
)

username: Union[bytes, str] = Field(
title="Username",
default="",
)

password: Union[bytes, str] = Field(
title="Password",
default="",
)

class SourceConfiguration(BaseModel):
"""Source Configuration"""
Expand Down Expand Up @@ -65,7 +85,10 @@ class SourceConfiguration(BaseModel):
description= "Channel for RedisCamera to listen to",
default="CameraStream",
)

auth_config: AuthenticationConfiguration = Field(
title="Authentication Configurations",
default=AuthenticationConfiguration(type=None),
)

class ServerConfiguration(BaseModel):
"""Server Configuration"""
Expand Down Expand Up @@ -130,7 +153,7 @@ def get_config_from_file(fpath: Union[str, Path]) -> Union[ServerConfiguration,


def get_config_from_dict(
config_data: dict[str, Any],
config_data: Dict[str, Any],
) -> Union[ServerConfiguration, None]:
"""Get server configuration from dictionary.
Expand All @@ -144,3 +167,19 @@ def get_config_from_dict(
return ServerConfiguration.model_validate(config_data)
except ValidationError:
return None

def get_auth_config_from_dict(
config_data: Dict[str, Any],
) -> Union[AuthenticationConfiguration, None]:
"""Get authentication configuration from dictionary.
Args:
config_data (dict[str, Any]): Authentication Data.
Returns:
Union[AuthenticationConfiguration, None]: Authentication Configuration or None.
"""
try:
return AuthenticationConfiguration.model_validate(config_data)
except ValidationError:
return None
Loading

0 comments on commit 52fe98f

Please sign in to comment.