Skip to content

Commit

Permalink
Move related queries functionality into cpe_search
Browse files Browse the repository at this point in the history
  • Loading branch information
ra1nb0rn committed Mar 16, 2024
1 parent 4d25302 commit 36396da
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 317 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/run_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,3 @@ jobs:

- name: Test Exploit Completeness
run: python3 ./tests/test_exploit_completeness.py

- name: Test Related Queries Functionality
run: python3 ./tests/test_related_queries.py
2 changes: 1 addition & 1 deletion cpe_search
168 changes: 26 additions & 142 deletions search_vulns.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,20 @@
from cpe_version import CPEVersion
from cpe_search.database_wrapper_functions import *
from cpe_search.cpe_search import (
is_cpe_equal,
search_cpes,
match_cpe23_to_cpe23_from_dict,
create_cpes_from_base_cpe_and_query,
create_base_cpe_if_versionless_query,
get_possible_versions_in_query
MATCH_CPE_23_RE
)
from cpe_search.cpe_search import _load_config as _load_config_cpe_search

DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'config.json')
MATCH_CPE_23_RE = re.compile(r'cpe:2\.3:[aoh](:[^:]+){2,10}')
CPE_SEARCH_THRESHOLD = 0.72
CPE_SEARCH_THRESHOLD_MATCH = 0.72
EQUIVALENT_CPES = {}
LOAD_EQUIVALENT_CPES_MUTEX = threading.Lock()
DEDUP_LINEBREAKS_RE_1 = re.compile(r'(\r\n)+')
DEDUP_LINEBREAKS_RE_2 = re.compile(r'\n+')
CPE_COMPARISON_STOP_CHARS_RE = re.compile(r'[\+\-\_\~]')
CPE_SEARCH_COUNT = 5

# define ANSI color escape sequences
# Taken from: http://www.lihaoyi.com/post/BuildyourownCommandLinewithANSIescapecodes.html
Expand Down Expand Up @@ -405,7 +402,7 @@ def get_equivalent_cpes(cpe, config):
return equiv_cpes


def search_vulns(query, db_cursor=None, software_match_threshold=CPE_SEARCH_THRESHOLD, add_other_exploit_refs=False, is_good_cpe=False, ignore_general_cpe_vulns=False, include_single_version_vulns=False, config=None):
def search_vulns(query, db_cursor=None, software_match_threshold=CPE_SEARCH_THRESHOLD_MATCH, add_other_exploit_refs=False, is_good_cpe=False, ignore_general_cpe_vulns=False, include_single_version_vulns=False, config=None):
"""Search for known vulnerabilities based on the given query"""

