Skip to content

Commit

Permalink
Start using dfdiskcache in CPE local cache
Browse files Browse the repository at this point in the history
Signed-off-by: Henri Rosten <[email protected]>
  • Loading branch information
henrirosten committed Dec 18, 2023
1 parent 54bd44a commit 9b762e2
Showing 1 changed file with 12 additions and 52 deletions.
64 changes: 12 additions & 52 deletions src/sbomnix/cpe.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,8 @@
""" Generate CPE (Common Platform Enumeration) identifiers"""

import sys
import pathlib
import string
import datetime
import shutil
import requests

from dfdiskcache import DataFrameDiskCache
from common.utils import (
LOG,
LOG_SPAM,
Expand All @@ -22,7 +18,8 @@

###############################################################################

# Directory that holds the locally stored CPE dictionary (cpes.csv) and its
# backup copy; "~" is expanded by the code that uses it
CACHE_DIR = "~/.cache/sbomnix"
# Location of the CPE dictionary CSV; also used as the key under which the
# dictionary is stored in the dataframe cache
CPE_URL = "https://github.com/tiiuae/cpedict/raw/main/data/cpes.csv"
# Value passed as `ttl` when the CPE dictionary is cached: 60 * 60 * 24 = 86400
# NOTE(review): presumably seconds (i.e. one day) - confirm against dfdiskcache
CPE_CACHE_TTL = 60 * 60 * 24

###############################################################################

Expand All @@ -41,61 +38,24 @@ class _CPE:

def __init__(self):
    """Load the CPE dictionary, preferring the local dataframe cache.

    On a cache miss the dictionary is read from CPE_URL, kept on the
    instance and stored in the cache with ttl=CPE_CACHE_TTL. Exits the
    program if the loaded dictionary lacks the 'vendor' or 'product' column.
    """
    LOG.debug("")
    self.cache = DataFrameDiskCache()
    self.df_cpedict = self.cache.get(CPE_URL)
    if self.df_cpedict is None:
        LOG.debug("CPE cache miss: re-loading CPE dictionary")
        # Assign the freshly loaded dataframe to the instance: storing it
        # only in the cache would leave self.df_cpedict as None for this run
        self.df_cpedict = df_from_csv_file(CPE_URL)
        if self.df_cpedict is not None:
            self.cache.set(CPE_URL, self.df_cpedict, ttl=CPE_CACHE_TTL)
    else:
        LOG.debug("read CPE dictionary from cache")
    if self.df_cpedict is not None:
        # Verify the loaded cpedict contains at least the following columns
        required_cols = {"vendor", "product"}
        if not required_cols.issubset(self.df_cpedict):
            LOG.fatal(
                "Missing required columns %s from cpedict",
                required_cols,
            )
            sys.exit(1)

def _load_cpedict(self):
    """Return the CPE dictionary dataframe read from the local csv file.

    Downloads the dictionary first if there is no non-empty local copy, and
    attempts a refresh if the local copy was modified more than seven days
    ago. Returns None if there is no local copy and the download fails.
    """
    LOG.debug("")
    have_local_copy = self.cpedict.exists() and self.cpedict.stat().st_size > 0
    # Nothing cached yet: without a successful download there is no data
    if not have_local_copy and not self._update_cpedict():
        LOG.warning("Missing '%s': CPE identifiers will be inaccurate", self.cpedict)
        return None
    modified_at = datetime.datetime.fromtimestamp(self.cpedict.lstat().st_mtime)
    stale_before = datetime.datetime.now() - datetime.timedelta(days=7)
    if modified_at < stale_before:
        # A failed refresh is not fatal: the older local copy is still used
        LOG.debug("Attempting periodic update of cpe dictionary")
        refreshed = self._update_cpedict()
        if not refreshed:
            LOG.warning(
                "CPE data is not up-to-date: CPE identifiers will be inaccurate"
            )
    return df_from_csv_file(self.cpedict, exit_on_error=False)

def _update_cpedict(self):
    """Updates local cpe dictionary

    The download is completed and checked before the local file is touched,
    so a failed update leaves any earlier dictionary in place.

    Returns True if a new dictionary was written, False if the download
    failed, returned an HTTP error status, or returned an empty body.
    """
    LOG.debug("")
    url = "https://github.com/tiiuae/cpedict/raw/main/data/cpes.csv"
    try:
        response = requests.get(url, timeout=10)
        # Treat 4xx/5xx as a failed update instead of storing the error page
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        LOG.warning("CPE data update failed: %s", e)
        return False
    if not response.content:
        # An empty file is treated as "not cached" by the loader, so do not
        # replace an existing dictionary with one
        LOG.warning("CPE data update failed: %s", "empty response")
        return False
    # Write next to the target and rename over it, so the dictionary file is
    # never observed truncated or half-written
    cpedict_tmp = self.cpedict.with_name(self.cpedict.name + ".tmp")
    cpedict_tmp.write_bytes(response.content)
    cpedict_tmp.replace(self.cpedict)
    return True

def _cpedict_vendor(self, product):
if not product or len(product) == 1:
LOG.debug("invalid product name '%s'", product)
Expand Down

0 comments on commit 9b762e2

Please sign in to comment.