From ac953b1d51b315a00e3de970a671e52721aa4659 Mon Sep 17 00:00:00 2001 From: Weston Steimel Date: Mon, 30 Jan 2023 16:19:26 +0000 Subject: [PATCH] fix: minor sles provider fixes (#51) * fix: use the compressed xml files for the sles provider * chore: adjust type annotations * use iterparse from defusedxml * fix: actually skip fixed-in when name or version not known * chore: remove dead code --------- Signed-off-by: Weston Steimel --- src/vunnel/providers/sles/parser.py | 19 +++++--- src/vunnel/utils/oval_parser.py | 47 +++++++++++-------- src/vunnel/utils/oval_v2.py | 73 +++++++++++++++-------------- 3 files changed, 78 insertions(+), 61 deletions(-) diff --git a/src/vunnel/providers/sles/parser.py b/src/vunnel/providers/sles/parser.py index e0a48375..a0114464 100644 --- a/src/vunnel/providers/sles/parser.py +++ b/src/vunnel/providers/sles/parser.py @@ -26,6 +26,7 @@ iter_parse_vulnerability_file, ) from vunnel.utils.vulnerability import CVSS, CVSSBaseMetrics, FixedIn, Vulnerability +from vunnel.workspace import Workspace namespace = "sles" @@ -44,15 +45,17 @@ class Parser: - __oval_url__ = "https://ftp.suse.com/pub/projects/security/oval/suse.linux.enterprise.server.{}.xml" - __oval_file_name__ = "suse-linux-enterprise-server-{}.xml" + __oval_url__ = "https://ftp.suse.com/pub/projects/security/oval/suse.linux.enterprise.server.{}.xml.gz" + __oval_file_name__ = "suse-linux-enterprise-server-{}.xml.gz" __oval_dir_path__ = "oval" __source_dir_path__ = "source" # this is pretty odd, but there are classmethods that need logging logger = logging.getLogger("sles-parser") - def __init__(self, workspace, allow_versions, download_timeout=125, logger=None): + def __init__( + self, workspace: Workspace, allow_versions: list[str], download_timeout: int = 125, logger: logging.Logger | None = None + ): self.oval_dir_path = os.path.join(workspace.input_path, self.__source_dir_path__, self.__oval_dir_path__) self.allow_versions = allow_versions self.download_timeout = 
download_timeout @@ -65,8 +68,7 @@ def __init__(self, workspace, allow_versions, download_timeout=125, logger=None) Parser.logger = logger @utils.retry_with_backoff() - def _download(self, major_version: str, skip_if_exists: bool = False): - + def _download(self, major_version: str, skip_if_exists: bool = False) -> str: if not os.path.exists(self.oval_dir_path): self.logger.debug(f"creating workspace for OVAL source data at {self.oval_dir_path}") os.makedirs(self.oval_dir_path) @@ -100,7 +102,9 @@ def _download(self, major_version: str, skip_if_exists: bool = False): with open(oval_file_path, "wb") as fp: for chunk in r.iter_content(chunk_size=1024): - fp.write(chunk) + if chunk: + fp.write(chunk) + fp.flush() return oval_file_path @@ -289,6 +293,7 @@ def _transform_oval_vulnerabilities(cls, major_version: str, parsed_dict: dict) "package name and or version invalid, skipping fixed-in for %s", test_id, ) + continue fixes.append( FixedIn( @@ -323,7 +328,7 @@ def _transform_oval_vulnerabilities(cls, major_version: str, parsed_dict: dict) return results - def get(self, skip_if_exists=False): + def get(self, skip_if_exists: bool = False): parser_factory = OVALParserFactory( parsers=[ SLESVulnerabilityParser, diff --git a/src/vunnel/utils/oval_parser.py b/src/vunnel/utils/oval_parser.py index 31face9e..f6a287b3 100644 --- a/src/vunnel/utils/oval_parser.py +++ b/src/vunnel/utils/oval_parser.py @@ -2,6 +2,7 @@ from __future__ import annotations import copy +import gzip import logging import os import re @@ -65,26 +66,32 @@ def parse(dest_file: str, config: Config): if os.path.exists(dest_file): processing = False - for event, element in ET.iterparse(dest_file, events=("start", "end")): - # gather definition - if event == "start" and re.search(config.tag_pattern, element.tag).group(1) == "definition": - processing = True - elif event == "end" and re.search(config.tag_pattern, element.tag).group(1) == "definition": - try: - _process_definition(element, vuln_dict, config) - 
except: - logger.exception("Error parsing oval record. Logging error and continuing") - finally: - processing = False - - if not processing and event == "end": - # print('Clearing element: {} post event: {}'.format(re.search(tag_pattern, element.tag).group(1), event)) - element.clear() - - # bail after definitions - if event == "end" and re.search(config.tag_pattern, element.tag).group(1) == "definitions": - # print('Stopped parsing') - break + opener = open + + if dest_file.endswith(".gz"): + opener = gzip.open + + with opener(dest_file, "rb") as f: + for event, element in ET.iterparse(f, events=("start", "end")): + # gather definition + if event == "start" and re.search(config.tag_pattern, element.tag).group(1) == "definition": + processing = True + elif event == "end" and re.search(config.tag_pattern, element.tag).group(1) == "definition": + try: + _process_definition(element, vuln_dict, config) + except: + logger.exception("Error parsing oval record. Logging error and continuing") + finally: + processing = False + + if not processing and event == "end": + # print('Clearing element: {} post event: {}'.format(re.search(tag_pattern, element.tag).group(1), event)) + element.clear() + + # bail after definitions + if event == "end" and re.search(config.tag_pattern, element.tag).group(1) == "definitions": + # print('Stopped parsing') + break else: logger.warning(f"{dest_file} not found, returning empty results") diff --git a/src/vunnel/utils/oval_v2.py b/src/vunnel/utils/oval_v2.py index b97ebe40..74da4212 100644 --- a/src/vunnel/utils/oval_v2.py +++ b/src/vunnel/utils/oval_v2.py @@ -8,6 +8,7 @@ from __future__ import annotations import enum +import gzip import logging import os import re @@ -16,6 +17,8 @@ from collections import defaultdict from dataclasses import dataclass +from defusedxml.ElementTree import iterparse + class OVALElementEnum(enum.Enum): """ @@ -402,7 +405,7 @@ def iter_parse_vulnerability_file( oval_file_path: str, parser_config:
OVALParserConfig, parser_factory: OVALParserFactory, -) -> dict: +) -> defaultdict[enum.Enum, dict[str, Parsed]]: """ Starting point for parsing a vulnerability class OVAL file content. Iteratively parses the file using the parsers supplied by the input factory. @@ -412,43 +415,45 @@ def iter_parse_vulnerability_file( logger = logging.getLogger("oval-v2-parser") logger.info(f"parsing {oval_file_path}") - parsed_dict = defaultdict(dict) + parsed_dict: defaultdict[enum.Enum, dict[str, Parsed]] = defaultdict(dict) if os.path.exists(oval_file_path): ingress = False - for event, xml_element in ET.iterparse(oval_file_path, events=("start", "end")): - # gather definition - if event == "start" and parser_factory.get_oval_element(xml_element, parser_config): - ingress = True - elif event == "end": - # is this an interesting oval element? - oval_element = parser_factory.get_oval_element(xml_element, parser_config) - - # is the interesting oval element in ingress? - if oval_element and ingress: - # yes and yes, halt ingress and parse the element - ingress = False - parser = parser_factory.get_parser(oval_element) - if parser: - result = parser.parse(xml_element, parser_config) - if result: - parsed_dict[oval_element][result.identity] = result + opener = open + + if oval_file_path.endswith(".gz"): + opener = gzip.open + + with opener(oval_file_path, "rb") as f: + for event, xml_element in iterparse(f, events=("start", "end")): + # gather definition + if event == "start" and parser_factory.get_oval_element(xml_element, parser_config): + ingress = True + elif event == "end": + # is this an interesting oval element? + oval_element = parser_factory.get_oval_element(xml_element, parser_config) + + # is the interesting oval element in ingress? 
+ if oval_element and ingress: + # yes and yes, halt ingress and parse the element + ingress = False + parser = parser_factory.get_parser(oval_element) + if parser: + result = parser.parse(xml_element, parser_config) + if result: + parsed_dict[oval_element][result.identity] = result + else: + logger.warning("unable to parse %s element", repr(oval_element.value)) else: - logger.warn("unable to parse %s element", repr(oval_element.value)) - else: - logger.warn( - "no parser found for oval element %s, skipping", - oval_element, - ) - # else: - # # this marks the end of an xml element but does it's not an interesting oval element or may be not the whole element yet - # pass - - # clear the element if doesn't need to be processed or done processing - if not ingress and event == "end": - xml_element.clear() - + logger.warning( + "no parser found for oval element %s, skipping", + oval_element, + ) + + # clear the element if doesn't need to be processed or done processing + if not ingress and event == "end": + xml_element.clear() else: - logger.warn("{} not found, returning empty results".format(oval_file_path)) + logger.warning(f"{oval_file_path} not found, returning empty results") return parsed_dict