Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update MJPEGCamera #29

Merged
merged 1 commit into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion docs/usage/cameras.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,25 @@ The `MJPEGCamera` provides specialized support for *MJPEG* video streams. It is

> **Note**: Currently the `MJPEGCamera` is the only camera that does not support conversion to a `Redis` Pub/Sub channel (more about streaming on a [redis channel](setup.md#dual-streaming-seamlessly-serve-mjpeg-and-redis-pubsub-video-feeds))

#### Authentication for MJPEG Streams

Some MJPEG streams may require authentication to access. To support such scenarios, the `MJPEGCamera` class includes built-in authentication support. Currently, both `Basic` and `Digest` authentication methods are supported.

Below is an example of how to use the video-streamer to access a stream requiring `Basic` authentication:

```bash
video-streamer -of MPEG1 -uri <stream_url> -auth Basic -user <username> -pass <password>
```

##### Explanation of the Parameters:
- `-of`: Specifies the output format; here `MPEG1` is used.
- `-uri`: The URL of the MJPEG stream.
- `-auth`: Specifies the authentication method (`Basic` or `Digest`)
- `-user`: The username for authentication
- `-pass`: The password required for authentication

Replace `<stream_url>`, `<username>` and `<password>` with the appropriate values for your stream. Ensure you handle credentials securely and avoid exposing them in public or shared scripts!

Comment on lines +39 to +57
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here I added some documentation for the newly added authentication feature:)

---

## RedisCamera
Expand All @@ -48,7 +67,7 @@ Instead of using a real camera, the `video-streamer` allows to use said `Redis`
To use the `RedisCamera`, one can use the following command:

```bash
video-streamer -d -of MJPEG1 -uri redis://[host]:[port] -irc ExampleStream
video-streamer -d -of MPEG1 -uri redis://[host]:[port] -irc ExampleStream
```

where `host` and `port` are the respective host and port of the `Redis` server and `ExampleStream` would be the Pub/Sub channel to use for generating the stream.
Expand Down
163 changes: 127 additions & 36 deletions video_streamer/core/camera.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
except ImportError:
logging.warning("PyTango not available.")

from requests.auth import HTTPBasicAuth, HTTPDigestAuth
from video_streamer.core.config import AuthenticationConfiguration

class Camera:
def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis: str = None, redis_channel: str = None):
def __init__(self, device_uri: str, sleep_time: float, debug: bool = False, redis: str = None, redis_channel: str = None):
self._device_uri = device_uri
self._sleep_time = sleep_time
self._debug = debug
Expand All @@ -48,7 +50,7 @@ def poll_image(self, output: Union[IO, multiprocessing.queues.Queue]) -> None:
self._output = output
if self._redis:
host, port = self._redis.split(':')
self._redis_client = redis.StrictRedis(host=host, port=port)
self._redis_client = redis.StrictRedis(host=host, port=int(port))

while True:
try:
Expand All @@ -63,7 +65,7 @@ def poll_image(self, output: Union[IO, multiprocessing.queues.Queue]) -> None:
pass

@property
def size(self) -> Tuple[float, float]:
def size(self) -> Tuple[int, int]:
return (self._width, self._height)

def get_jpeg(self, data, size=(0, 0)) -> bytearray:
Expand All @@ -76,37 +78,131 @@ def get_jpeg(self, data, size=(0, 0)) -> bytearray:
image.save(jpeg_data, format="JPEG")
jpeg_data = jpeg_data.getvalue()

return jpeg_data
return bytearray(jpeg_data)

def _image_to_rgb24(self, image: bytes) -> bytearray:
    """
    Convert encoded image data into a raw RGB24-encoded byte array.

    Args:
        image: Encoded image bytes in a format ``cv2.imdecode`` can read
            (JPEG, PNG, BMP, TIFF, ...).

    Returns:
        The decoded pixels as 8-bit R, G, B bytes.

    Raises:
        ValueError: If ``image`` is empty or cannot be decoded.
    """
    if not image:
        raise ValueError("Cannot convert empty image data to RGB24")

    image_array = np.frombuffer(image, dtype=np.uint8)
    # IMREAD_COLOR makes OpenCV return a 3-channel BGR frame
    frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
    if frame is None:
        # imdecode reports undecodable input by returning None; fail here with
        # a clear message instead of an opaque OpenCV error inside cvtColor
        raise ValueError("Image data could not be decoded")

    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    return bytearray(rgb_frame.tobytes())
