diff --git a/capa/features/freeze/__init__.py b/capa/features/freeze/__init__.py index 9e3f73310..b5b0f7f92 100644 --- a/capa/features/freeze/__init__.py +++ b/capa/features/freeze/__init__.py @@ -21,6 +21,7 @@ # https://github.com/mandiant/capa/issues/1699 from typing_extensions import TypeAlias +import capa.loader import capa.helpers import capa.version import capa.features.file @@ -686,9 +687,9 @@ def main(argv=None): args = parser.parse_args(args=argv) capa.main.handle_common_args(args) - sigpaths = capa.main.get_signatures(args.signatures) + sigpaths = capa.loader.get_signatures(args.signatures) - extractor = capa.main.get_extractor(args.sample, args.format, args.os, args.backend, sigpaths, False) + extractor = capa.loader.get_extractor(args.sample, args.format, args.os, args.backend, sigpaths, False) Path(args.output).write_bytes(dump(extractor)) diff --git a/capa/ghidra/capa_ghidra.py b/capa/ghidra/capa_ghidra.py index 2594edb71..b3ec0183b 100644 --- a/capa/ghidra/capa_ghidra.py +++ b/capa/ghidra/capa_ghidra.py @@ -78,7 +78,7 @@ def run_headless(): meta.analysis.feature_counts = counts["feature_counts"] meta.analysis.library_functions = counts["library_functions"] - meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) + meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities) if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=True): logger.info("capa encountered warnings during analysis") @@ -128,7 +128,7 @@ def run_ui(): meta.analysis.feature_counts = counts["feature_counts"] meta.analysis.library_functions = counts["library_functions"] - meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) + meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities) if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=False): logger.info("capa encountered warnings during analysis") diff --git a/capa/helpers.py b/capa/helpers.py index 89dad8b91..a85271af1 100644 --- a/capa/helpers.py +++ b/capa/helpers.py @@ -5,6 +5,7 @@ # Unless required by applicable law or agreed to in writing, software distributed under the License # is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and limitations under the License. +import sys import json import inspect import logging @@ -201,3 +202,11 @@ def log_unsupported_runtime_error(): " If you're seeing this message on the command line, please ensure you're running a supported Python version." ) logger.error("-" * 80) + + +def is_running_standalone() -> bool: + """ + are we running from a PyInstaller'd executable? + if so, then we'll be able to access `sys._MEIPASS` for the packaged resources. + """ + return hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS") diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index e9249a77f..ddd4c4e0d 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -775,7 +775,7 @@ def slot_progress_feature_extraction(text): meta.analysis.feature_counts = counts["feature_counts"] meta.analysis.library_functions = counts["library_functions"] - meta.analysis.layout = capa.main.compute_layout(ruleset, self.feature_extractor, capabilities) + meta.analysis.layout = capa.loader.compute_layout(ruleset, self.feature_extractor, capabilities) except UserCancelledError: logger.info("User cancelled analysis.") return False diff --git a/capa/main.py b/capa/main.py index f5dd205d7..de1101ab4 100644 --- a/capa/main.py +++ b/capa/main.py @@ -11,26 +11,23 @@ import io import os import sys -import json import time import logging import argparse -import datetime import textwrap import contextlib from types import TracebackType -from typing import Any, Set, Dict, List, Optional +from typing import Any, Dict, List, Optional from pathlib import Path -import halo import colorama from pefile import PEFormatError -from typing_extensions import assert_never from elftools.common.exceptions import ELFError import capa.perf import capa.rules import capa.engine +import capa.loader import capa.helpers import capa.version import capa.render.json @@ -51,6 +48,7 @@ import capa.features.extractors.cape.extractor from capa.rules import RuleSet from capa.engine import MatchResults +from capa.loader import BACKEND_VIV, BACKEND_CAPE, BACKEND_BINJA, BACKEND_DOTNET, BACKEND_PEFILE from capa.helpers import ( get_file_taste, get_auto_format, @@ -82,23 +80,11 @@ FORMAT_FREEZE, FORMAT_RESULT, ) -from capa.features.address import Address from capa.capabilities.common import find_capabilities, has_file_limitation, find_file_capabilities -from capa.features.extractors.base_extractor import ( - SampleHashes, - FeatureExtractor, - StaticFeatureExtractor, - DynamicFeatureExtractor, -) +from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor RULES_PATH_DEFAULT_STRING = "(embedded rules)" SIGNATURES_PATH_DEFAULT_STRING = "(embedded signatures)" - -BACKEND_VIV = "vivisect" -BACKEND_DOTNET = "dotnet" -BACKEND_BINJA = "binja" -BACKEND_PEFILE = "pefile" -BACKEND_CAPE = "cape" BACKEND_DEFAULT = "(default) use default backend for given file type" E_MISSING_RULES = 10 @@ -137,73 +123,13 @@ def set_vivisect_log_level(level): logging.getLogger("Elf").setLevel(level) -def is_supported_format(sample: Path) -> bool: - """ - Return if this is a supported file based on magic header values - """ - taste = sample.open("rb").read(0x100) - - return len(list(capa.features.extractors.common.extract_format(taste))) == 1 - - -def is_supported_arch(sample: Path) -> bool: - buf = sample.read_bytes() - - return len(list(capa.features.extractors.common.extract_arch(buf))) == 1 - - -def get_arch(sample: Path) -> str: - buf = sample.read_bytes() - - for feature, _ in capa.features.extractors.common.extract_arch(buf): - assert isinstance(feature.value, str) - return feature.value - - return "unknown" - - -def is_supported_os(sample: Path) -> bool: - buf = sample.read_bytes() - - return len(list(capa.features.extractors.common.extract_os(buf))) == 1 - - -def get_os(sample: Path) -> str: - buf = sample.read_bytes() - - for feature, _ in capa.features.extractors.common.extract_os(buf): - assert isinstance(feature.value, str) - return feature.value - - return "unknown" - - -def get_meta_str(vw): - """ - Return workspace meta information string - """ - meta = [] - for k in ["Format", "Platform", "Architecture"]: - if k in vw.metadata: - meta.append(f"{k.lower()}: {vw.metadata[k]}") - return f"{', '.join(meta)}, number of functions: {len(vw.getFunctions())}" - - -def is_running_standalone() -> bool: - """ - are we running from a PyInstaller'd executable? - if so, then we'll be able to access `sys._MEIPASS` for the packaged resources. - """ - return hasattr(sys, "frozen") and hasattr(sys, "_MEIPASS") - - def get_default_root() -> Path: """ get the file system path to the default resources directory. under PyInstaller, this comes from _MEIPASS. under source, this is the root directory of the project. """ - if is_running_standalone(): + if capa.helpers.is_running_standalone(): # pylance/mypy don't like `sys._MEIPASS` because this isn't standard. # its injected by pyinstaller. # so we'll fetch this attribute dynamically. @@ -228,425 +154,6 @@ def get_default_signatures() -> List[Path]: return ret -def get_workspace(path: Path, input_format: str, sigpaths: List[Path]): - """ - load the program at the given path into a vivisect workspace using the given format. - also apply the given FLIRT signatures. - - supported formats: - - pe - - elf - - shellcode 32-bit - - shellcode 64-bit - - auto - - this creates and analyzes the workspace; however, it does *not* save the workspace. - this is the responsibility of the caller. - """ - - # lazy import enables us to not require viv if user wants SMDA, for example. - import viv_utils - import viv_utils.flirt - - logger.debug("generating vivisect workspace for: %s", path) - if input_format == FORMAT_AUTO: - if not is_supported_format(path): - raise UnsupportedFormatError() - - # don't analyze, so that we can add our Flirt function analyzer first. - vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False) - elif input_format in {FORMAT_PE, FORMAT_ELF}: - vw = viv_utils.getWorkspace(str(path), analyze=False, should_save=False) - elif input_format == FORMAT_SC32: - # these are not analyzed nor saved. - vw = viv_utils.getShellcodeWorkspaceFromFile(str(path), arch="i386", analyze=False) - elif input_format == FORMAT_SC64: - vw = viv_utils.getShellcodeWorkspaceFromFile(str(path), arch="amd64", analyze=False) - else: - raise ValueError("unexpected format: " + input_format) - - viv_utils.flirt.register_flirt_signature_analyzers(vw, [str(s) for s in sigpaths]) - - vw.analyze() - - logger.debug("%s", get_meta_str(vw)) - return vw - - -def get_extractor( - input_path: Path, - input_format: str, - os_: str, - backend: str, - sigpaths: List[Path], - should_save_workspace=False, - disable_progress=False, - sample_path: Optional[Path] = None, -) -> FeatureExtractor: - """ - raises: - UnsupportedFormatError - UnsupportedArchError - UnsupportedOSError - """ - if backend == BACKEND_CAPE: - import capa.features.extractors.cape.extractor - - report = json.loads(input_path.read_text(encoding="utf-8")) - return capa.features.extractors.cape.extractor.CapeExtractor.from_report(report) - - elif backend == BACKEND_DOTNET: - import capa.features.extractors.dnfile.extractor - - if input_format not in (FORMAT_PE, FORMAT_DOTNET): - raise UnsupportedFormatError() - - return capa.features.extractors.dnfile.extractor.DnfileFeatureExtractor(input_path) - - elif backend == BACKEND_BINJA: - from capa.features.extractors.binaryninja.find_binja_api import find_binja_path - - # When we are running as a standalone executable, we cannot directly import binaryninja - # We need to fist find the binja API installation path and add it into sys.path - if is_running_standalone(): - bn_api = find_binja_path() - if bn_api.exists(): - sys.path.append(str(bn_api)) - - try: - import binaryninja - from binaryninja import BinaryView - except ImportError: - raise RuntimeError( - "Cannot import binaryninja module. Please install the Binary Ninja Python API first: " - + "https://docs.binary.ninja/dev/batch.html#install-the-api)." - ) - - import capa.features.extractors.binaryninja.extractor - - if input_format not in (FORMAT_SC32, FORMAT_SC64): - if not is_supported_format(input_path): - raise UnsupportedFormatError() - - if not is_supported_arch(input_path): - raise UnsupportedArchError() - - if os_ == OS_AUTO and not is_supported_os(input_path): - raise UnsupportedOSError() - - with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress): - bv: BinaryView = binaryninja.load(str(input_path)) - if bv is None: - raise RuntimeError(f"Binary Ninja cannot open file {input_path}") - - return capa.features.extractors.binaryninja.extractor.BinjaFeatureExtractor(bv) - - elif backend == BACKEND_PEFILE: - import capa.features.extractors.pefile - - return capa.features.extractors.pefile.PefileFeatureExtractor(input_path) - - elif backend == BACKEND_VIV: - import capa.features.extractors.viv.extractor - - if input_format not in (FORMAT_SC32, FORMAT_SC64): - if not is_supported_format(input_path): - raise UnsupportedFormatError() - - if not is_supported_arch(input_path): - raise UnsupportedArchError() - - if os_ == OS_AUTO and not is_supported_os(input_path): - raise UnsupportedOSError() - - with halo.Halo(text="analyzing program", spinner="simpleDots", stream=sys.stderr, enabled=not disable_progress): - vw = get_workspace(input_path, input_format, sigpaths) - - if should_save_workspace: - logger.debug("saving workspace") - try: - vw.saveWorkspace() - except IOError: - # see #168 for discussion around how to handle non-writable directories - logger.info("source directory is not writable, won't save intermediate workspace") - else: - logger.debug("CAPA_SAVE_WORKSPACE unset, not saving workspace") - - return capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, input_path, os_) - - else: - raise ValueError("unexpected backend: " + backend) - - -def get_file_extractors(input_file: Path, input_format: str) -> List[FeatureExtractor]: - file_extractors: List[FeatureExtractor] = [] - - if input_format == FORMAT_PE: - file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(input_file)) - - elif input_format == FORMAT_DOTNET: - file_extractors.append(capa.features.extractors.pefile.PefileFeatureExtractor(input_file)) - file_extractors.append(capa.features.extractors.dotnetfile.DotnetFileFeatureExtractor(input_file)) - - elif input_format == FORMAT_ELF: - file_extractors.append(capa.features.extractors.elffile.ElfFeatureExtractor(input_file)) - - elif input_format == FORMAT_CAPE: - report = json.loads(input_file.read_text(encoding="utf-8")) - file_extractors.append(capa.features.extractors.cape.extractor.CapeExtractor.from_report(report)) - - return file_extractors - - -def get_signatures(sigs_path: Path) -> List[Path]: - if not sigs_path.exists(): - raise IOError(f"signatures path {sigs_path} does not exist or cannot be accessed") - - paths: List[Path] = [] - if sigs_path.is_file(): - paths.append(sigs_path) - elif sigs_path.is_dir(): - logger.debug("reading signatures from directory %s", sigs_path.resolve()) - for file in sigs_path.rglob("*"): - if file.is_file() and file.suffix.lower() in (".pat", ".pat.gz", ".sig"): - paths.append(file) - - # Convert paths to their absolute and normalized forms - paths = [path.resolve().absolute() for path in paths] - - # load signatures in deterministic order: the alphabetic sorting of filename. - # this means that `0_sigs.pat` loads before `1_sigs.pat`. - paths = sorted(paths, key=lambda path: path.name) - - for path in paths: - logger.debug("found signature file: %s", path) - - return paths - - -def get_sample_analysis(format_, arch, os_, extractor, rules_path, counts): - if isinstance(extractor, StaticFeatureExtractor): - return rdoc.StaticAnalysis( - format=format_, - arch=arch, - os=os_, - extractor=extractor.__class__.__name__, - rules=tuple(rules_path), - base_address=frz.Address.from_capa(extractor.get_base_address()), - layout=rdoc.StaticLayout( - functions=(), - # this is updated after capabilities have been collected. - # will look like: - # - # "functions": { 0x401000: { "matched_basic_blocks": [ 0x401000, 0x401005, ... ] }, ... } - ), - feature_counts=counts["feature_counts"], - library_functions=counts["library_functions"], - ) - elif isinstance(extractor, DynamicFeatureExtractor): - return rdoc.DynamicAnalysis( - format=format_, - arch=arch, - os=os_, - extractor=extractor.__class__.__name__, - rules=tuple(rules_path), - layout=rdoc.DynamicLayout( - processes=(), - ), - feature_counts=counts["feature_counts"], - ) - else: - raise ValueError("invalid extractor type") - - -def collect_metadata( - argv: List[str], - input_path: Path, - input_format: str, - os_: str, - rules_path: List[Path], - extractor: FeatureExtractor, - counts: dict, -) -> rdoc.Metadata: - # if it's a binary sample we hash it, if it's a report - # we fetch the hashes from the report - sample_hashes: SampleHashes = extractor.get_sample_hashes() - md5, sha1, sha256 = sample_hashes.md5, sample_hashes.sha1, sample_hashes.sha256 - - global_feats = list(extractor.extract_global_features()) - extractor_format = [f.value for (f, _) in global_feats if isinstance(f, capa.features.common.Format)] - extractor_arch = [f.value for (f, _) in global_feats if isinstance(f, capa.features.common.Arch)] - extractor_os = [f.value for (f, _) in global_feats if isinstance(f, capa.features.common.OS)] - - input_format = ( - str(extractor_format[0]) if extractor_format else "unknown" if input_format == FORMAT_AUTO else input_format - ) - arch = str(extractor_arch[0]) if extractor_arch else "unknown" - os_ = str(extractor_os[0]) if extractor_os else "unknown" if os_ == OS_AUTO else os_ - - if isinstance(extractor, StaticFeatureExtractor): - meta_class: type = rdoc.StaticMetadata - elif isinstance(extractor, DynamicFeatureExtractor): - meta_class = rdoc.DynamicMetadata - else: - assert_never(extractor) - - rules = tuple(r.resolve().absolute().as_posix() for r in rules_path) - - return meta_class( - timestamp=datetime.datetime.now(), - version=capa.version.__version__, - argv=tuple(argv) if argv else None, - sample=rdoc.Sample( - md5=md5, - sha1=sha1, - sha256=sha256, - path=input_path.resolve().as_posix(), - ), - analysis=get_sample_analysis( - input_format, - arch, - os_, - extractor, - rules, - counts, - ), - ) - - -def compute_dynamic_layout(rules, extractor: DynamicFeatureExtractor, capabilities: MatchResults) -> rdoc.DynamicLayout: - """ - compute a metadata structure that links threads - to the processes in which they're found. - - only collect the threads at which some rule matched. - otherwise, we may pollute the json document with - a large amount of un-referenced data. - """ - assert isinstance(extractor, DynamicFeatureExtractor) - - matched_calls: Set[Address] = set() - - def result_rec(result: capa.features.common.Result): - for loc in result.locations: - if isinstance(loc, capa.features.address.DynamicCallAddress): - matched_calls.add(loc) - for child in result.children: - result_rec(child) - - for matches in capabilities.values(): - for _, result in matches: - result_rec(result) - - names_by_process: Dict[Address, str] = {} - names_by_call: Dict[Address, str] = {} - - matched_processes: Set[Address] = set() - matched_threads: Set[Address] = set() - - threads_by_process: Dict[Address, List[Address]] = {} - calls_by_thread: Dict[Address, List[Address]] = {} - - for p in extractor.get_processes(): - threads_by_process[p.address] = [] - - for t in extractor.get_threads(p): - calls_by_thread[t.address] = [] - - for c in extractor.get_calls(p, t): - if c.address in matched_calls: - names_by_call[c.address] = extractor.get_call_name(p, t, c) - calls_by_thread[t.address].append(c.address) - - if calls_by_thread[t.address]: - matched_threads.add(t.address) - threads_by_process[p.address].append(t.address) - - if threads_by_process[p.address]: - matched_processes.add(p.address) - names_by_process[p.address] = extractor.get_process_name(p) - - layout = rdoc.DynamicLayout( - processes=tuple( - rdoc.ProcessLayout( - address=frz.Address.from_capa(p), - name=names_by_process[p], - matched_threads=tuple( - rdoc.ThreadLayout( - address=frz.Address.from_capa(t), - matched_calls=tuple( - rdoc.CallLayout( - address=frz.Address.from_capa(c), - name=names_by_call[c], - ) - for c in calls_by_thread[t] - if c in matched_calls - ), - ) - for t in threads - if t in matched_threads - ) # this object is open to extension in the future, - # such as with the function name, etc. - ) - for p, threads in threads_by_process.items() - if p in matched_processes - ) - ) - - return layout - - -def compute_static_layout(rules, extractor: StaticFeatureExtractor, capabilities) -> rdoc.StaticLayout: - """ - compute a metadata structure that links basic blocks - to the functions in which they're found. - - only collect the basic blocks at which some rule matched. - otherwise, we may pollute the json document with - a large amount of un-referenced data. - """ - functions_by_bb: Dict[Address, Address] = {} - bbs_by_function: Dict[Address, List[Address]] = {} - for f in extractor.get_functions(): - bbs_by_function[f.address] = [] - for bb in extractor.get_basic_blocks(f): - functions_by_bb[bb.address] = f.address - bbs_by_function[f.address].append(bb.address) - - matched_bbs = set() - for rule_name, matches in capabilities.items(): - rule = rules[rule_name] - if capa.rules.Scope.BASIC_BLOCK in rule.scopes: - for addr, _ in matches: - assert addr in functions_by_bb - matched_bbs.add(addr) - - layout = rdoc.StaticLayout( - functions=tuple( - rdoc.FunctionLayout( - address=frz.Address.from_capa(f), - matched_basic_blocks=tuple( - rdoc.BasicBlockLayout(address=frz.Address.from_capa(bb)) for bb in bbs if bb in matched_bbs - ) # this object is open to extension in the future, - # such as with the function name, etc. - ) - for f, bbs in bbs_by_function.items() - if len([bb for bb in bbs if bb in matched_bbs]) > 0 - ) - ) - - return layout - - -def compute_layout(rules, extractor, capabilities) -> rdoc.Layout: - if isinstance(extractor, StaticFeatureExtractor): - return compute_static_layout(rules, extractor, capabilities) - elif isinstance(extractor, DynamicFeatureExtractor): - return compute_dynamic_layout(rules, extractor, capabilities) - else: - raise ValueError("extractor must be either a static or dynamic extracotr") - - def simple_message_exception_handler(exctype, value: BaseException, traceback: TracebackType): """ prints friendly message on unexpected exceptions to regular users (debug mode shows regular stack trace) @@ -978,10 +485,10 @@ def get_input_format_from_args(args) -> str: raises: ShouldExitError: if the program is invoked incorrectly and should exit. """ - format = args.format + format_ = args.format - if format != FORMAT_AUTO: - return format + if format_ != FORMAT_AUTO: + return format_ try: return get_auto_format(args.input_file) @@ -1057,7 +564,7 @@ def get_os_from_args(args, backend) -> str: sample_path = get_sample_path_from_args(args, backend) if sample_path is None: return "unknown" - return get_os(sample_path) + return capa.loader.get_os(sample_path) def get_rules_from_args(args) -> RuleSet: @@ -1069,7 +576,7 @@ def get_rules_from_args(args) -> RuleSet: ShouldExitError: if the program is invoked incorrectly and should exit. """ try: - if is_running_standalone() and args.is_default_rules: + if capa.helpers.is_running_standalone() and args.is_default_rules: cache_dir = get_default_root() / "cache" else: cache_dir = capa.rules.cache.get_default_cache_directory() @@ -1125,7 +632,7 @@ def get_file_extractors_from_args(args, input_format: str) -> List[FeatureExtrac # this pass can inspect multiple file extractors, e.g., dotnet and pe to identify # various limitations try: - return get_file_extractors(args.input_file, input_format) + return capa.loader.get_file_extractors(args.input_file, input_format) except PEFormatError as e: logger.error("Input file '%s' is not a valid PE file: %s", args.input_file, str(e)) raise ShouldExitError(E_CORRUPT_FILE) from e @@ -1207,7 +714,7 @@ def get_extractor_from_args(args, input_format: str, backend: str) -> FeatureExt elif input_format != FORMAT_PE: logger.debug("skipping library code matching: signatures only supports PE files") else: - sig_paths = get_signatures(args.signatures) + sig_paths = capa.loader.get_signatures(args.signatures) except IOError as e: logger.error("%s", str(e)) raise ShouldExitError(E_INVALID_SIG) from e @@ -1221,7 +728,7 @@ def get_extractor_from_args(args, input_format: str, backend: str) -> FeatureExt # see same code and show-features above examples # https://github.com/mandiant/capa/issues/1813 try: - return get_extractor( + return capa.loader.get_extractor( args.input_file, input_format, os_, @@ -1317,15 +824,15 @@ def main(argv: Optional[List[str]] = None): if sample_path is None: os_ = "unknown" else: - os_ = get_os(sample_path) + os_ = capa.loader.get_os(sample_path) extractor = get_extractor_from_args(args, input_format, backend) except ShouldExitError as e: return e.status_code capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet) - meta = collect_metadata(argv, args.input_file, input_format, os_, args.rules, extractor, counts) - meta.analysis.layout = compute_layout(rules, extractor, capabilities) + meta = capa.loader.collect_metadata(argv, args.input_file, input_format, os_, args.rules, extractor, counts) + meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities) if isinstance(extractor, StaticFeatureExtractor) and found_file_limitation: # bail if capa's static feature extractor encountered file limitation e.g. a packed binary diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index 0f6422c18..82c511c25 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -110,7 +110,7 @@ def get_capa_results(args): should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) logger.info("computing capa results for: %s", path) try: - extractor = capa.main.get_extractor( + extractor = capa.loader.get_extractor( path, format, os_, capa.main.BACKEND_VIV, sigpaths, should_save_workspace, disable_progress=True ) except capa.exceptions.UnsupportedFormatError: @@ -139,8 +139,8 @@ def get_capa_results(args): capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True) - meta = capa.main.collect_metadata([], path, format, os_, [], extractor, counts) - meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) + meta = capa.loader.collect_metadata([], path, format, os_, [], extractor, counts) + meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities) doc = rd.ResultDocument.from_capa(meta, rules, capabilities) return {"path": path, "status": "ok", "ok": doc.model_dump()} @@ -168,7 +168,7 @@ def main(argv=None): return -1 try: - sig_paths = capa.main.get_signatures(args.signatures) + sig_paths = capa.loader.get_signatures(args.signatures) except IOError as e: logger.error("%s", str(e)) return -1 diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py index e6b8bf429..a3a160784 100644 --- a/scripts/capa_as_library.py +++ b/scripts/capa_as_library.py @@ -173,14 +173,14 @@ def capa_details(rules_path: Path, file_path: Path, output_format="dictionary"): rules = capa.rules.get_rules([rules_path]) # extract features and find capabilities - extractor = capa.main.get_extractor( + extractor = capa.loader.get_extractor( file_path, FORMAT_AUTO, OS_AUTO, capa.main.BACKEND_VIV, [], False, disable_progress=True ) capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True) # collect metadata (used only to make rendering more complete) - meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, [rules_path], extractor, counts) - meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) + meta = capa.loader.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, [rules_path], extractor, counts) + meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities) capa_output: Any = False diff --git a/scripts/lint.py b/scripts/lint.py index b24aa1349..4eb05f289 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -363,7 +363,7 @@ def get_sample_capabilities(ctx: Context, path: Path) -> Set[str]: format_ = capa.helpers.get_auto_format(nice_path) logger.debug("analyzing sample: %s", nice_path) - extractor = capa.main.get_extractor( + extractor = capa.loader.get_extractor( nice_path, format_, OS_AUTO, capa.main.BACKEND_VIV, DEFAULT_SIGNATURES, False, disable_progress=True ) diff --git a/scripts/profile-time.py b/scripts/profile-time.py index f9615cba6..d67ba5385 100644 --- a/scripts/profile-time.py +++ b/scripts/profile-time.py @@ -97,7 +97,7 @@ def main(argv=None): return -1 try: - sig_paths = capa.main.get_signatures(args.signatures) + sig_paths = capa.loader.get_signatures(args.signatures) except IOError as e: logger.error("%s", str(e)) return -1 @@ -107,7 +107,7 @@ def main(argv=None): ): extractor = capa.features.freeze.load(Path(args.sample).read_bytes()) else: - extractor = capa.main.get_extractor( + extractor = capa.loader.get_extractor( args.sample, args.format, args.os, capa.main.BACKEND_VIV, sig_paths, should_save_workspace=False ) diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index c09797ec2..22ebd1e55 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -163,7 +163,7 @@ def main(argv=None): return -1 try: - sig_paths = capa.main.get_signatures(args.signatures) + sig_paths = capa.loader.get_signatures(args.signatures) except IOError as e: logger.error("%s", str(e)) return -1 @@ -176,7 +176,7 @@ def main(argv=None): should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) try: - extractor = capa.main.get_extractor( + extractor = capa.loader.get_extractor( args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace ) assert isinstance(extractor, StaticFeatureExtractor) @@ -189,8 +189,8 @@ def main(argv=None): capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor) - meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor, counts) - meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) + meta = capa.loader.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor, counts) + meta.analysis.layout = capa.loader.compute_layout(rules, extractor, capabilities) if capa.capabilities.common.has_file_limitation(rules, capabilities): # bail if capa encountered file limitation e.g. a packed binary diff --git a/scripts/show-features.py b/scripts/show-features.py index 2d5a34808..b448efe6c 100644 --- a/scripts/show-features.py +++ b/scripts/show-features.py @@ -74,6 +74,7 @@ import capa.main import capa.rules import capa.engine +import capa.loader import capa.helpers import capa.features import capa.exceptions @@ -124,7 +125,7 @@ def main(argv=None): return -1 try: - sig_paths = capa.main.get_signatures(args.signatures) + sig_paths = capa.loader.get_signatures(args.signatures) except IOError as e: logger.error("%s", str(e)) return -1 @@ -137,7 +138,7 @@ def main(argv=None): else: should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) try: - extractor = capa.main.get_extractor( + extractor = capa.loader.get_extractor( args.sample, format_, args.os, args.backend, sig_paths, should_save_workspace ) except capa.exceptions.UnsupportedFormatError as e: diff --git a/scripts/show-unused-features.py b/scripts/show-unused-features.py index b030995c3..b045f2613 100644 --- a/scripts/show-unused-features.py +++ b/scripts/show-unused-features.py @@ -123,7 +123,7 @@ def main(argv=None): return -1 try: - sig_paths = capa.main.get_signatures(args.signatures) + sig_paths = capa.loader.get_signatures(args.signatures) except IOError as e: logger.error("%s", str(e)) return -1 @@ -135,7 +135,7 @@ def main(argv=None): else: should_save_workspace = os.environ.get("CAPA_SAVE_WORKSPACE") not in ("0", "no", "NO", "n", None) try: - extractor = capa.main.get_extractor( + extractor = capa.loader.get_extractor( args.sample, args.format, args.os, args.backend, sig_paths, should_save_workspace ) except capa.exceptions.UnsupportedFormatError: diff --git a/tests/fixtures.py b/tests/fixtures.py index a06308a1c..ebfe557a5 100644 --- a/tests/fixtures.py +++ b/tests/fixtures.py @@ -106,11 +106,11 @@ def get_viv_extractor(path: Path): ] if "raw32" in path.name: - vw = capa.main.get_workspace(path, "sc32", sigpaths=sigpaths) + vw = capa.loader.get_workspace(path, "sc32", sigpaths=sigpaths) elif "raw64" in path.name: - vw = capa.main.get_workspace(path, "sc64", sigpaths=sigpaths) + vw = capa.loader.get_workspace(path, "sc64", sigpaths=sigpaths) else: - vw = capa.main.get_workspace(path, FORMAT_AUTO, sigpaths=sigpaths) + vw = capa.loader.get_workspace(path, FORMAT_AUTO, sigpaths=sigpaths) vw.saveWorkspace() extractor = capa.features.extractors.viv.extractor.VivisectFeatureExtractor(vw, path, OS_AUTO) fixup_viv(path, extractor)