From 77899069f4217b9d441b605d451b97d4c44102a4 Mon Sep 17 00:00:00 2001
From: Dustin Born
Date: Tue, 23 Apr 2024 20:38:58 +0200
Subject: [PATCH] Refactor and add function to add new CPEs to DB

---
 cpe_search.py | 176 +++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 132 insertions(+), 44 deletions(-)

diff --git a/cpe_search.py b/cpe_search.py
index 2749711..75e3afd 100755
--- a/cpe_search.py
+++ b/cpe_search.py
@@ -155,49 +155,130 @@ def intermediate_process(api_data, requestno):
     return cpes, deprecations
 
 
-def perform_calculations(cpes, requestno):
-    '''Performs calculations for searching for CPEs on every CPE in a given CPE list'''
+def compute_cpe_entry_tf_norm(cpe, name=''):
+    '''Compute term frequencies and normalization factor for given CPE and name'''
+    cpe_mod = cpe.replace("_", ":").replace("*", "").replace("\\", "")
+    cpe_name = name.lower()
+
+    cpe_elems = [cpe_part for cpe_part in cpe_mod[10:].split(':') if cpe_part != ""]
+    cpe_name_elems = [word for word in cpe_name.split()]
+
+    # compute term weights with exponential decay according to word position
+    words_cpe = TEXT_TO_VECTOR_RE.findall(' '.join(cpe_elems))
+    words_cpe_name = TEXT_TO_VECTOR_RE.findall(' '.join(cpe_name_elems))
+    word_weights_cpe = {}
+    for i, word in enumerate(words_cpe):
+        if word not in word_weights_cpe:  # always use greatest weight
+            word_weights_cpe[word] = math.exp(CPE_TERM_WEIGHT_EXP_FACTOR * i)
+
+    word_weights_cpe_name = {}
+    for i, word in enumerate(words_cpe_name):
+        if word not in word_weights_cpe_name:  # always use greatest weight
+            word_weights_cpe_name[word] = math.exp(CPE_TERM_WEIGHT_EXP_FACTOR * i)
+
+    # compute CPE entry's cosine vector for similarity comparison
+    cpe_tf = Counter(words_cpe + words_cpe_name)
+    for term, tf in cpe_tf.items():
+        cpe_tf[term] = tf / len(cpe_tf)
+        if term in word_weights_cpe and term in word_weights_cpe_name:
+            # average both obtained weights from CPE itself and its name
+            cpe_tf[term] *= 0.5 * word_weights_cpe[term] + 0.5 * word_weights_cpe_name[term]
+        elif term in word_weights_cpe:
+            cpe_tf[term] *= word_weights_cpe[term]
+        elif term in word_weights_cpe_name:
+            cpe_tf[term] *= word_weights_cpe_name[term]
+
+    cpe_abs = math.sqrt(sum([cnt**2 for cnt in cpe_tf.values()]))
+    return cpe_tf, cpe_abs
+
+
+def add_cpes_to_db(cpe_infos, config, check_duplicates=True):
+    '''
+    Add new cpe_infos to DB (list of CPEs or tuples of (cpe, name)).
+    Assumes an existing CPE database.
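+
+    Example calls (the CPE and name values are hypothetical placeholders):
+        add_cpes_to_db(['cpe:2.3:a:vendor:product:1.0:*:*:*:*:*:*:*'], config)
+        add_cpes_to_db([('cpe:2.3:a:vendor:product:1.0:*:*:*:*:*:*:*',
+                         'Vendor Product 1.0')], config)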
+    '''
+
+    # compute TF values
+    cpe_infos_tf_norm = []
+    for cpe_info in cpe_infos:
+        cpe, cpe_name = '', ''
+        if isinstance(cpe_info, list) or isinstance(cpe_info, tuple):
+            cpe, cpe_name = cpe_info
+        else:
+            cpe = cpe_info
+        cpe_tf, cpe_abs = compute_cpe_entry_tf_norm(cpe, cpe_name)
+        cpe_infos_tf_norm.append((cpe, cpe_tf, cpe_abs))
 
-    if DEBUG:
-        print(f"[+] Performing calculations on request number {requestno}")
-
-    cpe_info = []
-    for cpe in cpes:
-        # prepare CPE and its name for computation of cosine similarity values
-        cpe_mod = cpe.split(';')[0].replace("_", ":").replace("*", "").replace("\\", "")
-        cpe_name = cpe.split(';')[1].lower()
-        cpe_elems = [cpe_part for cpe_part in cpe_mod[10:].split(':') if cpe_part != ""]
-        cpe_name_elems = [word for word in cpe_name.split()]
-
-        # compute term weights with exponential decay according to word position
-        words_cpe = TEXT_TO_VECTOR_RE.findall(' '.join(cpe_elems))
-        words_cpe_name = TEXT_TO_VECTOR_RE.findall(' '.join(cpe_name_elems))
-        word_weights_cpe = {}
-        for i, word in enumerate(words_cpe):
-            if word not in word_weights_cpe:  # always use greatest weight
-                word_weights_cpe[word] = math.exp(CPE_TERM_WEIGHT_EXP_FACTOR * i)
-
-        word_weights_cpe_name = {}
-        for i, word in enumerate(words_cpe_name):
-            if word not in word_weights_cpe_name:  # always use greatest weight
-                word_weights_cpe_name[word] = math.exp(CPE_TERM_WEIGHT_EXP_FACTOR * i)
-
-        # compute CPE entry's cosine vector for similarity comparison
-        cpe_tf = Counter(words_cpe + words_cpe_name)
-        for term, tf in cpe_tf.items():
-            cpe_tf[term] = tf / len(cpe_tf)
-            if term in word_weights_cpe and term in word_weights_cpe_name:
-                # average both obtained weights from CPE itself and its name
-                cpe_tf[term] *= 0.5 * word_weights_cpe[term] + 0.5 * word_weights_cpe_name[term]
-            elif term in word_weights_cpe:
-                cpe_tf[term] *= word_weights_cpe[term]
-            elif term in word_weights_cpe_name:
-                cpe_tf[term] *= word_weights_cpe_name[term]
-
-        cpe_abs = math.sqrt(sum([cnt**2 for cnt in cpe_tf.values()]))
-        cpe_info.append((cpe.split(';')[0].lower(), cpe_tf, cpe_abs))
-
-    return cpe_info
+    # get current max CPE entry ID
+    db_conn = get_database_connection(config['DATABASE'], config['DATABASE_NAME'])
+    db_cursor = db_conn.cursor()
+    db_cursor.execute('SELECT MAX(entry_id) FROM cpe_entries;')
+    cur_max_eid = db_cursor.fetchone()[0]
+    if not cur_max_eid:
+        cur_max_eid = 0
+    else:
+        cur_max_eid += 1
+
+    # add CPE infos to DB
+    terms_to_entries = {}
+    eid = cur_max_eid
+    for cpe_info in cpe_infos_tf_norm:
+        # insert unconditionally if fresh build, otherwise ensure entry doesn't exist yet
+        do_insert = cur_max_eid == 0
+        if check_duplicates and not do_insert:
+            db_cursor.execute('SELECT * FROM cpe_entries where cpe = ?', (cpe_info[0], ))
+            do_insert = not bool(db_cursor.fetchone())
+
+        if not check_duplicates or do_insert:
+            db_cursor.execute('INSERT INTO cpe_entries VALUES (?, ?, ?, ?)',
+                              (eid, cpe_info[0], json.dumps(cpe_info[1]), cpe_info[2]))
+            for term in cpe_info[1]:
+                if term not in terms_to_entries:
+                    terms_to_entries[term] = []
+                terms_to_entries[term].append(eid)
+            eid += 1
+
+    db_conn.commit()
+    db_cursor.close()
+    db_cursor = db_conn.cursor()
+
+    # add term --> entries translations to DB
+    for term, entry_ids in terms_to_entries.items():
+        if not entry_ids:
+            continue
+
+        # create new entry_ids_str
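+        # (e.g. hypothetical entry_ids [3, 4, 5, 7, 9] --> '3,4-5,7,9'; runs of
+        # consecutive IDs after the first entry are collapsed into 'start-end')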
+        i = 1
+        entry_ids_str = str(entry_ids[0])
+        while i < len(entry_ids):
+            start_i = i
+            while (i < len(entry_ids) - 1) and entry_ids[i] + 1 == entry_ids[i+1]:
+                i += 1
+            if start_i == i:
+                entry_ids_str += ',%d' % entry_ids[i]
+            else:
+                entry_ids_str += ',%d-%d' % (entry_ids[start_i], entry_ids[i])
+            i += 1
+
+        # check if term already exists in DB and if so add previous entry IDs
+        db_cursor.execute('SELECT entry_ids FROM terms_to_entries where term = ?', (term, ))
+        prev_entry_ids = db_cursor.fetchone()
+        do_insert = True
+        if prev_entry_ids:
+            prev_entry_ids = prev_entry_ids[0]
+            if prev_entry_ids:
+                entry_ids_str = prev_entry_ids + ',' + entry_ids_str
+                do_insert = False
+
+        if entry_ids_str:
+            if do_insert:
+                db_cursor.execute('INSERT INTO terms_to_entries VALUES (?, ?)', (term, entry_ids_str))
+            else:
+                db_cursor.execute('UPDATE terms_to_entries SET entry_ids = ? WHERE term = ?', (entry_ids_str, term))
+
+    db_conn.commit()
+    db_cursor.close()
+    db_conn.close()
 
 
 async def worker(headers, params, requestno, rate_limit):
@@ -214,8 +295,15 @@ async def worker(headers, params, requestno, rate_limit):
         if api_data_response is not None:
             try:
                 (cpes, deprecations) = intermediate_process(api_data=api_data_response, requestno=requestno)
-                cpe_infos = perform_calculations(cpes=cpes, requestno=requestno), deprecations
-                return cpe_infos
+                if DEBUG:
+                    print(f"[+] Performing calculations on request number {requestno}")
+
+                cpe_infos = []
+                for cpe in cpes:
+                    cpe_info = cpe.split(';')  # each entry has the form '<cpe>;<name>'
+                    cpe_tf, cpe_abs = compute_cpe_entry_tf_norm(cpe_info[0], cpe_info[1])
+                    cpe_infos.append((cpe_info[0], cpe_tf, cpe_abs))
+                return cpe_infos, deprecations
             except Exception as e:
                 if UPDATE_SUCCESS and not SILENT:
                     print('Got the following exception when downloading CPE data via API: %s' % str(e))
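
A minimal usage sketch of the new function (illustrative only: the CPE value is a
hypothetical placeholder and the config contents are an assumption; 'DATABASE' and
'DATABASE_NAME' are the keys add_cpes_to_db reads above):

    from cpe_search import add_cpes_to_db

    # hypothetical configuration; the expected structure is whatever
    # get_database_connection() consumes in this project
    config = {'DATABASE': 'sqlite', 'DATABASE_NAME': 'cpe-search-dictionary.db3'}
    add_cpes_to_db([('cpe:2.3:a:vendor:product:1.0:*:*:*:*:*:*:*',
                     'Vendor Product 1.0')], config, check_duplicates=True)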