Comment on lines +83 to +91
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another addition is this function in the Camera base class, which allows the transformation of any image type supported by opencv into the desired rgb24 format



class MJPEGCamera(Camera):
def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis: str = None, redis_channel: str = None):
def __init__(self, device_uri: str, sleep_time: float, auth_config: AuthenticationConfiguration, debug: bool = False, redis: str = None, redis_channel: str = None):
super().__init__(device_uri, sleep_time, debug, redis, redis_channel)
self._authentication=self._createAuthenticationHeader(auth_config)
self._set_size()

def _set_size(self) -> None:
    """
    Set the camera size from the first image of the MJPEG stream.

    Requests ``self._device_uri`` as a stream, reads chunks until the first
    complete frame can be extracted and stores that frame's dimensions in
    ``self._width`` and ``self._height``. If the status code is not 200, the
    boundary is missing or the request fails, the problem is logged and the
    size is not set.
    """
    buffer = bytearray()
    try:
        # NOTE(review): verify=False disables TLS certificate verification - confirm this is intended.
        # The context manager closes the streaming connection on every exit path;
        # without it the connection stays open after the first frame was read.
        with requests.get(self._device_uri, stream=True, verify=False, auth=self._authentication) as response:
            if response.status_code != 200:
                logging.error(f"Received unexpected status code {response.status_code}")
                return

            boundary = self._extract_boundary(response.headers)
            if not boundary:
                logging.error("Boundary not found in Content-Type header.")
                return

            for chunk in response.iter_content(chunk_size=8192):
                buffer.extend(chunk)
                frame, buffer = self._extract_frame(buffer, boundary)
                if frame is None:
                    continue  # no complete frame buffered yet, read more data

                # Only the first frame is needed to learn the dimensions
                image = Image.open(io.BytesIO(frame))
                self._width, self._height = image.size
                return
    except requests.RequestException:
        logging.exception("Exception occured during stream request")
Comment on lines +100 to +126
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first consumed image from the stream is used to set the size from the input source. This ensures correct encoding/decoding in the streamer later on

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice


def _createAuthenticationHeader(self, auth_config:AuthenticationConfiguration) -> Union[None, HTTPBasicAuth, HTTPDigestAuth]:
type = auth_config.type
if type == "Basic":
return HTTPBasicAuth(username=auth_config.username, password=auth_config.password)
elif type == "Digest":
return HTTPDigestAuth(username=auth_config.username, password=auth_config.password)
elif type:
logging.warning("Unknown authentication Type {type}")
return None

def _extract_boundary(self, headers):
"""
Extract the boundary marker from the Content-Type header.
"""
content_type = headers.get("Content-Type", "")
if "boundary=" in content_type:
return content_type.split("boundary=")[-1]
return None
Comment on lines +138 to +145
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The boundary extracted here acts as a delimiter between the individual images in an MJPEG stream. As far as I understand, this delimiter can differ from stream to stream but should always be present in the response header, hence this extraction function.


def _extract_frame(self, buffer: bytearray, boundary: str):
"""
Extract a single JPEG frame from the buffer if a complete frame exists.
Returns a tuple of (frame_data, remaining_buffer).
"""
boundary_bytes = f"--{boundary}".encode()
start_index = buffer.find(boundary_bytes)
if start_index == -1:
return None, buffer # Boundary not found

# Find the next boundary after the current one
next_index = buffer.find(boundary_bytes, start_index + len(boundary_bytes))
if next_index == -1:
return None, buffer # Complete frame not yet available

# Extract the data between boundaries
frame_section = buffer[start_index + len(boundary_bytes):next_index]

# Separate headers and JPEG data
header_end = frame_section.find(b"\r\n\r\n") # End of headers
if header_end == -1:
return None, buffer # Headers not fully received

