Skip to content

Commit

Permalink
Apply isort and linter black
Browse files Browse the repository at this point in the history
  • Loading branch information
Falron98 committed Jan 13, 2025
1 parent 534b5e2 commit 1c5e402
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 121 deletions.
184 changes: 107 additions & 77 deletions tests/validation/tests/Engine/GstreamerApp.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,41 @@
import hashlib
import os

from tests.Engine.execute import call, wait, log_fail, log_info
from tests.Engine.execute import call, log_fail, log_info, wait


def create_connection_params(
dev_port: str,
payload_type: str,
dev_ip: str = "192.168.96.3",
ip: str = "192.168.96.2",
udp_port: int = 20000,
is_tx: bool = True
is_tx: bool = True,
) -> dict:
params = {
"dev-port": dev_port,
}
if is_tx:
params.update({
"dev-ip": ip,
"tx-ip": dev_ip,
"tx-udp-port": udp_port,
"tx-payload-type": payload_type,
})
params.update(
{
"dev-ip": ip,
"tx-ip": dev_ip,
"tx-udp-port": udp_port,
"tx-payload-type": payload_type,
}
)
else:
params.update({
"dev-ip": dev_ip,
"rx-ip": ip,
"rx-udp-port": udp_port,
"rx-payload-type": payload_type
})
params.update(
{
"dev-ip": dev_ip,
"rx-ip": ip,
"rx-udp-port": udp_port,
"rx-payload-type": payload_type,
}
)
return params


def setup_gstreamer_st20p_tx_pipeline(
build: str,
nic_port_list: str,
Expand All @@ -41,22 +47,24 @@ def setup_gstreamer_st20p_tx_pipeline(
tx_payload_type: int,
tx_queues: int,
tx_framebuff_num: int = None,
tx_fps: int = None
):
connection_params = create_connection_params(dev_port=nic_port_list, payload_type=tx_payload_type, is_tx=True)
tx_fps: int = None,
):
connection_params = create_connection_params(
dev_port=nic_port_list, payload_type=tx_payload_type, is_tx=True
)

# st20 tx GStreamer command line
pipeline_command = [
"gst-launch-1.0",
"gst-launch-1.0",
"-v",
"filesrc",
f"location={input_path}",
"filesrc",
f"location={input_path}",
"blocksize=5529600",
"!",
"!",
f"video/x-raw,format={format},height={height},width={width},framerate={framerate}/1",
"!",
"!",
"mtl_st20p_tx",
f"tx-queues={tx_queues}"
f"tx-queues={tx_queues}",
]

if tx_framebuff_num is not None:
Expand All @@ -68,10 +76,13 @@ def setup_gstreamer_st20p_tx_pipeline(
if tx_fps is not None:
pipeline_command.append(f"tx-fps={tx_fps}")

pipeline_command.append(f"--gst-plugin-path={build}/ecosystem/gstreamer_plugin/builddir/")
pipeline_command.append(
f"--gst-plugin-path={build}/ecosystem/gstreamer_plugin/builddir/"
)

return pipeline_command


def setup_gstreamer_st20p_rx_pipeline(
build: str,
nic_port_list: str,
Expand All @@ -83,20 +94,22 @@ def setup_gstreamer_st20p_rx_pipeline(
rx_payload_type: int,
rx_queues: int,
rx_framebuff_num: int = None,
rx_fps: int = None
):
connection_params = create_connection_params(dev_port=nic_port_list, payload_type=rx_payload_type, is_tx=False)
rx_fps: int = None,
):
connection_params = create_connection_params(
dev_port=nic_port_list, payload_type=rx_payload_type, is_tx=False
)

# st20 rx GStreamer command line
pipeline_command = [
"gst-launch-1.0",
"-v",
"gst-launch-1.0",
"-v",
"mtl_st20p_rx",
f"rx-queues={rx_queues}",
f"rx-pixel-format={format}",
f"rx-height={height}",
f"rx-width={width}",
f"rx-fps={framerate}"
f"rx-fps={framerate}",
]

for key, value in connection_params.items():
Expand All @@ -107,10 +120,13 @@ def setup_gstreamer_st20p_rx_pipeline(

pipeline_command.extend(["!", "filesink", f"location={output_path}"])

pipeline_command.append(f"--gst-plugin-path={build}/ecosystem/gstreamer_plugin/builddir/")
pipeline_command.append(
f"--gst-plugin-path={build}/ecosystem/gstreamer_plugin/builddir/"
)

return pipeline_command


def setup_gstreamer_st30_tx_pipeline(
build: str,
nic_port_list: str,
Expand All @@ -119,34 +135,39 @@ def setup_gstreamer_st30_tx_pipeline(
tx_queues: int,
audio_format: str,
channels: int,
sampling: int
):
connection_params = create_connection_params(dev_port=nic_port_list, payload_type=tx_payload_type, udp_port=30000, is_tx=True)
sampling: int,
):
connection_params = create_connection_params(
dev_port=nic_port_list, payload_type=tx_payload_type, udp_port=30000, is_tx=True
)

# st30 tx GStreamer command line
pipeline_command = [
"gst-launch-1.0",
"filesrc",
f"location={input_path}",
"gst-launch-1.0",
"filesrc",
f"location={input_path}",
"blocksize=70000",
"!",
"!",
"rawaudioparse",
f"format=pcm",
"format=pcm",
f"sample-rate={sampling}",
f"pcm-format={audio_format}",
f"num-channels={channels}",
"!",
"!",
"mtl_st30p_tx",
f"tx-queues={tx_queues}"
f"tx-queues={tx_queues}",
]

for key, value in connection_params.items():
pipeline_command.append(f"{key}={value}")

pipeline_command.append(f"--gst-plugin-path={build}/ecosystem/gstreamer_plugin/builddir/")
pipeline_command.append(
f"--gst-plugin-path={build}/ecosystem/gstreamer_plugin/builddir/"
)

return pipeline_command


def setup_gstreamer_st30_rx_pipeline(
build: str,
nic_port_list: str,
Expand All @@ -155,53 +176,57 @@ def setup_gstreamer_st30_rx_pipeline(
rx_queues: int,
rx_audio_format: str,
rx_channels: int,
rx_sampling: int
):
connection_params = create_connection_params(dev_port=nic_port_list, payload_type=rx_payload_type, udp_port=30000, is_tx=False)
rx_sampling: int,
):
connection_params = create_connection_params(
dev_port=nic_port_list,
payload_type=rx_payload_type,
udp_port=30000,
is_tx=False,
)

# st30 rx GStreamer command line
pipeline_command = [
"gst-launch-1.0",
"-v",
"mtl_st30p_rx"
]
pipeline_command = ["gst-launch-1.0", "-v", "mtl_st30p_rx"]

for key, value in connection_params.items():
pipeline_command.append(f"{key}={value}")

for x in [
f"rx-queues={rx_queues}",
f"rx-audio-format={rx_audio_format}",
f"rx-channel={rx_channels}",
f"rx-sampling={rx_sampling}",
"!",
"filesink",
f"location={output_path}"
]:
"!",
"filesink",
f"location={output_path}",
]:
pipeline_command.append(x)

pipeline_command.append(f"--gst-plugin-path={build}/ecosystem/gstreamer_plugin/builddir/")
pipeline_command.append(
f"--gst-plugin-path={build}/ecosystem/gstreamer_plugin/builddir/"
)

return pipeline_command


def execute_test(
build: str,
tx_command: list,
rx_command: list,
input_file: str,
output_file: str,
type: str,
fps: int = None
):

tx_process = call(' '.join(tx_command), cwd=build, timeout=120)
rx_process = call(' '.join(rx_command), cwd=build, timeout=120)
build: str,
tx_command: list,
rx_command: list,
input_file: str,
output_file: str,
type: str,
fps: int = None,
):

