-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathhailo_common_funcs.py
86 lines (72 loc) · 3.49 KB
/
hailo_common_funcs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import numpy as np
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject
# ---------------------------------------------------------
# Functions used to get numpy arrays from GStreamer buffers
# ---------------------------------------------------------
def handle_rgb(map_info, width, height):
    """
    Build an RGB frame from mapped buffer data.

    Args:
        map_info: Mapped buffer info; only its ``data`` attribute (a bytes-like object) is read.
        width (int): The width of the video frame in pixels.
        height (int): The height of the video frame in pixels.

    Returns:
        np.ndarray: A ``(height, width, 3)`` uint8 array that owns its data.

    Raises:
        TypeError: Raised by numpy when the buffer is too small for the requested frame.
    """
    data = map_info.data
    packed_stride = width * 3
    # GStreamer's default raw-video layout rounds every RGB row up to a multiple of 4 bytes.
    # When width * 3 is not already a multiple of 4, reading the buffer as tightly packed
    # would shift every row after the first, so the padding has to be skipped explicitly.
    padded_stride = (packed_stride + 3) & ~3
    if padded_stride != packed_stride and memoryview(data).nbytes == padded_stride * height:
        rows = np.ndarray(shape=(height, padded_stride), dtype=np.uint8, buffer=data)
        return rows[:, :packed_stride].reshape(height, width, 3).copy()

    # The copy() method is used to create a copy of the numpy array. This is necessary because
    # the array created from the buffer does not own its data; it is only a view of the mapped
    # memory, which stops being valid once the buffer is unmapped.
    return np.ndarray(shape=(height, width, 3), dtype=np.uint8, buffer=data).copy()
def handle_nv12(map_info, width, height):
    """
    Split mapped NV12 buffer data into its two planes.

    The first ``width * height`` bytes are read as the Y plane and the bytes that follow
    are read as the UV plane with two bytes per entry.
    NOTE(review): rows are assumed to be tightly packed (no stride padding) - confirm for
    widths that are not a multiple of 4.

    Args:
        map_info: Mapped buffer info; only its ``data`` attribute (a bytes-like object) is read.
        width (int): The width of the video frame in pixels.
        height (int): The height of the video frame in pixels.

    Returns:
        tuple[np.ndarray, np.ndarray]: ``(y_plane, uv_plane)`` as uint8 arrays of shape
        ``(height, width)`` and ``(height // 2, width // 2, 2)``. Both own their data.

    Raises:
        TypeError: Raised by numpy when the buffer is too small for either plane.
    """
    data = map_info.data
    y_plane_size = width * height
    y_plane = np.ndarray(shape=(height, width), dtype=np.uint8, buffer=data).copy()
    # Address the UV plane with an offset instead of slicing the buffer, so the only copy
    # made is the final one that detaches the result from the mapped memory.
    uv_plane = np.ndarray(
        shape=(height // 2, width // 2, 2),
        dtype=np.uint8,
        buffer=data,
        offset=y_plane_size,
    ).copy()
    return y_plane, uv_plane
def handle_yuyv(map_info, width, height):
    """
    Build a YUYV frame from mapped buffer data.

    Args:
        map_info: Mapped buffer info; only its ``data`` attribute (a bytes-like object) is read.
        width (int): The width of the video frame in pixels.
        height (int): The height of the video frame in pixels.

    Returns:
        np.ndarray: A ``(height, width, 2)`` uint8 array that owns its data.
    """
    frame_shape = (height, width, 2)
    # View over the mapped memory; it does not own the bytes it points at.
    mapped_view = np.ndarray(frame_shape, dtype=np.uint8, buffer=map_info.data)
    # Detach from the mapped memory before handing the frame back.
    return mapped_view.copy()
# Dispatch table: video format name -> function that converts mapped buffer data to numpy.
FORMAT_HANDLERS = dict(
    RGB=handle_rgb,
    NV12=handle_nv12,
    YUYV=handle_yuyv,
)
def get_numpy_from_buffer(buffer, format, width, height):
    """
    Read a GstBuffer into numpy using the converter registered for the given format.

    Args:
        buffer (GstBuffer): The GStreamer Buffer to convert.
        format (str): The video format; must be a key of FORMAT_HANDLERS.
        width (int): The width of the video frame.
        height (int): The height of the video frame.

    Returns:
        np.ndarray: Whatever the selected handler returns - a single array, or a tuple of
        arrays for certain formats.

    Raises:
        ValueError: If the buffer cannot be mapped for reading, or the format has no handler.
    """
    mapped, map_info = buffer.map(Gst.MapFlags.READ)
    if not mapped:
        raise ValueError("Buffer mapping failed")

    try:
        convert = FORMAT_HANDLERS.get(format)
        if convert is not None:
            return convert(map_info, width, height)
        raise ValueError(f"Unsupported format: {format}")
    finally:
        # Runs on both the return and the raise path, so the mapping is always released.
        buffer.unmap(map_info)
# ---------------------------------------------------------
# Useful functions for working with GStreamer
# ---------------------------------------------------------
def disable_qos(pipeline):
    """
    Iterate through all elements in the given GStreamer pipeline and set the qos property to False
    where applicable.

    When the 'qos' property is set to True, the element will measure the time it takes to process
    each buffer and will drop frames if its latency is too high.
    We are running on long pipelines, so we want to disable this feature to avoid dropping frames.

    Only the elements returned by ``pipeline.iterate_elements()`` are visited. A line is printed
    for every element whose property was changed.

    :param pipeline: A GStreamer pipeline object. Anything that is not a ``Gst.Pipeline`` is
        reported on stdout and otherwise ignored.
    """
    # Ensure the pipeline is a Gst.Pipeline instance
    if not isinstance(pipeline, Gst.Pipeline):
        print("The provided object is not a GStreamer Pipeline")
        return

    # Iterate through all elements in the pipeline
    it = pipeline.iterate_elements()
    while True:
        result, element = it.next()
        if result == Gst.IteratorResult.RESYNC:
            # The element list changed during iteration; start over rather than stopping
            # early. Setting qos to False again on an already-visited element is harmless.
            it.resync()
            continue
        if result != Gst.IteratorResult.OK:
            # DONE or ERROR: nothing more to visit.
            break

        # Look the property up by name. GObject.list_properties() returns ParamSpec objects,
        # so testing a string for membership in that list does not match by property name.
        if element.find_property('qos') is not None:
            # Set the 'qos' property to False
            element.set_property('qos', False)
            print(f"Set qos to False for {element.get_name()}")