Skip to content

Commit

Permalink
Add: MTL common handle support in GStreamer (#1047)
Browse files Browse the repository at this point in the history
MTL common handle support in GStreamer
- Add ability to use the same MTL instance for
  multiple GStreamer plugin instances in the same
  pipeline.
- Change the default behavior of the MTL GStreamer
  plugins to make all subsequent plugins ignore
  dev arguments after the first one.
  After this change only one MTL library process
  is spawned per pipeline.
- Add a new module called gst_common to hold
  the address of the MTL instance.
- Update documentation to include a warning about
  the behavior change.
  • Loading branch information
DawidWesierski4 authored Jan 29, 2025
1 parent 8fdce09 commit 6b056c8
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 40 deletions.
7 changes: 6 additions & 1 deletion ecosystem/gstreamer_plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,19 @@ In MTL GStreamer plugins there are general arguments that apply to every plugin.
| udp-port | uint | Receiving MTL node UDP port. | 0 to G_MAXUINT |
| tx-queues | uint | Number of TX queues to initialize in DPDK backend. | 0 to G_MAXUINT |
| rx-queues | uint | Number of RX queues to initialize in DPDK backend. | 0 to G_MAXUINT |
| payload-type | uint | SMPTE ST 2110 payload type. | 0 to G_MAXUINT |
| payload-type | uint | SMPTE ST 2110 payload type. | 0 to G_MAXUINT |

These are also general parameters accepted by plugins, but the functionality they provide to the user is not yet supported in plugins.
| Property Name | Type | Description | Range |
|---------------|--------|---------------------------------------------------------------------------------------------------|--------------------------|
| dma-dev | string | **RESERVED FOR FUTURE USE** port for the MTL direct memory functionality. | N/A |
| port | string | **RESERVED FOR FUTURE USE** DPDK device port. Utilized when multiple ports are passed to the MTL library to select the port for the session. | N/A |

> **Warning:**
> Generally, the `log-level`, `dev-port`, `dev-ip`, `tx-queues`, and `rx-queues` are used to initialize the MTLlibrary. As the MTL library handle is shared between MTL
> GStreamer plugins of the same pipeline, you only need to pass them once when specifying the arguments for the firstly initialized pipeline. Nothing happens when you specify them elsewhere;
> they will just be ignored after the initialization of MTL has already happened.

### 2.3. General capabilities

Some structures describe the capabilities generally
Expand Down
93 changes: 89 additions & 4 deletions ecosystem/gstreamer_plugin/gst_mtl_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@

#include "gst_mtl_common.h"

/* Shared handle for the MTL library used across plugins in the pipeline */
struct gst_common_handle {
mtl_handle mtl_handle;
int mtl_handle_reference_count;
pthread_mutex_t mutex;
};

static struct gst_common_handle common_handle = {0, 0, PTHREAD_MUTEX_INITIALIZER};
guint gst_mtl_port_idx = MTL_PORT_P;

gboolean gst_mtl_common_parse_input_finfo(const GstVideoFormatInfo* finfo,
Expand Down Expand Up @@ -393,19 +401,48 @@ gboolean gst_mtl_common_parse_dev_arguments(struct mtl_init_params* mtl_init_par
return ret;
}

mtl_handle gst_mtl_common_init_handle(struct mtl_init_params* p, StDevArgs* devArgs,
guint* log_level) {
/**
* Initializes the device with the given parameters.
*
* If the common handle (MTL instance already initialized in the pipeline)
* is already in use, the input parameters for the device
* (rx_queues, tx_queues, dev_ip, dev_port, and log_level) will be ignored.
* You can force to initialize another MTL instance to avoid this behavior with
* force_to_initialize_new_instance flag.
*
* @param force_to_initialize_new_instance Force the creation of a new MTL
* instance, ignoring any existing one.
* @param devArgs Initialization parameters for the DPDK port
* (ignored if using an existing MTL instance).
* @param log_level Log level for the library (ignored if using an
* existing MTL instance).
*/
mtl_handle gst_mtl_common_init_handle(StDevArgs* devArgs, guint* log_level,
gboolean force_to_initialize_new_instance) {
struct mtl_init_params mtl_init_params = {0};
mtl_handle ret;
pthread_mutex_lock(&common_handle.mutex);

if (!force_to_initialize_new_instance && common_handle.mtl_handle) {
GST_INFO("Mtl is already initialized with shared handle %p",
common_handle.mtl_handle);
common_handle.mtl_handle_reference_count++;

pthread_mutex_unlock(&common_handle.mutex);
return common_handle.mtl_handle;
}

if (!p || !devArgs || !log_level) {
if (!devArgs || !log_level) {
GST_ERROR("Invalid input");
pthread_mutex_unlock(&common_handle.mutex);
return NULL;
}

mtl_init_params.num_ports = 0;

if (gst_mtl_common_parse_dev_arguments(&mtl_init_params, devArgs) == FALSE) {
GST_ERROR("Failed to parse dev arguments");
pthread_mutex_unlock(&common_handle.mutex);
return NULL;
}
mtl_init_params.flags |= MTL_FLAG_BIND_NUMA;
Expand All @@ -422,5 +459,53 @@ mtl_handle gst_mtl_common_init_handle(struct mtl_init_params* p, StDevArgs* devA
}
*log_level = mtl_init_params.log_level;

return mtl_init(&mtl_init_params);
if (force_to_initialize_new_instance) {
GST_INFO("MTL shared handle ignored");

ret = mtl_init(&mtl_init_params);
pthread_mutex_unlock(&common_handle.mutex);

return ret;
}

common_handle.mtl_handle = mtl_init(&mtl_init_params);
common_handle.mtl_handle_reference_count++;
pthread_mutex_unlock(&common_handle.mutex);

return common_handle.mtl_handle;
}

/**
* Deinitializes the MTL handle.
* If the handle is the shared handle, the reference count is decremented.
* If the reference count reaches zero, the handle is deinitialized.
* If the handle is not the shared handle, it is deinitialized immediately.
*
* @param handle MTL handle to deinitialize (Null is an akceptable value then
* shared value will be used).
*/
gint gst_mtl_common_deinit_handle(mtl_handle handle) {
gint ret;

pthread_mutex_lock(&common_handle.mutex);

if (handle && handle != common_handle.mtl_handle) {
ret = mtl_uninit(handle);
pthread_mutex_unlock(&common_handle.mutex);
return ret;
}

common_handle.mtl_handle_reference_count--;

if (common_handle.mtl_handle_reference_count > 0) {
common_handle.mtl_handle_reference_count--;

pthread_mutex_unlock(&common_handle.mutex);
return 0;
}
ret = mtl_uninit(common_handle.mtl_handle);

pthread_mutex_unlock(&common_handle.mutex);
pthread_mutex_destroy(&common_handle.mutex);
return ret;
}
5 changes: 3 additions & 2 deletions ecosystem/gstreamer_plugin/gst_mtl_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ void gst_mtl_common_get_general_arguments(GObject* object, guint prop_id,
StDevArgs* devArgs, SessionPortArgs* portArgs,
guint* log_level);

mtl_handle gst_mtl_common_init_handle(struct mtl_init_params* p, StDevArgs* devArgs,
guint* log_level);
mtl_handle gst_mtl_common_init_handle(StDevArgs* devArgs, guint* log_level,
gboolean force_to_initialize_new_instance);

gint gst_mtl_common_deinit_handle(mtl_handle handle);
#endif /* __GST_MTL_COMMON_H__ */
6 changes: 3 additions & 3 deletions ecosystem/gstreamer_plugin/gst_mtl_st20p_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,6 @@ static void gst_mtl_st20p_rx_class_init(Gst_Mtl_St20p_RxClass* klass) {
}

static gboolean gst_mtl_st20p_rx_start(GstBaseSrc* basesrc) {
struct mtl_init_params mtl_init_params = {0};
struct st20p_rx_ops* ops_rx;
gint ret;

Expand All @@ -208,7 +207,7 @@ static gboolean gst_mtl_st20p_rx_start(GstBaseSrc* basesrc) {
GST_DEBUG("Media Transport Initialization start");

src->mtl_lib_handle =
gst_mtl_common_init_handle(&mtl_init_params, &(src->devArgs), &(src->log_level));
gst_mtl_common_init_handle(&(src->devArgs), &(src->log_level), FALSE);

if (!src->mtl_lib_handle) {
GST_ERROR("Could not initialize MTL");
Expand Down Expand Up @@ -510,7 +509,8 @@ static void gst_mtl_st20p_rx_finalize(GObject* object) {
}

if (src->mtl_lib_handle) {
if (mtl_stop(src->mtl_lib_handle) || mtl_uninit(src->mtl_lib_handle)) {
if (mtl_stop(src->mtl_lib_handle) ||
gst_mtl_common_deinit_handle(src->mtl_lib_handle)) {
GST_ERROR("Failed to uninitialize MTL library");
return;
}
Expand Down
7 changes: 3 additions & 4 deletions ecosystem/gstreamer_plugin/gst_mtl_st20p_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -170,16 +170,14 @@ static void gst_mtl_st20p_tx_class_init(Gst_Mtl_St20p_TxClass* klass) {
}

static gboolean gst_mtl_st20p_tx_start(GstBaseSink* bsink) {
struct mtl_init_params mtl_init_params = {0};

Gst_Mtl_St20p_Tx* sink = GST_MTL_ST20P_TX(bsink);

GST_DEBUG_OBJECT(sink, "start");
GST_DEBUG("Media Transport Initialization start");
gst_base_sink_set_async_enabled(bsink, FALSE);

sink->mtl_lib_handle =
gst_mtl_common_init_handle(&mtl_init_params, &(sink->devArgs), &(sink->log_level));
gst_mtl_common_init_handle(&(sink->devArgs), &(sink->log_level), FALSE);

if (!sink->mtl_lib_handle) {
GST_ERROR("Could not initialize MTL");
Expand Down Expand Up @@ -459,7 +457,8 @@ static void gst_mtl_st20p_tx_finalize(GObject* object) {
}

if (sink->mtl_lib_handle) {
if (mtl_stop(sink->mtl_lib_handle) || mtl_uninit(sink->mtl_lib_handle)) {
if (mtl_stop(sink->mtl_lib_handle) ||
gst_mtl_common_deinit_handle(sink->mtl_lib_handle)) {
GST_ERROR("Failed to uninitialize MTL library");
return;
}
Expand Down
6 changes: 3 additions & 3 deletions ecosystem/gstreamer_plugin/gst_mtl_st30p_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ static void gst_mtl_st30p_rx_class_init(Gst_Mtl_St30p_RxClass* klass) {
}

static gboolean gst_mtl_st30p_rx_start(GstBaseSrc* basesrc) {
struct mtl_init_params mtl_init_params = {0};
struct st30p_rx_ops* ops_rx;
gint ret;

Expand All @@ -194,7 +193,7 @@ static gboolean gst_mtl_st30p_rx_start(GstBaseSrc* basesrc) {
GST_DEBUG("Media Transport Initialization start");

src->mtl_lib_handle =
gst_mtl_common_init_handle(&mtl_init_params, &(src->devArgs), &(src->log_level));
gst_mtl_common_init_handle(&(src->devArgs), &(src->log_level), FALSE);

if (!src->mtl_lib_handle) {
GST_ERROR("Could not initialize MTL");
Expand Down Expand Up @@ -469,7 +468,8 @@ static void gst_mtl_st30p_rx_finalize(GObject* object) {
}

if (src->mtl_lib_handle) {
if (mtl_stop(src->mtl_lib_handle) || mtl_uninit(src->mtl_lib_handle)) {
if (mtl_stop(src->mtl_lib_handle) ||
gst_mtl_common_deinit_handle(src->mtl_lib_handle)) {
GST_ERROR("Failed to uninitialize MTL library");
return;
}
Expand Down
7 changes: 3 additions & 4 deletions ecosystem/gstreamer_plugin/gst_mtl_st30p_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -181,16 +181,14 @@ static void gst_mtl_st30p_tx_class_init(Gst_Mtl_St30p_TxClass* klass) {
}

static gboolean gst_mtl_st30p_tx_start(GstBaseSink* bsink) {
struct mtl_init_params mtl_init_params = {0};

Gst_Mtl_St30p_Tx* sink = GST_MTL_ST30P_TX(bsink);

GST_DEBUG_OBJECT(sink, "start");
GST_DEBUG("Media Transport Initialization start");
gst_base_sink_set_async_enabled(bsink, FALSE);

sink->mtl_lib_handle =
gst_mtl_common_init_handle(&mtl_init_params, &(sink->devArgs), &(sink->log_level));
gst_mtl_common_init_handle(&(sink->devArgs), &(sink->log_level), FALSE);

if (!sink->mtl_lib_handle) {
GST_ERROR("Could not initialize MTL");
Expand Down Expand Up @@ -499,7 +497,8 @@ static void gst_mtl_st30p_tx_finalize(GObject* object) {
}

if (sink->mtl_lib_handle) {
if (mtl_stop(sink->mtl_lib_handle) || mtl_uninit(sink->mtl_lib_handle)) {
if (mtl_stop(sink->mtl_lib_handle) ||
gst_mtl_common_deinit_handle(sink->mtl_lib_handle)) {
GST_ERROR("Failed to uninitialize MTL library");
return;
}
Expand Down
6 changes: 3 additions & 3 deletions ecosystem/gstreamer_plugin/gst_mtl_st40_rx.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ static void gst_mtl_st40_rx_class_init(Gst_Mtl_St40_RxClass* klass) {
}

static gboolean gst_mtl_st40_rx_start(GstBaseSrc* basesrc) {
struct mtl_init_params mtl_init_params = {0};
struct st40_rx_ops ops_rx = {0};
gint ret;

Expand All @@ -192,7 +191,7 @@ static gboolean gst_mtl_st40_rx_start(GstBaseSrc* basesrc) {
GST_DEBUG("Media Transport Initialization start");

src->mtl_lib_handle =
gst_mtl_common_init_handle(&mtl_init_params, &(src->devArgs), &(src->log_level));
gst_mtl_common_init_handle(&(src->devArgs), &(src->log_level), FALSE);

if (!src->mtl_lib_handle) {
GST_ERROR("Could not initialize MTL");
Expand Down Expand Up @@ -464,7 +463,8 @@ static void gst_mtl_st40_rx_finalize(GObject* object) {
pthread_cond_destroy(&src->mbuff_cond);

if (src->mtl_lib_handle) {
if (mtl_stop(src->mtl_lib_handle) || mtl_uninit(src->mtl_lib_handle)) {
if (mtl_stop(src->mtl_lib_handle) ||
gst_mtl_common_deinit_handle(src->mtl_lib_handle)) {
GST_ERROR("Failed to uninitialize MTL library");
}
}
Expand Down
7 changes: 3 additions & 4 deletions ecosystem/gstreamer_plugin/gst_mtl_st40p_tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -177,16 +177,14 @@ static void gst_mtl_st40p_tx_class_init(Gst_Mtl_St40p_TxClass* klass) {
}

static gboolean gst_mtl_st40p_tx_start(GstBaseSink* bsink) {
struct mtl_init_params mtl_init_params = {0};

Gst_Mtl_St40p_Tx* sink = GST_MTL_ST40P_TX(bsink);

GST_DEBUG_OBJECT(sink, "start");
GST_DEBUG("Media Transport Initialization start");
gst_base_sink_set_async_enabled(bsink, FALSE);

sink->mtl_lib_handle =
gst_mtl_common_init_handle(&mtl_init_params, &(sink->devArgs), &(sink->log_level));
gst_mtl_common_init_handle(&(sink->devArgs), &(sink->log_level), FALSE);

if (!sink->mtl_lib_handle) {
GST_ERROR("Could not initialize MTL");
Expand Down Expand Up @@ -483,7 +481,8 @@ static void gst_mtl_st40p_tx_finalize(GObject* object) {
}

if (sink->mtl_lib_handle) {
if (mtl_stop(sink->mtl_lib_handle) || mtl_uninit(sink->mtl_lib_handle)) {
if (mtl_stop(sink->mtl_lib_handle) ||
gst_mtl_common_deinit_handle(sink->mtl_lib_handle)) {
GST_ERROR("Failed to uninitialize MTL library");
return;
}
Expand Down
Loading

0 comments on commit 6b056c8

Please sign in to comment.