# Extract the JPEG data
frame_data = frame_section[header_end + 4:] # Skip past the headers
remaining_buffer = buffer[next_index:] # Data after the next boundary
return frame_data.strip(), remaining_buffer # Strip any extra whitespace

def poll_image(self, output: Union[IO, multiprocessing.queues.Queue]) -> None:
# auth=("user", "password")
r = requests.get(self._device_uri, stream=True)
buffer = bytearray()
self._output = output

buffer = bytes()
while True:
try:
if r.status_code == 200:
for chunk in r.iter_content(chunk_size=1024):
buffer += chunk

response = requests.get(self._device_uri, stream=True, verify=False, auth=self._authentication)
if response.status_code == 200:
boundary = self._extract_boundary(response.headers)
if not boundary:
logging.error("Boundary not found in Content-Type header.")
break

for chunk in response.iter_content(chunk_size=8192):
buffer.extend(chunk)

while True:
frame, buffer = self._extract_frame(buffer, boundary)
if frame is None:
break
self._write_data(self._image_to_rgb24(bytes(frame)))
else:
print("Received unexpected status code {}".format(r.status_code))
except requests.exceptions.StreamConsumedError:
output.put(buffer)
r = requests.get(self._device_uri, stream=True)
buffer = bytes()

def get_jpeg(self, data, size=None) -> bytearray:
return data
logging.error(f"Received unexpected status code {response.status_code}")
break
except requests.RequestException as e:
logging.exception(f"Exception occured during stream request")
break


class LimaCamera(Camera):
def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis: str = None, redis_channel: str = None):
def __init__(self, device_uri: str, sleep_time: float, debug: bool = False, redis: str = None, redis_channel: str = None):
super().__init__(device_uri, sleep_time, debug, redis, redis_channel)

self._lima_tango_device = self._connect(self._device_uri)
Expand All @@ -131,7 +227,7 @@ def _get_image(self) -> Tuple[bytearray, float, float, int]:

hfmt = ">IHHqiiHHHH"
hsize = struct.calcsize(hfmt)
_, _, img_mode, frame_number, width, height, _, _, _, _ = struct.unpack(
_, _, _, frame_number, width, height, _, _, _, _ = struct.unpack(
hfmt, img_data[1][:hsize]
)

Expand Down Expand Up @@ -162,7 +258,7 @@ def _poll_once(self) -> None:


class RedisCamera(Camera):
def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, out_redis: str = None, out_redis_channel: str = None, in_redis_channel: str = 'frames'):
def __init__(self, device_uri: str, sleep_time: float, debug: bool = False, out_redis: str = None, out_redis_channel: str = None, in_redis_channel: str = 'frames'):
super().__init__(device_uri, sleep_time, debug, out_redis, out_redis_channel)
# for this camera in_redis_... is for the input and redis_... as usual for output
self._in_redis_client = self._connect(self._device_uri)
Expand Down Expand Up @@ -203,17 +299,12 @@ def poll_image(self, output: Union[IO, multiprocessing.queues.Queue]) -> None:
"frame_number": self._last_frame_number
}
self._redis_client.publish(self._redis_channel, json.dumps(frame_dict))
raw_image_data = base64.b64decode(frame["data"])
# ffmpeg needs an rgb encoded image, since we cannot be sure if the image was in rgb or
# bgr(common for cv2 image manipulation) we need these transformations
image_array = np.frombuffer(raw_image_data, dtype=np.uint8)
frame = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

self._write_data(rgb_frame.tobytes())
raw_image_data = base64.b64decode(frame["data"])
self._write_data(self._image_to_rgb24(raw_image_data))

class TestCamera(Camera):
def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis: str = None, redis_channel: str = None):
def __init__(self, device_uri: str, sleep_time: float, debug: bool = False, redis: str = None, redis_channel: str = None):
super().__init__(device_uri, sleep_time, debug, redis, redis_channel)
self._sleep_time = 0.05
testimg_fpath = os.path.join(os.path.dirname(__file__), "fakeimg.jpg")
Expand All @@ -224,7 +315,7 @@ def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis:
self._last_frame_number = -1