# create DB handle if not given
Expand All @@ -417,24 +414,23 @@ def search_vulns(query, db_cursor=None, software_match_threshold=CPE_SEARCH_THRE
db_cursor = db_conn.cursor()
close_cursor_after = True

# if given query is not already a CPE, retrieve a CPE that matches the query
# if given query is not already a CPE, try to retrieve a CPE that matches
# the query or create alternative CPEs that could match the query
query = query.strip()
cpe = query
cpe, pot_cpes = query, []
if not MATCH_CPE_23_RE.match(query):
cpe = search_cpes(query, count=1, threshold=software_match_threshold, config=config['cpe_search'])
cpe_search_results = search_cpes(query, count=CPE_SEARCH_COUNT, threshold=software_match_threshold, config=config['cpe_search'])

if not cpe or not cpe[query]:
return None
else:
check_str = cpe[query][0][0][8:]
if any(char.isdigit() for char in query) and not any(char.isdigit() for char in check_str):
return None
if not cpe_search_results['cpes']:
return {query: {'cpe': None, 'vulns': {}, 'pot_cpes': cpe_search_results['pot_cpes']}}

cpes = cpe_search_results['cpes']
pot_cpes = cpe_search_results['pot_cpes']

if not cpes:
return {query: {'cpe': None, 'vulns': {}, 'pot_cpes': pot_cpes}}

cpe = cpe[query][0][0]
elif not is_good_cpe:
pot_matching_cpe = match_cpe23_to_cpe23_from_dict(cpe, config=config['cpe_search'])
if pot_matching_cpe:
cpe = pot_matching_cpe
cpe = cpes[0][0]

# use the retrieved CPE to search for known vulnerabilities
vulns = {}
Expand All @@ -452,120 +448,6 @@ def search_vulns(query, db_cursor=None, software_match_threshold=CPE_SEARCH_THRE
if close_cursor_after:
db_cursor.close()

return vulns


def search_vulns_return_cpe(query, db_cursor=None, software_match_threshold=CPE_SEARCH_THRESHOLD, add_other_exploits_refs=False, is_good_cpe=False, ignore_general_cpe_vulns=False, include_single_version_vulns=False, config=None):
"""Search for known vulnerabilities based on the given query and return them with their CPE"""

if not config:
config = _load_config()

query = query.strip()
cpe, pot_cpes = query, []
if not MATCH_CPE_23_RE.match(query):
is_good_cpe = False
cpes = search_cpes(query, count=5, threshold=0.25, config=config['cpe_search'])

if not cpes or not cpes[query]:
return {query: {'cpe': None, 'vulns': None, 'pot_cpes': []}}

# always create related queries with supplied version number
versions_in_query = get_possible_versions_in_query(query)
for cpe, sim in cpes[query]:
new_cpes = create_cpes_from_base_cpe_and_query(cpe, query)
for new_cpe in new_cpes:
# do not overwrite sim score of an existing CPE
if any(is_cpe_equal(new_cpe, existing_cpe[0]) for existing_cpe in cpes[query]):
continue
# only add CPE if it was not seen before
if new_cpe and not any(is_cpe_equal(new_cpe, other[0]) for other in pot_cpes):
pot_cpes.append((new_cpe, -1))

if not any(is_cpe_equal(cpe, other[0]) for other in pot_cpes):
pot_cpes.append((cpe, sim))

# always create related queries without version number if query is versionless
versionless_cpe_inserts, new_idx = [], 0
for cpe, _ in pot_cpes:
base_cpe = create_base_cpe_if_versionless_query(cpe, query)
if base_cpe:
if ((not any(is_cpe_equal(base_cpe, other[0]) for other in pot_cpes)) and
not any(is_cpe_equal(base_cpe, other[0][0]) for other in versionless_cpe_inserts)):
versionless_cpe_inserts.append(((base_cpe, -1), new_idx))
new_idx += 1
new_idx += 1

for new_cpe, idx in versionless_cpe_inserts:
pot_cpes.insert(idx, new_cpe)

if cpes[query][0][1] < software_match_threshold:
return {query: {'cpe': None, 'vulns': None, 'pot_cpes': pot_cpes}}

# catch bad CPE matches
bad_match = False
check_str = cpes[query][0][0][8:]

# ensure that the retrieved CPE has a number if query has a number
if any(char.isdigit() for char in query) and not any(char.isdigit() for char in check_str):
bad_match = True

# if a version number is clearly detectable in query, ensure this version is somewhat reflected in the CPE
if not bad_match:
cpe_has_matching_version = False
for possible_version in versions_in_query:
# ensure version has at least two parts to avoid using a short version for checking
if '.' not in possible_version:
continue

idx_pos_ver, idx_check_str = 0, 0
while idx_pos_ver < len(possible_version) and idx_check_str < len(check_str):
while idx_pos_ver < len(possible_version) and not possible_version[idx_pos_ver].isdigit():
idx_pos_ver += 1
if idx_pos_ver < len(possible_version) and possible_version[idx_pos_ver] == check_str[idx_check_str]:
idx_pos_ver += 1
idx_check_str += 1

if idx_pos_ver == len(possible_version):
cpe_has_matching_version = True
break
if not cpe_has_matching_version:
bad_match = True

if bad_match:
if cpes[query][0][1] > software_match_threshold:
return {query: {'cpe': None, 'vulns': None, 'pot_cpes': pot_cpes}}
return {query: {'cpe': None, 'vulns': None, 'pot_cpes': cpes[query]}}

# also catch bad match if query is versionless, but retrieved CPE is not
cpe_version = cpes[query][0][0].split(':')[5] if cpes[query][0][0].count(':') > 5 else ""
if cpe_version not in ('*', '-'):
base_cpe = create_base_cpe_if_versionless_query(cpes[query][0][0], query)
if base_cpe:
# remove CPEs from related queries that have a version
pot_cpes_versionless = []
for i, (pot_cpe, score) in enumerate(pot_cpes):
cpe_version_iter = pot_cpe.split(':')[5] if pot_cpe.count(':') > 5 else ""
if cpe_version_iter in ('', '*', '-'):
pot_cpes_versionless.append((pot_cpe, score))

return {query: {'cpe': None, 'vulns': None, 'pot_cpes': pot_cpes_versionless}}

cpe = cpes[query][0][0]

# use the retrieved CPE to search for known vulnerabilities
vulns = {}
if is_good_cpe:
equivalent_cpes = [cpe] # only use provided CPE
else:
equivalent_cpes = get_equivalent_cpes(cpe, config) # also search and use equivalent CPEs

for cur_cpe in equivalent_cpes:
cur_vulns = search_vulns(cur_cpe, db_cursor, software_match_threshold, add_other_exploits_refs, True, ignore_general_cpe_vulns, include_single_version_vulns, config)
for cve_id, vuln in cur_vulns.items():
if cve_id not in vulns:
vulns[cve_id] = vuln

return {query: {'cpe': '/'.join(equivalent_cpes), 'vulns': vulns, 'pot_cpes': pot_cpes}}


Expand All @@ -579,7 +461,7 @@ def parse_args():
parser.add_argument("-f", "--format", type=str, default="txt", choices={"txt", "json"}, help="Output format, either 'txt' or 'json' (default: 'txt')")
parser.add_argument("-o", "--output", type=str, help="File to write found vulnerabilities to")
parser.add_argument("-q", "--query", dest="queries", metavar="QUERY", action="append", help="A query, either software title like 'Apache 2.4.39' or a CPE 2.3 string")
parser.add_argument("--cpe-search-threshold", type=float, default=CPE_SEARCH_THRESHOLD, help="Similarity threshold used for retrieving a CPE via the cpe_search tool")
parser.add_argument("--cpe-search-threshold", type=float, default=CPE_SEARCH_THRESHOLD_MATCH, help="Similarity threshold used for retrieving a CPE via the cpe_search tool")
parser.add_argument("--ignore-general-cpe-vulns", action="store_true", help="Ignore vulnerabilities that only affect a general CPE (i.e. without version)")
parser.add_argument("--include-single-version-vulns", action="store_true", help="Include vulnerabilities that only affect one specific version of a product when querying a lower version")
parser.add_argument("-c", "--config", type=str, default=DEFAULT_CONFIG_FILE, help="A config file to use (default: config.json)")
Expand Down Expand Up @@ -627,13 +509,17 @@ def main():
query = query.strip()
cpe = query
if not MATCH_CPE_23_RE.match(query):
cpe = search_cpes(query, count=1, threshold=args.cpe_search_threshold, config=config['cpe_search'])
cpe_search_results = search_cpes(query, count=1, threshold=args.cpe_search_threshold, config=config['cpe_search'])
if cpe_search_results.get('cpes', []):
cpe = cpe_search_results['cpes'][0][0]
else:
cpe = None

found_cpe = True
if not cpe or not cpe[query]:
if not cpe:
found_cpe = False
else:
check_str = cpe[query][0][0][8:]
check_str = cpe[8:]
if any(char.isdigit() for char in query) and not any(char.isdigit() for char in check_str):
found_cpe = False

Expand All @@ -648,8 +534,6 @@ def main():
else:
vulns[query] = 'Warning: Could not find matching software for query \'%s\'' % query
continue

cpe = cpe[query][0][0]
else:
matching_cpe = match_cpe23_to_cpe23_from_dict(cpe, config=config['cpe_search'])
if matching_cpe:
Expand All @@ -666,7 +550,7 @@ def main():

for cur_cpe in equivalent_cpes:
cur_vulns = search_vulns(cur_cpe, db_cursor, args.cpe_search_threshold, False, True, args.ignore_general_cpe_vulns, args.include_single_version_vulns, config)
for cve_id, vuln in cur_vulns.items():
for cve_id, vuln in cur_vulns[cur_cpe]['vulns'].items():
if cve_id not in vulns[query]:
vulns[query][cve_id] = vuln

Expand Down
9 changes: 6 additions & 3 deletions tests/run_all_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ EXIT_2=$?
echo '[+] Running test_exploit_completeness.py'
python3 "${SCRIPT_DIR}/test_exploit_completeness.py"
EXIT_3=$?
echo '[+] Running test_related_queries.py'
python3 "${SCRIPT_DIR}/test_related_queries.py"
echo '[+] Running cpe_search/test_cpes.py'
python3 "${SCRIPT_DIR}/../cpe_search/test_cpes.py"
EXIT_4=$?
echo '[+] Running cpe_search/test_cpe_suggestions.py'
python3 "${SCRIPT_DIR}/../cpe_search/test_cpe_suggestions.py"
EXIT_5=$?

# https://stackoverflow.com/a/16358989
! (( $EXIT_1 || $EXIT_2 || $EXIT_3 || $EXIT_4 ))
! (( $EXIT_1 || $EXIT_2 || $EXIT_3 || $EXIT_4 || $EXIT_5 ))
Loading

0 comments on commit 36396da

Please sign in to comment.