tx_process = call(" ".join(tx_command), cwd=build, timeout=120)
rx_process = call(" ".join(rx_command), cwd=build, timeout=120)

tx_output = wait(tx_process)
rx_output = wait(rx_process)
wait(rx_process)
if type == "st20":
tx_result = check_tx_output(fps=fps, output=tx_output.splitlines())
#rx_result = check_rx_output(fps=fps, output=rx_output.splitlines())
# rx_result = check_rx_output(fps=fps, output=rx_output.splitlines())
if tx_result is False:
return False

Expand All @@ -211,37 +236,40 @@ def execute_test(

if file_compare:
return True

return False


def check_tx_output(fps: int, output: list) -> bool:
tx_fps_result = None
for line in output:
if "TX_VIDEO_SESSION(0,0:st20sink): fps" in line:
tx_fps_result = line
if tx_fps_result is not None:
for x in range(fps, fps-3, -1):
for x in range(fps, fps - 3, -1):
if f"fps {x}" in tx_fps_result:
log_info(f"FPS > {x}")
return True

log_fail("tx session failed")
return False


def check_rx_output(fps: int, output: list) -> bool:
rx_fps_result = None
for line in output:
if "RX_VIDEO_SESSION(0,0:st20src): fps" in line:
rx_fps_result = line
if rx_fps_result is not None:
for x in range(fps, fps-2, -1):
for x in range(fps, fps - 2, -1):
if f"fps {x}" in line:
log_info(f"FPS > {x}")
return True

log_fail("rx session failed")
return False


def compare_files(input_file, output_file):
if os.path.exists(input_file) and os.path.exists(output_file):
input_file_size = os.path.getsize(input_file)
Expand All @@ -252,7 +280,7 @@ def compare_files(input_file, output_file):
log_fail("File size is different")
return False

with open(input_file, 'rb') as i_file, open(output_file, 'rb') as o_file:
with open(input_file, "rb") as i_file, open(output_file, "rb") as o_file:
i_hash = hashlib.md5(i_file.read()).hexdigest()
o_hash = hashlib.md5(o_file.read()).hexdigest()
log_info(f"Input file hash: {i_hash}")
Expand All @@ -263,24 +291,26 @@ def compare_files(input_file, output_file):
log_fail("Comparison of files failed")
return False


def video_format_change(file_format):
if file_format in ["YUV422PLANAR10LE", "YUV_422_10bit"]:
return "I422_10LE"
else:
return file_format



def audio_format_change(file_format, rx_side: bool = False):
if rx_side == True:
if rx_side:
if file_format == "s8":
return "PCM8"
elif file_format == "s16le":
return "PCM16"
else:
return "PCM24"
else:
else:
if file_format == "s8":
return 8
elif file_format == "s16le":
return 16
else:
return 24
return 24
Loading

0 comments on commit 1c5e402

Please sign in to comment.