diff --git a/fluster/decoders/ffmpeg.py b/fluster/decoders/ffmpeg.py index 39fc67c3..eb38d9b9 100644 --- a/fluster/decoders/ffmpeg.py +++ b/fluster/decoders/ffmpeg.py @@ -16,98 +16,51 @@ # You should have received a copy of the GNU Lesser General Public # License along with this library. If not, see . -import os from functools import lru_cache -from typing import List, Optional, Match -import shlex +from typing import Dict, Optional, Tuple import subprocess import re from fluster.codec import Codec, OutputFormat from fluster.decoder import Decoder, register_decoder -from fluster.utils import file_checksum, run_command - -FFMPEG_TPL = "{} -nostdin -i {} {} -vf {}format=pix_fmts={} -f rawvideo {}" - - -def output_format_to_ffformat(output_format: OutputFormat) -> str: - """Return GStreamer pixel format""" - mapping = { - OutputFormat.YUV420P: "nv12", - OutputFormat.YUV422P: "nv12", # vulkan - OutputFormat.YUV420P10LE: "p010", - OutputFormat.YUV422P10LE: "p012", - } - if output_format not in mapping: - raise Exception( - f"No matching output format found in FFmpeg for {output_format}" +from fluster.utils import file_checksum, run_command, run_command_with_output + + +@lru_cache(maxsize=128) +def _run_ffmpeg_command( + binary: str, + *args: str, + verbose: bool = False, +) -> str: + """Runs a ffmpeg command and returns the output or an empty string""" + try: + return run_command_with_output( + [binary, "-hide_banner", *args], + verbose=verbose, ) - return mapping[output_format] + except (subprocess.CalledProcessError, subprocess.TimeoutExpired): + return "" class FFmpegDecoder(Decoder): """Generic class for FFmpeg decoder""" binary = "ffmpeg" - description = "" - cmd = "" api = "" wrapper = False hw_download = False - init_device = "" + hw_download_mapping: Dict[OutputFormat, str] = {} + init_hw_device = "" + hw_output_format = "" + thread_count = 1 def __init__(self) -> None: super().__init__() - self.cmd = self.binary - if self.hw_acceleration: - if self.init_device: - self.cmd += f' -init_hw_device "{self.init_device}" -hwaccel_output_format {self.api.lower()}' - if self.wrapper: - self.cmd += f" -c:v {self.api.lower()}" - else: - self.cmd += f" -hwaccel {self.api.lower()}" self.name = f'FFmpeg-{self.codec.value}{"-" + self.api if self.api else ""}' self.description = f'FFmpeg {self.codec.value} {self.api if self.hw_acceleration else "SW"} decoder' - - @lru_cache(maxsize=128) - def ffmpeg_version(self) -> Optional[Match[str]]: - """Returns the ffmpeg version as a re.Match object""" - cmd = shlex.split("ffmpeg -version") - output = subprocess.check_output(cmd, stderr=subprocess.DEVNULL).decode("utf-8") - version = re.search(r"\d\.\d\.\d", output) - return version - - def ffmpeg_cmd( - self, input_filepath: str, output_filepath: str, output_format: OutputFormat - ) -> List[str]: - """Returns the formatted ffmpeg command based on the current ffmpeg version""" - version = self.ffmpeg_version() - download = "" - if self.hw_acceleration and self.hw_download: - download = f"hwdownload,format={output_format_to_ffformat(output_format)}," - if version and int(version.group(0)[0]) >= 5 and int(version.group(0)[2]) >= 1: - cmd = shlex.split( - FFMPEG_TPL.format( - self.cmd, - input_filepath, - "-fps_mode passthrough", - download, - str(output_format.value), - output_filepath, - ) - ) - else: - cmd = shlex.split( - FFMPEG_TPL.format( - self.cmd, - input_filepath, - "-vsync passthrough", - download, - str(output_format.value), - output_filepath, - ) - ) - return cmd + self.ffmpeg_codec: Optional[str] = None + self.ffmpeg_version: Optional[Tuple[int, ...]] = None + self.use_md5_muxer: bool = False def decode( self, @@ -119,38 +72,109 @@ def decode( keep_files: bool, ) -> str: """Decodes input_filepath in output_filepath""" - # pylint: disable=unused-argument - cmd = self.ffmpeg_cmd(input_filepath, output_filepath, output_format) - run_command(cmd, timeout=timeout, verbose=verbose) + # pylint: disable=too-many-branches + command = [self.binary, "-hide_banner", "-nostdin"] + + # Loglevel + if not verbose: + command.extend(["-loglevel", "warning"]) + + # Hardware acceleration + if self.hw_acceleration: + if self.init_hw_device: + command.extend(["-init_hw_device", self.init_hw_device]) + if not self.wrapper: + command.extend(["-hwaccel", self.api.lower()]) + if self.hw_output_format: + command.extend(["-hwaccel_output_format", self.hw_output_format]) + + # Number of threads + if self.thread_count: + command.extend(["-threads", str(self.thread_count)]) + + # Codec + if self.hw_acceleration and self.wrapper: + command.extend(["-codec", self.api.lower()]) + elif self.ffmpeg_codec: + command.extend(["-codec", self.ffmpeg_codec]) + + # Input file + command.extend(["-i", input_filepath]) + + # Passthrough timestamp from the demuxer to the muxer + if self.ffmpeg_version and self.ffmpeg_version < (5, 1): + command.extend(["-vsync", "passthrough"]) + else: + command.extend(["-fps_mode", "passthrough"]) + + # Hardware download + download = "" + if self.hw_acceleration and self.hw_download: + if output_format not in self.hw_download_mapping: + raise Exception( + f"No matching ffmpeg pixel format found for {output_format}" + ) + download = f"hwdownload,format={self.hw_download_mapping[output_format]}," + + # Output format filter + command.extend(["-filter", f"{download}format=pix_fmts={output_format.value}"]) + + # MD5 muxer + if self.use_md5_muxer and not keep_files: + command.extend(["-f", "md5", "-"]) + output = run_command_with_output(command, timeout=timeout, verbose=verbose) + md5sum = re.search(r"MD5=([0-9a-fA-F]+)\s*", output) + if not md5sum: + raise Exception("No MD5 found in the program trace.") + return md5sum.group(1).lower() + + # Output file + command.extend(["-f", "rawvideo", output_filepath]) + run_command(command, timeout=timeout, verbose=verbose) return file_checksum(output_filepath) @lru_cache(maxsize=128) def check(self, verbose: bool) -> bool: """Checks whether the decoder can be run""" - # pylint: disable=broad-except - if self.hw_acceleration: - try: - command = None - - if self.wrapper: - command = [self.binary, "-decoders"] - else: - command = [self.binary, "-hwaccels"] - - output = subprocess.check_output( - command, stderr=subprocess.DEVNULL - ).decode("utf-8") - if verbose: - print(f'{" ".join(command)}\n{output}') - - if self.wrapper: - return self.api.lower() in output - - return f"{os.linesep}{self.api.lower()}{os.linesep}" in output - except Exception: - return False - else: - return super().check(verbose) + if not super().check(verbose): + return False + + # Check if codec is supported + codec_mapping = { + Codec.H264: "h264", + Codec.H265: "hevc", + Codec.VP8: "vp8", + Codec.VP9: "vp9", + Codec.AV1: "av1", + } + if self.codec not in codec_mapping: + return False + self.ffmpeg_codec = codec_mapping[self.codec] + + # Get ffmpeg version + output = _run_ffmpeg_command(self.binary, "-version", verbose=verbose) + version = re.search(r" version n?(\d+)\.(\d+)(?:\.(\d+))?", output) + self.ffmpeg_version = tuple(map(int, version.groups())) if version else None + + # Check if codec can be used + output = _run_ffmpeg_command(self.binary, "-codecs", verbose=verbose) + codec = re.escape(self.ffmpeg_codec) + if re.search(rf"\s+{codec}\s+", output) is None: + return False + + # Check if MD5 muxer can be used + output = _run_ffmpeg_command(self.binary, "-formats", verbose=verbose) + muxer = re.escape("md5") + self.use_md5_muxer = re.search(rf"E\s+{muxer}\s+", output) is not None + + if not self.hw_acceleration: + return True + + # Check if hw decoder or hwaccel is supported + command = "-decoders" if self.wrapper else "-hwaccels" + output = _run_ffmpeg_command(self.binary, command, verbose=verbose) + api = re.escape(self.api.lower()) + return re.search(rf"\s+{api}\s+", output) is not None @register_decoder @@ -244,6 +268,13 @@ class FFmpegH265VdpauDecoder(FFmpegVdpauDecoder): codec = Codec.H265 +@register_decoder +class FFmpegVP9VdpauDecoder(FFmpegVdpauDecoder): + """FFmpeg VDPAU decoder for VP9""" + + codec = Codec.VP9 + + @register_decoder class FFmpegAV1VdpauDecoder(FFmpegVdpauDecoder): """FFmpeg VDPAU decoder for AV1""" @@ -293,34 +324,48 @@ class FFmpegH265D3d11vaDecoder(FFmpegD3d11vaDecoder): codec = Codec.H265 +class FFmpegV4L2m2mDecoder(FFmpegDecoder): + """Generic class for FFmpeg V4L2 mem2mem decoder""" + + hw_acceleration = True + wrapper = True + + def __init__(self) -> None: + super().__init__() + self.name = f"FFmpeg-{self.codec.value}-v4l2m2m" + self.description = f"FFmpeg {self.codec.value} v4l2m2m decoder" + + @register_decoder -class FFmpegVP8V4L2m2mDecoder(FFmpegDecoder): - """FFmpeg V4L2m2m decoder for VP8""" +class FFmpegVP8V4L2m2mDecoder(FFmpegV4L2m2mDecoder): + """FFmpeg V4L2 mem2mem decoder for VP8""" codec = Codec.VP8 - hw_acceleration = True api = "vp8_v4l2m2m" - wrapper = True @register_decoder -class FFmpegVP9V4L2m2mDecoder(FFmpegDecoder): - """FFmpeg V4L2m2m decoder for VP9""" +class FFmpegVP9V4L2m2mDecoder(FFmpegV4L2m2mDecoder): + """FFmpeg V4L2 mem2mem decoder for VP9""" codec = Codec.VP9 - hw_acceleration = True api = "vp9_v4l2m2m" - wrapper = True @register_decoder -class FFmpegH264V4L2m2mDecoder(FFmpegDecoder): - """FFmpeg V4L2m2m decoder for H264""" +class FFmpegH264V4L2m2mDecoder(FFmpegV4L2m2mDecoder): + """FFmpeg V4L2 mem2mem decoder for H.264""" codec = Codec.H264 - hw_acceleration = True api = "h264_v4l2m2m" - wrapper = True + + +@register_decoder +class FFmpegH265V4L2m2mDecoder(FFmpegV4L2m2mDecoder): + """FFmpeg V4L2 mem2mem decoder for H.265""" + + codec = Codec.H265 + api = "hevc_v4l2m2m" class FFmpegVulkanDecoder(FFmpegDecoder): @@ -328,8 +373,15 @@ class FFmpegVulkanDecoder(FFmpegDecoder): hw_acceleration = True api = "Vulkan" - init_device = "vulkan" + init_hw_device = "vulkan" + hw_output_format = "vulkan" hw_download = True + hw_download_mapping = { + OutputFormat.YUV420P: "nv12", + OutputFormat.YUV422P: "nv12", + OutputFormat.YUV420P10LE: "p010", + OutputFormat.YUV422P10LE: "p012", + } @register_decoder @@ -351,3 +403,55 @@ class FFmpegAV1VulkanDecoder(FFmpegVulkanDecoder): """FFmpeg Vulkan decoder for AV1""" codec = Codec.AV1 + + +class FFmpegCudaDecoder(FFmpegDecoder): + """Generic class for FFmpeg CUDA decoder""" + + hw_acceleration = True + api = "CUDA" + hw_output_format = "cuda" + hw_download = True + hw_download_mapping = { + OutputFormat.YUV420P: "nv12", + OutputFormat.YUV444P: "yuv444p", + OutputFormat.YUV420P10LE: "p010", + OutputFormat.YUV444P10LE: "yuv444p16le", + OutputFormat.YUV420P12LE: "p016", + OutputFormat.YUV444P12LE: "yuv444p16le", + } + + +@register_decoder +class FFmpegH264CudaDecoder(FFmpegCudaDecoder): + """FFmpeg CUDA decoder for H.264""" + + codec = Codec.H264 + + +@register_decoder +class FFmpegH265CudaDecoder(FFmpegCudaDecoder): + """FFmpeg CUDA decoder for H.265""" + + codec = Codec.H265 + + +@register_decoder +class FFmpegVP8CudaDecoder(FFmpegCudaDecoder): + """FFmpeg CUDA decoder for VP8""" + + codec = Codec.VP8 + + +@register_decoder +class FFmpegVP9CudaDecoder(FFmpegCudaDecoder): + """FFmpeg CUDA decoder for VP9""" + + codec = Codec.VP9 + + +@register_decoder +class FFmpegAV1VCudaDecoder(FFmpegCudaDecoder): + """FFmpeg CUDA decoder for AV1""" + + codec = Codec.AV1 diff --git a/fluster/decoders/gstreamer.py b/fluster/decoders/gstreamer.py index 7e2e7dc1..153cc738 100644 --- a/fluster/decoders/gstreamer.py +++ b/fluster/decoders/gstreamer.py @@ -28,7 +28,7 @@ from fluster.utils import ( file_checksum, run_command, - run_pipe_command_with_std_output, + run_command_with_output, normalize_binary_cmd, ) @@ -111,13 +111,10 @@ def gen_pipeline( self.cmd, input_filepath, self.decoder_bin, self.caps, self.sink, output ) - def parse_videocodectestsink_md5sum(self, data: List[str], verbose: bool) -> str: + def parse_videocodectestsink_md5sum(self, data: List[str]) -> str: """Parse the MD5 sum out of commandline output produced when using videocodectestsink.""" - md5sum = None for line in data: - if verbose: - print(line) pattern = ( "conformance/checksum, checksum-type=(string)MD5, checksum=(string)" ) @@ -135,14 +132,9 @@ def parse_videocodectestsink_md5sum(self, data: List[str], verbose: bool) -> str continue else: sum_end += sum_start - md5sum = line[sum_start:sum_end] - if not verbose: - return md5sum + return line[sum_start:sum_end] - if not md5sum: - raise Exception("No MD5 found in the program trace.") - - return md5sum + raise Exception("No MD5 found in the program trace.") def decode( self, @@ -162,10 +154,10 @@ def decode( pipeline = self.gen_pipeline(input_filepath, output_param, output_format) command = shlex.split(pipeline) command.append("-m") - data = run_pipe_command_with_std_output( + data = run_command_with_output( command, timeout=timeout, verbose=verbose - ) - return self.parse_videocodectestsink_md5sum(data, verbose) + ).splitlines() + return self.parse_videocodectestsink_md5sum(data) pipeline = self.gen_pipeline(input_filepath, output_filepath, output_format) run_command(shlex.split(pipeline), timeout=timeout, verbose=verbose) diff --git a/fluster/utils.py b/fluster/utils.py index 0277d80b..ff7cb25b 100644 --- a/fluster/utils.py +++ b/fluster/utils.py @@ -69,36 +69,34 @@ def run_command( raise ex -def run_pipe_command_with_std_output( +def run_command_with_output( command: List[str], verbose: bool = False, check: bool = True, timeout: Optional[int] = None, -) -> List[str]: +) -> str: """Runs a command and returns std output trace""" serr = subprocess.DEVNULL if not verbose else subprocess.STDOUT if verbose: print(f'\nRunning command "{" ".join(command)}"') try: - data = subprocess.check_output( + output = subprocess.check_output( command, stderr=serr, timeout=timeout, universal_newlines=True ) - return data.splitlines() + if verbose and output: + print(output) + return output or "" except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as ex: - odata: List[str] = [] - if verbose or check: + if verbose and ex.output: # Workaround inconsistent Python implementation - if isinstance(ex, subprocess.CalledProcessError): - odata = ex.output.splitlines() + if isinstance(ex, subprocess.TimeoutExpired): + print(ex.output.decode("utf-8")) else: - odata = ex.output.decode("utf-8").splitlines() - if verbose: - for line in odata: - print(line) + print(ex.output) if isinstance(ex, subprocess.CalledProcessError) and not check: - return odata + return ex.output or "" # Developer experience improvement (facilitates copy/paste) ex.cmd = " ".join(ex.cmd) diff --git a/scripts/gen_jct_vc.py b/scripts/gen_jct_vc.py index 2b86b544..02837e0a 100755 --- a/scripts/gen_jct_vc.py +++ b/scripts/gen_jct_vc.py @@ -160,7 +160,7 @@ def generate(self, download, jobs): 'default=nokey=1:noprint_wrappers=1', absolute_input_path] - result = utils.run_pipe_command_with_std_output(command) + result = utils.run_command_with_output(command).splitlines() pix_fmt = result[0] try: test_vector.output_format = OutputFormat[pix_fmt.upper()]