def _poll_once(self) -> None:
self._write_data(self._raw_data)
self._write_data(bytearray(self._raw_data))

self._last_frame_number += 1
if self._redis:
Expand All @@ -239,7 +330,7 @@ def _poll_once(self) -> None:
time.sleep(self._sleep_time)

class VideoTestCamera(Camera):
def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis: str = None, redis_channel: str = None):
def __init__(self, device_uri: str, sleep_time: float, debug: bool = False, redis: str = None, redis_channel: str = None):
super().__init__(device_uri, sleep_time, debug, redis, redis_channel)
self._sleep_time = 0.04
# for your testvideo, please use an uncompressed video or mjpeg codec,
Expand All @@ -252,7 +343,7 @@ def __init__(self, device_uri: str, sleep_time: int, debug: bool = False, redis:

def _poll_once(self) -> None:
if not self._video_capture.isOpened():
print("Video capture is not opened.")
logging.error("Video capture is not opened.")
return

ret, frame = self._video_capture.read()
Expand All @@ -262,7 +353,7 @@ def _poll_once(self) -> None:
self._video_capture = cv2.VideoCapture(self._testvideo_fpath)
ret, frame = self._video_capture.read()
if not ret:
print("Failed to restart video capture.")
logging.error("Failed to restart video capture.")
return

frame_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
Expand All @@ -283,7 +374,7 @@ def _poll_once(self) -> None:

def _set_video_dimensions(self):
if not self._video_capture.isOpened():
print("Video capture is not opened.")
logging.error("Video capture is not opened.")
return
self._width = int(self._video_capture.get(cv2.CAP_PROP_FRAME_WIDTH))
self._height = int(self._video_capture.get(cv2.CAP_PROP_FRAME_HEIGHT))
43 changes: 41 additions & 2 deletions video_streamer/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,32 @@
from typing_extensions import Self

__all__ = (
"AuthenticationConfiguration",
"SourceConfiguration",
"ServerConfiguration",
"get_config_from_file",
"get_config_from_dict",
"get_auth_config_from_dict",
)

class AuthenticationConfiguration(BaseModel):
    """Authentication settings: scheme type plus username and password."""

    # Authentication scheme name; defaults to None
    type: Union[str, None] = Field(
        default=None,
        title="Authentication Type",
        description="Type of authentication, supported types are 'Basic', 'Digest', None",
    )

    # Credentials; both default to an empty string
    username: Union[bytes, str] = Field(default="", title="Username")
    password: Union[bytes, str] = Field(default="", title="Password")

class SourceConfiguration(BaseModel):
"""Source Configuration"""
Expand Down Expand Up @@ -65,7 +85,10 @@ class SourceConfiguration(BaseModel):
description= "Channel for RedisCamera to listen to",
default="CameraStream",
)

auth_config: AuthenticationConfiguration = Field(
title="Authentication Configurations",
default=AuthenticationConfiguration(type=None),
)

class ServerConfiguration(BaseModel):
"""Server Configuration"""
Expand Down Expand Up @@ -130,7 +153,7 @@ def get_config_from_file(fpath: Union[str, Path]) -> Union[ServerConfiguration,


def get_config_from_dict(
config_data: dict[str, Any],
config_data: Dict[str, Any],
) -> Union[ServerConfiguration, None]:
"""Get server configuration from dictionary.

Expand All @@ -144,3 +167,19 @@ def get_config_from_dict(
return ServerConfiguration.model_validate(config_data)
except ValidationError:
return None

def get_auth_config_from_dict(
    config_data: Dict[str, Any],
) -> Union[AuthenticationConfiguration, None]:
    """Build an authentication configuration from a dictionary.

    Args:
        config_data (Dict[str, Any]): Authentication data to validate.

    Returns:
        Union[AuthenticationConfiguration, None]: The validated configuration,
        or None if validation fails.
    """
    try:
        auth_config = AuthenticationConfiguration.model_validate(config_data)
    except ValidationError:
        # Invalid data is reported to the caller as None, not as an exception
        return None
    return auth_config
Loading