Skip to content

Commit

Permalink
fixup! fixup! COM-6142 Changes to be committed: modified: fluster/cod…
Browse files Browse the repository at this point in the history
…ec.py modified: fluster/test_suite.py modified: scripts/gen_av1_argon.py deleted: test_suites/av1/AV1_ARGON_VECTORS_MANUAL.json
  • Loading branch information
mdimopoulos committed Oct 17, 2024
1 parent d858ec5 commit e2dbbe4
Show file tree
Hide file tree
Showing 3 changed files with 1,300 additions and 1,272 deletions.
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ max-bool-expr=5
max-branches=12

# Maximum number of locals for function / method body.
max-locals=15
max-locals=18

# Maximum number of parents for a class (see R0901).
max-parents=7
Expand Down
102 changes: 65 additions & 37 deletions scripts/gen_av1_argon.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import re
import subprocess
import sys
import urllib.request
import urllib.error
import zipfile

# pylint: disable=wrong-import-position
Expand All @@ -35,16 +35,18 @@

ARGON_URL = "https://storage.googleapis.com/downloads.aomedia.org/assets/zip/"


class AV1ArgonGenerator:
"""Generates a test suite from the conformance bitstreams"""

def __init__(
self,
name: str,
suite_name: str,
codec: Codec,
description: str,
site: str,
use_ffprobe: bool = False
use_ffprobe: bool = False,
):
self.name = name
self.suite_name = suite_name
Expand All @@ -56,7 +58,7 @@ def __init__(
def generate(self, download):
"""Generates the test suite and saves it to a file"""
output_filepath = os.path.join(self.suite_name + ".json")
extract_folder="resources"
extract_folder = "resources"
test_suite = TestSuite(
output_filepath,
extract_folder,
Expand All @@ -71,43 +73,54 @@ def generate(self, download):
if download:
print(f"Download test suite archive from {source_url}")
try:
exception_str = ""
utils.download(source_url, extract_folder)
except urllib.error.URLError as ex:
exception_str = str(ex)
print(f"\tUnable to download {source_url} to {extract_folder}, {exception_str}")
print(
f"\tUnable to download {source_url} to {extract_folder}, {exception_str}"
)
except Exception as ex:
raise Exception(str(ex)) from ex

# Unzip the file
test_vector_files = []
with zipfile.ZipFile(extract_folder + '/' + self.name, 'r') as zip_ref:
with zipfile.ZipFile(extract_folder + "/" + self.name, "r") as zip_ref:
print(f"Unzip files from {self.name}")
for file_info in zip_ref.infolist():

for file_info in zip_ref.namelist():
# Extract test vector files
if file_info.filename.endswith('.obu'):
if file_info.endswith(".obu"):
zip_ref.extract(file_info, extract_folder)
test_vector_files.append(file_info.filename)
filename = os.path.splitext(os.path.basename(file_info.filename))[0]

test_vector_files.append(file_info)

# Extract md5 files
if file_info.filename.endswith('.md5') and 'md5_ref/' in file_info.filename and 'layers/' not in file_info.filename:
if (
file_info.endswith(".md5")
and "md5_ref/" in file_info
and "layers/" not in file_info
):
zip_ref.extract(file_info, extract_folder)

# Create the test vector and test suite
print ("Creating test vectors and test suite")
source_checksum = utils.file_checksum(extract_folder + '/' + self.name)
print("Creating test vectors and test suite")
source_checksum = utils.file_checksum(extract_folder + "/" + self.name)
for file in test_vector_files:
filename = os.path.splitext(os.path.basename(file))[0]
# ffprobe execution
if self.use_ffprobe:
full_path = os.path.abspath(extract_folder + '/' + file)
ffprobe = utils.normalize_binary_cmd('ffprobe')
command = [ffprobe, '-v', 'error', '-select_streams', 'v:0',
'-show_entries', 'stream=pix_fmt', '-of',
'default=nokey=1:noprint_wrappers=1',
full_path]
full_path = os.path.abspath(extract_folder + "/" + file)
ffprobe = utils.normalize_binary_cmd("ffprobe")
command = [
ffprobe,
"-v",
"error",
"-select_streams",
"v:0",
"-show_entries",
"stream=pix_fmt",
"-of",
"default=nokey=1:noprint_wrappers=1",
full_path,
]
try:
result = utils.run_command_with_output(command).splitlines()
pix_fmt = result[0]
Expand All @@ -117,32 +130,45 @@ def generate(self, download):
pix_fmt = "None"

# Processing md5 files
md5_file_to_find = os.path.splitext(filename)[0] + '.md5'
split = full_path.split('/')
md5_directory = '/'.join(split[:9]) + '/' + 'md5_ref'
md5_path_to_file = os.path.join(md5_directory, md5_file_to_find)

md5_file_to_find = os.path.splitext(filename)[0] + ".md5"
full_path_split = full_path.split("/")
md5_directory_path = (
"/".join(full_path_split[: len(full_path_split) - 2])
+ "/"
+ "md5_ref"
)
md5_file_path = os.path.join(md5_directory_path, md5_file_to_find)

# Check the .md5 file and get checksum
if os.path.exists(md5_path_to_file):
if os.path.exists(md5_file_path):
try:
result_checksum = self._fill_checksum_argon(self, md5_path_to_file)
result_checksum = self._fill_checksum_argon(md5_file_path)
except Exception as ex:
raise Exception("MD5 does not match")
print("MD5 does not match")
raise ex
else:
try:
result_checksum = utils.file_checksum(full_path)
except Exception as ex:
raise Exception("MD5 cannot be calculated")
print("MD5 cannot be calculated")
raise ex

# Add data to the test vector and the test suite
test_vector = TestVector(filename, source_url, source_checksum, file, OutputFormat[pix_fmt.upper()], result_checksum)
test_suite.test_vectors[filename] = test_vector
test_vector = TestVector(
filename,
source_url,
source_checksum,
file,
OutputFormat[pix_fmt.upper()],
result_checksum,
)
test_suite.test_vectors[filename] = test_vector

test_suite.to_json_file(output_filepath)
print("Generate new test suite: " + test_suite.name + ".json")

@staticmethod
def _fill_checksum_argon(self, dest_dir):
def _fill_checksum_argon(dest_dir):
checksum_file = dest_dir
if checksum_file is None:
raise Exception("MD5 not found")
Expand All @@ -154,10 +180,13 @@ def _fill_checksum_argon(self, dest_dir):
else:
result = -1
# Assert that we have extracted a valid MD5 from the file
assert len(result) == 32 and re.search(
r"^[a-fA-F0-9]{32}$", result) is not None, f"{result} is not a valid MD5 hash"
assert (
len(result) == 32
and re.search(r"^[a-fA-F0-9]{32}$", result) is not None
), f"{result} is not a valid MD5 hash"
return result


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument(
Expand All @@ -173,7 +202,6 @@ def _fill_checksum_argon(self, dest_dir):
Codec.AV1,
"AV1 Argon Streams",
ARGON_URL,
True
True,
)
generator.generate(not args.skip_download)

Loading

0 comments on commit e2dbbe4

Please sign in to comment.