-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add Bitnami as new provider #512
Open
juan131
wants to merge
5
commits into
anchore:main
Choose a base branch
from
juan131:feat/bitnami-provider
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
from __future__ import annotations | ||
|
||
from dataclasses import dataclass, field | ||
from typing import TYPE_CHECKING | ||
|
||
from vunnel import provider, result, schema | ||
|
||
from .parser import Parser | ||
|
||
if TYPE_CHECKING: | ||
import datetime | ||
|
||
|
||
@dataclass | ||
class Config: | ||
runtime: provider.RuntimeConfig = field( | ||
default_factory=lambda: provider.RuntimeConfig( | ||
result_store=result.StoreStrategy.SQLITE, | ||
existing_results=provider.ResultStatePolicy.DELETE_BEFORE_WRITE, | ||
), | ||
) | ||
request_timeout: int = 125 | ||
|
||
|
||
class Provider(provider.Provider): | ||
|
||
__schema__ = schema.OSVSchema() | ||
__distribution_version__ = int(__schema__.major_version) | ||
|
||
def __init__(self, root: str, config: Config | None = None): | ||
if not config: | ||
config = Config() | ||
|
||
super().__init__(root, runtime_cfg=config.runtime) | ||
self.config = config | ||
self.logger.debug(f"config: {config}") | ||
|
||
self.schema = self.__schema__ | ||
self.parser = Parser( | ||
ws=self.workspace, | ||
logger=self.logger, | ||
) | ||
|
||
# this provider requires the previous state from former runs | ||
provider.disallow_existing_input_policy(config.runtime) | ||
|
||
@classmethod | ||
def name(cls) -> str: | ||
return "bitnami" | ||
|
||
def update(self, last_updated: datetime.datetime | None) -> tuple[list[str], int]: | ||
|
||
# TODO: use of last_updated as NVD provider does to avoid downloading all | ||
# vulnerability data from the source and make incremental updates instead | ||
with self.results_writer() as writer: | ||
for vuln_id, record in self.parser.get(): | ||
writer.write( | ||
identifier=vuln_id.lower(), | ||
schema=self.schema, | ||
payload=record, | ||
) | ||
|
||
return self.parser.urls, len(writer) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
from __future__ import annotations | ||
|
||
import logging | ||
import os | ||
import shlex | ||
import shutil | ||
import subprocess | ||
import tempfile | ||
from dataclasses import dataclass | ||
|
||
from vunnel import utils | ||
|
||
|
||
@dataclass | ||
class GitRevision: | ||
sha: str | ||
file: str | ||
|
||
|
||
class GitWrapper: | ||
_check_cmd_ = "git --version" | ||
_is_git_repo_cmd_ = "git rev-parse --is-inside-work-tree" | ||
_clone_cmd_ = "git clone -b {branch} {src} {dest}" | ||
_check_out_cmd_ = "git checkout {branch}" | ||
|
||
def __init__( | ||
self, | ||
source: str, | ||
branch: str, | ||
checkout_dest: str, | ||
logger: logging.Logger | None = None, | ||
): | ||
self.src = source | ||
self.branch = branch | ||
self.dest = checkout_dest | ||
self.workspace = tempfile.gettempdir() | ||
|
||
if not logger: | ||
logger = logging.getLogger(self.__class__.__name__) | ||
self.logger = logger | ||
|
||
try: | ||
out = self._exec_cmd(self._check_cmd_) | ||
self.logger.trace(f"git executable verified using cmd: {self._check_cmd_}, output: {out.decode()}") | ||
except: | ||
self.logger.exception('could not find required "git" executable. Please install git on host') | ||
raise | ||
|
||
def _check(self, destination): | ||
try: | ||
if not os.path.exists(destination): | ||
self.logger.debug(f"git working tree not found at {destination}") | ||
return False | ||
|
||
cmd = self._is_git_repo_cmd_ | ||
out = self._exec_cmd(cmd, cwd=destination) | ||
self.logger.debug(f"check for git repository, cmd: {cmd}, output: {out.decode()}") | ||
except Exception: | ||
self.logger.debug(f"git working tree not found at {destination}", exc_info=True) | ||
return False | ||
|
||
return True | ||
|
||
def delete_repo(self): | ||
if os.path.exists(self.dest): | ||
self.logger.debug("deleting existing repository") | ||
shutil.rmtree(self.dest, ignore_errors=True) | ||
|
||
@utils.retry_with_backoff() | ||
def clone_repo(self): | ||
try: | ||
self.logger.info(f"cloning git repository {self.src} branch {self.branch} to {self.dest}") | ||
cmd = self._clone_cmd_.format(src=self.src, dest=self.dest, branch=self.branch) | ||
out = self._exec_cmd(cmd) | ||
self.logger.debug(f"initialized git repo, cmd: {cmd}, output: {out.decode()}") | ||
except: | ||
self.logger.exception(f"failed to clone git repository {self.src} branch {self.branch} to {self.dest}") | ||
raise | ||
|
||
def _exec_cmd(self, cmd, *args, **kwargs) -> bytes: | ||
""" | ||
Run a command with errors etc handled | ||
:param cmd: list of arguments (including command name, e.g. ['ls', '-l]) | ||
:param args: | ||
:param kwargs: | ||
:return: | ||
""" | ||
try: | ||
self.logger.trace(f"running: {cmd}") | ||
cmd_list = shlex.split(cmd) | ||
# S603 disable explanation: running git commands by design | ||
return subprocess.check_output(cmd_list, *args, **kwargs, stderr=subprocess.PIPE) # noqa: S603 | ||
except Exception as e: | ||
self.logger.exception(f"error executing command: {cmd}") | ||
raise e |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
from __future__ import annotations | ||
|
||
import logging | ||
import os | ||
from typing import TYPE_CHECKING | ||
|
||
import orjson | ||
|
||
if TYPE_CHECKING: | ||
from vunnel.workspace import Workspace | ||
|
||
from .git import GitWrapper | ||
|
||
namespace = "bitnami" | ||
|
||
|
||
class Parser: | ||
_git_src_url_ = "https://github.com/bitnami/vulndb.git" | ||
_git_src_branch_ = "main" | ||
|
||
def __init__(self, ws: Workspace, logger: logging.Logger | None = None): | ||
self.workspace = ws | ||
self.git_url = self._git_src_url_ | ||
self.git_branch = self._git_src_branch_ | ||
self.urls = [self.git_url] | ||
if not logger: | ||
logger = logging.getLogger(self.__class__.__name__) | ||
self.logger = logger | ||
_checkout_dst_ = os.path.join(self.workspace.input_path, "vulndb") | ||
self.git_wrapper = GitWrapper( | ||
source=self.git_url, | ||
branch=self.git_branch, | ||
checkout_dest=_checkout_dst_, | ||
logger=self.logger, | ||
) | ||
|
||
def _load(self): | ||
self.logger.info("loading data from git repository") | ||
|
||
vuln_data_dir = os.path.join(self.workspace.input_path, "vulndb", "data") | ||
for root, dirs, files in os.walk(vuln_data_dir): | ||
dirs.sort() | ||
for file in sorted(files): | ||
full_path = os.path.join(root, file) | ||
with open(full_path, encoding="utf-8") as f: | ||
yield orjson.loads(f.read()) | ||
|
||
def _normalize(self, vuln_entry): | ||
self.logger.info("normalizing vulnerability data") | ||
|
||
vuln_id = vuln_entry["id"] | ||
if "aliases" in vuln_entry and len(vuln_entry["aliases"]) > 0: | ||
vuln_id = vuln_entry["aliases"][0] | ||
fixed_in = [] | ||
if "affected" in vuln_entry: | ||
for affected in vuln_entry["affected"]: | ||
version = "None" | ||
if "ranges" in affected: | ||
for r in affected["ranges"]: | ||
if "events" in r: | ||
for event in r["events"]: | ||
# TODO: manage last_affected | ||
# if events["last_affected"]: | ||
# version = events["last_affected"] | ||
# break | ||
if "fixed" in event: | ||
version = event["fixed"] | ||
break | ||
|
||
fixed_in.append( | ||
{ | ||
"Name": affected["package"]["name"], | ||
"VersionFormat": "semver", | ||
"NamespaceName": namespace, | ||
"Version": version, | ||
}, | ||
) | ||
link = "None" | ||
if "references" in vuln_entry and len(vuln_entry["references"]) > 0: | ||
link = vuln_entry["references"][0] | ||
|
||
return vuln_id, { | ||
"Vulnerability": { | ||
"Name": vuln_id, | ||
"NamespaceName": namespace, | ||
"Link": link, | ||
"Severity": vuln_entry["database_specific"]["severity"], | ||
"Description": vuln_entry["details"], | ||
"FixedIn": fixed_in, | ||
}, | ||
} | ||
|
||
def get(self): | ||
# Initialize the git repository | ||
self.git_wrapper.delete_repo() | ||
self.git_wrapper.clone_repo() | ||
|
||
# Load the data from the git repository | ||
for vuln_entry in self._load(): | ||
# Normalize the loaded data | ||
yield self._normalize(vuln_entry) |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we want the second return value here to conform to the OSV schema.
I'm simplifying a bit, but second element of the returned tuple will be JSON-serialized and eventually picked up by the anchore/grype-db#217 (and some follow up work) and transformed into the Grype-specific vulnerability schema that Grype matches on.
This dict seems to imitate the shape of some dictionaries returned by other parsers, which looks right, but in this case I think we want a real OSV record.
I'm going to try to do some refactoring to make it more obvious what things
parser.get()
should return.