diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a75ba0b1..e8c0c8a7f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ - implement dynamic analysis via CAPE sandbox #48 #1535 @yelhamer - add call scope #771 @yelhamer - add process scope for the dynamic analysis flavor #1517 @yelhamer -- Add thread scope for the dynamic analysis flavor #1517 @yelhamer +- add thread scope for the dynamic analysis flavor #1517 @yelhamer - ghidra: add Ghidra feature extractor and supporting code #1770 @colton-gabertan - ghidra: add entry script helping users run capa against a loaded Ghidra database #1767 @mike-hunhoff - binja: add support for forwarded exports #1646 @xusheng6 diff --git a/capa/capabilities/__init__.py b/capa/capabilities/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/capa/capabilities/common.py b/capa/capabilities/common.py new file mode 100644 index 000000000..a73f40afe --- /dev/null +++ b/capa/capabilities/common.py @@ -0,0 +1,79 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. 
+import logging +import itertools +import collections +from typing import Any, Tuple + +from capa.rules import Scope, RuleSet +from capa.engine import FeatureSet, MatchResults +from capa.features.address import NO_ADDRESS +from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor + +logger = logging.getLogger(__name__) + + +def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet): + file_features: FeatureSet = collections.defaultdict(set) + + for feature, va in itertools.chain(extractor.extract_file_features(), extractor.extract_global_features()): + # not all file features may have virtual addresses. + # if not, then at least ensure the feature shows up in the index. + # the set of addresses will still be empty. + if va: + file_features[feature].add(va) + else: + if feature not in file_features: + file_features[feature] = set() + + logger.debug("analyzed file and extracted %d features", len(file_features)) + + file_features.update(function_features) + + _, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS) + return matches, len(file_features) + + +def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool: + file_limitation_rules = list(filter(lambda r: r.is_file_limitation_rule(), rules.rules.values())) + + for file_limitation_rule in file_limitation_rules: + if file_limitation_rule.name not in capabilities: + continue + + logger.warning("-" * 80) + for line in file_limitation_rule.meta.get("description", "").split("\n"): + logger.warning(" %s", line) + logger.warning(" Identified via rule: %s", file_limitation_rule.name) + if is_standalone: + logger.warning(" ") + logger.warning(" Use -v or -vv if you really want to see the capabilities identified by capa.") + logger.warning("-" * 80) + + # bail on first file limitation + return True + + return False + + +def find_capabilities( + ruleset: RuleSet, extractor: 
FeatureExtractor, disable_progress=None, **kwargs +) -> Tuple[MatchResults, Any]: + from capa.capabilities.static import find_static_capabilities + from capa.capabilities.dynamic import find_dynamic_capabilities + + if isinstance(extractor, StaticFeatureExtractor): + # for the time being, extractors are either static or dynamic. + # Remove this assertion once that has changed + assert not isinstance(extractor, DynamicFeatureExtractor) + return find_static_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs) + if isinstance(extractor, DynamicFeatureExtractor): + return find_dynamic_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs) + + raise ValueError(f"unexpected extractor type: {extractor.__class__.__name__}") diff --git a/capa/capabilities/dynamic.py b/capa/capabilities/dynamic.py new file mode 100644 index 000000000..23bfde4ac --- /dev/null +++ b/capa/capabilities/dynamic.py @@ -0,0 +1,198 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. 
+import logging +import itertools +import collections +from typing import Any, Tuple + +import tqdm + +import capa.perf +import capa.features.freeze as frz +import capa.render.result_document as rdoc +from capa.rules import Scope, RuleSet +from capa.engine import FeatureSet, MatchResults +from capa.helpers import redirecting_print_to_tqdm +from capa.capabilities.common import find_file_capabilities +from capa.features.extractors.base_extractor import CallHandle, ThreadHandle, ProcessHandle, DynamicFeatureExtractor + +logger = logging.getLogger(__name__) + + +def find_call_capabilities( + ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle +) -> Tuple[FeatureSet, MatchResults]: + """ + find matches for the given rules for the given call. + + returns: tuple containing (features for call, match results for call) + """ + # all features found for the call. + features: FeatureSet = collections.defaultdict(set) + + for feature, addr in itertools.chain( + extractor.extract_call_features(ph, th, ch), extractor.extract_global_features() + ): + features[feature].add(addr) + + # matches found at this call. + _, matches = ruleset.match(Scope.CALL, features, ch.address) + + for rule_name, res in matches.items(): + rule = ruleset[rule_name] + for addr, _ in res: + capa.engine.index_rule_matches(features, rule, [addr]) + + return features, matches + + +def find_thread_capabilities( + ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle +) -> Tuple[FeatureSet, MatchResults, MatchResults]: + """ + find matches for the given rules within the given thread. + + returns: tuple containing (features for thread, match results for thread, match results for calls) + """ + # all features found within this thread, + # includes features found within calls. + features: FeatureSet = collections.defaultdict(set) + + # matches found at the call scope. + # might be found at different calls, thats ok. 
+ call_matches: MatchResults = collections.defaultdict(list) + + for ch in extractor.get_calls(ph, th): + ifeatures, imatches = find_call_capabilities(ruleset, extractor, ph, th, ch) + for feature, vas in ifeatures.items(): + features[feature].update(vas) + + for rule_name, res in imatches.items(): + call_matches[rule_name].extend(res) + + for feature, va in itertools.chain(extractor.extract_thread_features(ph, th), extractor.extract_global_features()): + features[feature].add(va) + + # matches found within this thread. + _, matches = ruleset.match(Scope.THREAD, features, th.address) + + for rule_name, res in matches.items(): + rule = ruleset[rule_name] + for va, _ in res: + capa.engine.index_rule_matches(features, rule, [va]) + + return features, matches, call_matches + + +def find_process_capabilities( + ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle +) -> Tuple[MatchResults, MatchResults, MatchResults, int]: + """ + find matches for the given rules within the given process. + + returns: tuple containing (match results for process, match results for threads, match results for calls, number of features) + """ + # all features found within this process, + # includes features found within threads (and calls). + process_features: FeatureSet = collections.defaultdict(set) + + # matches found at the thread scope. + # might be found at different threads, thats ok. + thread_matches: MatchResults = collections.defaultdict(list) + + # matches found at the call scope. + # might be found at different calls, thats ok. 
+ call_matches: MatchResults = collections.defaultdict(list) + + for th in extractor.get_threads(ph): + features, tmatches, cmatches = find_thread_capabilities(ruleset, extractor, ph, th) + for feature, vas in features.items(): + process_features[feature].update(vas) + + for rule_name, res in tmatches.items(): + thread_matches[rule_name].extend(res) + + for rule_name, res in cmatches.items(): + call_matches[rule_name].extend(res) + + for feature, va in itertools.chain(extractor.extract_process_features(ph), extractor.extract_global_features()): + process_features[feature].add(va) + + _, process_matches = ruleset.match(Scope.PROCESS, process_features, ph.address) + return process_matches, thread_matches, call_matches, len(process_features) + + +def find_dynamic_capabilities( + ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress=None +) -> Tuple[MatchResults, Any]: + all_process_matches: MatchResults = collections.defaultdict(list) + all_thread_matches: MatchResults = collections.defaultdict(list) + all_call_matches: MatchResults = collections.defaultdict(list) + + feature_counts = rdoc.DynamicFeatureCounts(file=0, processes=()) + + assert isinstance(extractor, DynamicFeatureExtractor) + with redirecting_print_to_tqdm(disable_progress): + with tqdm.contrib.logging.logging_redirect_tqdm(): + pbar = tqdm.tqdm + if disable_progress: + # do not use tqdm to avoid unnecessary side effects when caller intends + # to disable progress completely + def pbar(s, *args, **kwargs): + return s + + processes = list(extractor.get_processes()) + + pb = pbar(processes, desc="matching", unit=" processes", leave=False) + for p in pb: + process_matches, thread_matches, call_matches, feature_count = find_process_capabilities( + ruleset, extractor, p + ) + feature_counts.processes += ( + rdoc.ProcessFeatureCount(address=frz.Address.from_capa(p.address), count=feature_count), + ) + logger.debug("analyzed %s and extracted %d features", p.address, feature_count) + + for 
rule_name, res in process_matches.items(): + all_process_matches[rule_name].extend(res) + for rule_name, res in thread_matches.items(): + all_thread_matches[rule_name].extend(res) + for rule_name, res in call_matches.items(): + all_call_matches[rule_name].extend(res) + + # collection of features that captures the rule matches within process and thread scopes. + # mapping from feature (matched rule) to set of addresses at which it matched. + process_and_lower_features: FeatureSet = collections.defaultdict(set) + for rule_name, results in itertools.chain( + all_process_matches.items(), all_thread_matches.items(), all_call_matches.items() + ): + locations = {p[0] for p in results} + rule = ruleset[rule_name] + capa.engine.index_rule_matches(process_and_lower_features, rule, locations) + + all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, process_and_lower_features) + feature_counts.file = feature_count + + matches = dict( + itertools.chain( + # each rule exists in exactly one scope, + # so there won't be any overlap among these following MatchResults, + # and we can merge the dictionaries naively. + all_thread_matches.items(), + all_process_matches.items(), + all_call_matches.items(), + all_file_matches.items(), + ) + ) + + meta = { + "feature_counts": feature_counts, + } + + return matches, meta diff --git a/capa/capabilities/static.py b/capa/capabilities/static.py new file mode 100644 index 000000000..a522a29da --- /dev/null +++ b/capa/capabilities/static.py @@ -0,0 +1,233 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import time +import logging +import itertools +import collections +from typing import Any, Tuple + +import tqdm.contrib.logging + +import capa.perf +import capa.features.freeze as frz +import capa.render.result_document as rdoc +from capa.rules import Scope, RuleSet +from capa.engine import FeatureSet, MatchResults +from capa.helpers import redirecting_print_to_tqdm +from capa.capabilities.common import find_file_capabilities +from capa.features.extractors.base_extractor import BBHandle, InsnHandle, FunctionHandle, StaticFeatureExtractor + +logger = logging.getLogger(__name__) + + +def find_instruction_capabilities( + ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle, insn: InsnHandle +) -> Tuple[FeatureSet, MatchResults]: + """ + find matches for the given rules for the given instruction. + + returns: tuple containing (features for instruction, match results for instruction) + """ + # all features found for the instruction. + features: FeatureSet = collections.defaultdict(set) + + for feature, addr in itertools.chain( + extractor.extract_insn_features(f, bb, insn), extractor.extract_global_features() + ): + features[feature].add(addr) + + # matches found at this instruction. 
+ _, matches = ruleset.match(Scope.INSTRUCTION, features, insn.address) + + for rule_name, res in matches.items(): + rule = ruleset[rule_name] + for addr, _ in res: + capa.engine.index_rule_matches(features, rule, [addr]) + + return features, matches + + +def find_basic_block_capabilities( + ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle +) -> Tuple[FeatureSet, MatchResults, MatchResults]: + """ + find matches for the given rules within the given basic block. + + returns: tuple containing (features for basic block, match results for basic block, match results for instructions) + """ + # all features found within this basic block, + # includes features found within instructions. + features: FeatureSet = collections.defaultdict(set) + + # matches found at the instruction scope. + # might be found at different instructions, thats ok. + insn_matches: MatchResults = collections.defaultdict(list) + + for insn in extractor.get_instructions(f, bb): + ifeatures, imatches = find_instruction_capabilities(ruleset, extractor, f, bb, insn) + for feature, vas in ifeatures.items(): + features[feature].update(vas) + + for rule_name, res in imatches.items(): + insn_matches[rule_name].extend(res) + + for feature, va in itertools.chain( + extractor.extract_basic_block_features(f, bb), extractor.extract_global_features() + ): + features[feature].add(va) + + # matches found within this basic block. + _, matches = ruleset.match(Scope.BASIC_BLOCK, features, bb.address) + + for rule_name, res in matches.items(): + rule = ruleset[rule_name] + for va, _ in res: + capa.engine.index_rule_matches(features, rule, [va]) + + return features, matches, insn_matches + + +def find_code_capabilities( + ruleset: RuleSet, extractor: StaticFeatureExtractor, fh: FunctionHandle +) -> Tuple[MatchResults, MatchResults, MatchResults, int]: + """ + find matches for the given rules within the given function. 
+ + returns: tuple containing (match results for function, match results for basic blocks, match results for instructions, number of features) + """ + # all features found within this function, + # includes features found within basic blocks (and instructions). + function_features: FeatureSet = collections.defaultdict(set) + + # matches found at the basic block scope. + # might be found at different basic blocks, thats ok. + bb_matches: MatchResults = collections.defaultdict(list) + + # matches found at the instruction scope. + # might be found at different instructions, thats ok. + insn_matches: MatchResults = collections.defaultdict(list) + + for bb in extractor.get_basic_blocks(fh): + features, bmatches, imatches = find_basic_block_capabilities(ruleset, extractor, fh, bb) + for feature, vas in features.items(): + function_features[feature].update(vas) + + for rule_name, res in bmatches.items(): + bb_matches[rule_name].extend(res) + + for rule_name, res in imatches.items(): + insn_matches[rule_name].extend(res) + + for feature, va in itertools.chain(extractor.extract_function_features(fh), extractor.extract_global_features()): + function_features[feature].add(va) + + _, function_matches = ruleset.match(Scope.FUNCTION, function_features, fh.address) + return function_matches, bb_matches, insn_matches, len(function_features) + + +def find_static_capabilities( + ruleset: RuleSet, extractor: StaticFeatureExtractor, disable_progress=None +) -> Tuple[MatchResults, Any]: + all_function_matches: MatchResults = collections.defaultdict(list) + all_bb_matches: MatchResults = collections.defaultdict(list) + all_insn_matches: MatchResults = collections.defaultdict(list) + + feature_counts = rdoc.StaticFeatureCounts(file=0, functions=()) + library_functions: Tuple[rdoc.LibraryFunction, ...] 
= () + + assert isinstance(extractor, StaticFeatureExtractor) + with redirecting_print_to_tqdm(disable_progress): + with tqdm.contrib.logging.logging_redirect_tqdm(): + pbar = tqdm.tqdm + if capa.helpers.is_runtime_ghidra(): + # Ghidrathon interpreter cannot properly handle + # the TMonitor thread that is created via a monitor_interval + # > 0 + pbar.monitor_interval = 0 + if disable_progress: + # do not use tqdm to avoid unnecessary side effects when caller intends + # to disable progress completely + def pbar(s, *args, **kwargs): + return s + + functions = list(extractor.get_functions()) + n_funcs = len(functions) + + pb = pbar(functions, desc="matching", unit=" functions", postfix="skipped 0 library functions", leave=False) + for f in pb: + t0 = time.time() + if extractor.is_library_function(f.address): + function_name = extractor.get_function_name(f.address) + logger.debug("skipping library function 0x%x (%s)", f.address, function_name) + library_functions += ( + rdoc.LibraryFunction(address=frz.Address.from_capa(f.address), name=function_name), + ) + n_libs = len(library_functions) + percentage = round(100 * (n_libs / n_funcs)) + if isinstance(pb, tqdm.tqdm): + pb.set_postfix_str(f"skipped {n_libs} library functions ({percentage}%)") + continue + + function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities( + ruleset, extractor, f + ) + feature_counts.functions += ( + rdoc.FunctionFeatureCount(address=frz.Address.from_capa(f.address), count=feature_count), + ) + t1 = time.time() + + match_count = sum(len(res) for res in function_matches.values()) + match_count += sum(len(res) for res in bb_matches.values()) + match_count += sum(len(res) for res in insn_matches.values()) + logger.debug( + "analyzed function 0x%x and extracted %d features, %d matches in %0.02fs", + f.address, + feature_count, + match_count, + t1 - t0, + ) + + for rule_name, res in function_matches.items(): + all_function_matches[rule_name].extend(res) + for rule_name, res 
in bb_matches.items(): + all_bb_matches[rule_name].extend(res) + for rule_name, res in insn_matches.items(): + all_insn_matches[rule_name].extend(res) + + # collection of features that captures the rule matches within function, BB, and instruction scopes. + # mapping from feature (matched rule) to set of addresses at which it matched. + function_and_lower_features: FeatureSet = collections.defaultdict(set) + for rule_name, results in itertools.chain( + all_function_matches.items(), all_bb_matches.items(), all_insn_matches.items() + ): + locations = {p[0] for p in results} + rule = ruleset[rule_name] + capa.engine.index_rule_matches(function_and_lower_features, rule, locations) + + all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features) + feature_counts.file = feature_count + + matches = dict( + itertools.chain( + # each rule exists in exactly one scope, + # so there won't be any overlap among these following MatchResults, + # and we can merge the dictionaries naively. 
+ all_insn_matches.items(), + all_bb_matches.items(), + all_function_matches.items(), + all_file_matches.items(), + ) + ) + + meta = { + "feature_counts": feature_counts, + "library_functions": library_functions, + } + + return matches, meta diff --git a/capa/ghidra/capa_ghidra.py b/capa/ghidra/capa_ghidra.py index 99beaffc4..70b98df56 100644 --- a/capa/ghidra/capa_ghidra.py +++ b/capa/ghidra/capa_ghidra.py @@ -19,6 +19,7 @@ import capa.rules import capa.ghidra.helpers import capa.render.default +import capa.capabilities.common import capa.features.extractors.ghidra.extractor logger = logging.getLogger("capa_ghidra") @@ -73,13 +74,13 @@ def run_headless(): meta = capa.ghidra.helpers.collect_metadata([rules_path]) extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor() - capabilities, counts = capa.main.find_capabilities(rules, extractor, False) + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, False) meta.analysis.feature_counts = counts["feature_counts"] meta.analysis.library_functions = counts["library_functions"] meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) - if capa.main.has_file_limitation(rules, capabilities, is_standalone=True): + if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=True): logger.info("capa encountered warnings during analysis") if args.json: @@ -123,13 +124,13 @@ def run_ui(): meta = capa.ghidra.helpers.collect_metadata([rules_path]) extractor = capa.features.extractors.ghidra.extractor.GhidraFeatureExtractor() - capabilities, counts = capa.main.find_capabilities(rules, extractor, True) + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, True) meta.analysis.feature_counts = counts["feature_counts"] meta.analysis.library_functions = counts["library_functions"] meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) - if capa.main.has_file_limitation(rules, 
capabilities, is_standalone=False): + if capa.capabilities.common.has_file_limitation(rules, capabilities, is_standalone=False): logger.info("capa encountered warnings during analysis") if verbose == "vverbose": diff --git a/capa/ida/plugin/form.py b/capa/ida/plugin/form.py index bc78045e9..4e1bd572a 100644 --- a/capa/ida/plugin/form.py +++ b/capa/ida/plugin/form.py @@ -25,6 +25,7 @@ import capa.ida.helpers import capa.render.json import capa.features.common +import capa.capabilities.common import capa.render.result_document import capa.features.extractors.ida.extractor from capa.rules import Rule @@ -768,7 +769,7 @@ def slot_progress_feature_extraction(text): try: meta = capa.ida.helpers.collect_metadata([Path(settings.user[CAPA_SETTINGS_RULE_PATH])]) - capabilities, counts = capa.main.find_capabilities( + capabilities, counts = capa.capabilities.common.find_capabilities( ruleset, self.feature_extractor, disable_progress=True ) @@ -810,7 +811,7 @@ def slot_progress_feature_extraction(text): capa.ida.helpers.inform_user_ida_ui("capa encountered file type warnings during analysis") - if capa.main.has_file_limitation(ruleset, capabilities, is_standalone=False): + if capa.capabilities.common.has_file_limitation(ruleset, capabilities, is_standalone=False): capa.ida.helpers.inform_user_ida_ui("capa encountered file limitation warnings during analysis") except Exception as e: logger.exception("Failed to check for file limitations (error: %s)", e) diff --git a/capa/main.py b/capa/main.py index 642778877..47a95a577 100644 --- a/capa/main.py +++ b/capa/main.py @@ -17,16 +17,12 @@ import argparse import datetime import textwrap -import itertools import contextlib -import collections -from typing import Any, Dict, List, Tuple, Callable, Optional +from typing import Any, Dict, List, Callable, Optional from pathlib import Path import halo -import tqdm import colorama -import tqdm.contrib.logging from pefile import PEFormatError from typing_extensions import assert_never from 
elftools.common.exceptions import ELFError @@ -53,14 +49,13 @@ import capa.features.extractors.dotnetfile import capa.features.extractors.base_extractor import capa.features.extractors.cape.extractor -from capa.rules import Rule, Scope, RuleSet -from capa.engine import FeatureSet, MatchResults +from capa.rules import Rule, RuleSet +from capa.engine import MatchResults from capa.helpers import ( get_format, get_file_taste, get_auto_format, log_unsupported_os_error, - redirecting_print_to_tqdm, log_unsupported_arch_error, log_empty_cape_report_error, log_unsupported_format_error, @@ -88,15 +83,10 @@ FORMAT_FREEZE, FORMAT_RESULT, ) -from capa.features.address import NO_ADDRESS, Address +from capa.features.address import Address +from capa.capabilities.common import find_capabilities, has_file_limitation, find_file_capabilities from capa.features.extractors.base_extractor import ( - BBHandle, - CallHandle, - InsnHandle, SampleHashes, - ThreadHandle, - ProcessHandle, - FunctionHandle, FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor, @@ -144,454 +134,6 @@ def set_vivisect_log_level(level): logging.getLogger("Elf").setLevel(level) -def find_instruction_capabilities( - ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle, insn: InsnHandle -) -> Tuple[FeatureSet, MatchResults]: - """ - find matches for the given rules for the given instruction. - - returns: tuple containing (features for instruction, match results for instruction) - """ - # all features found for the instruction. - features: FeatureSet = collections.defaultdict(set) - - for feature, addr in itertools.chain( - extractor.extract_insn_features(f, bb, insn), extractor.extract_global_features() - ): - features[feature].add(addr) - - # matches found at this instruction. 
- _, matches = ruleset.match(Scope.INSTRUCTION, features, insn.address) - - for rule_name, res in matches.items(): - rule = ruleset[rule_name] - for addr, _ in res: - capa.engine.index_rule_matches(features, rule, [addr]) - - return features, matches - - -def find_basic_block_capabilities( - ruleset: RuleSet, extractor: StaticFeatureExtractor, f: FunctionHandle, bb: BBHandle -) -> Tuple[FeatureSet, MatchResults, MatchResults]: - """ - find matches for the given rules within the given basic block. - - returns: tuple containing (features for basic block, match results for basic block, match results for instructions) - """ - # all features found within this basic block, - # includes features found within instructions. - features: FeatureSet = collections.defaultdict(set) - - # matches found at the instruction scope. - # might be found at different instructions, thats ok. - insn_matches: MatchResults = collections.defaultdict(list) - - for insn in extractor.get_instructions(f, bb): - ifeatures, imatches = find_instruction_capabilities(ruleset, extractor, f, bb, insn) - for feature, vas in ifeatures.items(): - features[feature].update(vas) - - for rule_name, res in imatches.items(): - insn_matches[rule_name].extend(res) - - for feature, va in itertools.chain( - extractor.extract_basic_block_features(f, bb), extractor.extract_global_features() - ): - features[feature].add(va) - - # matches found within this basic block. - _, matches = ruleset.match(Scope.BASIC_BLOCK, features, bb.address) - - for rule_name, res in matches.items(): - rule = ruleset[rule_name] - for va, _ in res: - capa.engine.index_rule_matches(features, rule, [va]) - - return features, matches, insn_matches - - -def find_code_capabilities( - ruleset: RuleSet, extractor: StaticFeatureExtractor, fh: FunctionHandle -) -> Tuple[MatchResults, MatchResults, MatchResults, int]: - """ - find matches for the given rules within the given function. 
- - returns: tuple containing (match results for function, match results for basic blocks, match results for instructions, number of features) - """ - # all features found within this function, - # includes features found within basic blocks (and instructions). - function_features: FeatureSet = collections.defaultdict(set) - - # matches found at the basic block scope. - # might be found at different basic blocks, thats ok. - bb_matches: MatchResults = collections.defaultdict(list) - - # matches found at the instruction scope. - # might be found at different instructions, thats ok. - insn_matches: MatchResults = collections.defaultdict(list) - - for bb in extractor.get_basic_blocks(fh): - features, bmatches, imatches = find_basic_block_capabilities(ruleset, extractor, fh, bb) - for feature, vas in features.items(): - function_features[feature].update(vas) - - for rule_name, res in bmatches.items(): - bb_matches[rule_name].extend(res) - - for rule_name, res in imatches.items(): - insn_matches[rule_name].extend(res) - - for feature, va in itertools.chain(extractor.extract_function_features(fh), extractor.extract_global_features()): - function_features[feature].add(va) - - _, function_matches = ruleset.match(Scope.FUNCTION, function_features, fh.address) - return function_matches, bb_matches, insn_matches, len(function_features) - - -def find_file_capabilities(ruleset: RuleSet, extractor: FeatureExtractor, function_features: FeatureSet): - file_features: FeatureSet = collections.defaultdict(set) - - for feature, va in itertools.chain(extractor.extract_file_features(), extractor.extract_global_features()): - # not all file features may have virtual addresses. - # if not, then at least ensure the feature shows up in the index. - # the set of addresses will still be empty. 
- if va: - file_features[feature].add(va) - else: - if feature not in file_features: - file_features[feature] = set() - - logger.debug("analyzed file and extracted %d features", len(file_features)) - - file_features.update(function_features) - - _, matches = ruleset.match(Scope.FILE, file_features, NO_ADDRESS) - return matches, len(file_features) - - -def find_static_capabilities( - ruleset: RuleSet, extractor: StaticFeatureExtractor, disable_progress=None -) -> Tuple[MatchResults, Any]: - all_function_matches: MatchResults = collections.defaultdict(list) - all_bb_matches: MatchResults = collections.defaultdict(list) - all_insn_matches: MatchResults = collections.defaultdict(list) - - feature_counts = rdoc.StaticFeatureCounts(file=0, functions=()) - library_functions: Tuple[rdoc.LibraryFunction, ...] = () - - assert isinstance(extractor, StaticFeatureExtractor) - with redirecting_print_to_tqdm(disable_progress): - with tqdm.contrib.logging.logging_redirect_tqdm(): - pbar = tqdm.tqdm - if capa.helpers.is_runtime_ghidra(): - # Ghidrathon interpreter cannot properly handle - # the TMonitor thread that is created via a monitor_interval - # > 0 - pbar.monitor_interval = 0 - if disable_progress: - # do not use tqdm to avoid unnecessary side effects when caller intends - # to disable progress completely - def pbar(s, *args, **kwargs): - return s - - functions = list(extractor.get_functions()) - n_funcs = len(functions) - - pb = pbar(functions, desc="matching", unit=" functions", postfix="skipped 0 library functions", leave=False) - for f in pb: - t0 = time.time() - if extractor.is_library_function(f.address): - function_name = extractor.get_function_name(f.address) - logger.debug("skipping library function 0x%x (%s)", f.address, function_name) - library_functions += ( - rdoc.LibraryFunction(address=frz.Address.from_capa(f.address), name=function_name), - ) - n_libs = len(library_functions) - percentage = round(100 * (n_libs / n_funcs)) - if isinstance(pb, tqdm.tqdm): - 
pb.set_postfix_str(f"skipped {n_libs} library functions ({percentage}%)") - continue - - function_matches, bb_matches, insn_matches, feature_count = find_code_capabilities( - ruleset, extractor, f - ) - feature_counts.functions += ( - rdoc.FunctionFeatureCount(address=frz.Address.from_capa(f.address), count=feature_count), - ) - t1 = time.time() - - match_count = sum(len(res) for res in function_matches.values()) - match_count += sum(len(res) for res in bb_matches.values()) - match_count += sum(len(res) for res in insn_matches.values()) - logger.debug( - "analyzed function 0x%x and extracted %d features, %d matches in %0.02fs", - f.address, - feature_count, - match_count, - t1 - t0, - ) - - for rule_name, res in function_matches.items(): - all_function_matches[rule_name].extend(res) - for rule_name, res in bb_matches.items(): - all_bb_matches[rule_name].extend(res) - for rule_name, res in insn_matches.items(): - all_insn_matches[rule_name].extend(res) - - # collection of features that captures the rule matches within function, BB, and instruction scopes. - # mapping from feature (matched rule) to set of addresses at which it matched. - function_and_lower_features: FeatureSet = collections.defaultdict(set) - for rule_name, results in itertools.chain( - all_function_matches.items(), all_bb_matches.items(), all_insn_matches.items() - ): - locations = {p[0] for p in results} - rule = ruleset[rule_name] - capa.engine.index_rule_matches(function_and_lower_features, rule, locations) - - all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, function_and_lower_features) - feature_counts.file = feature_count - - matches = dict( - itertools.chain( - # each rule exists in exactly one scope, - # so there won't be any overlap among these following MatchResults, - # and we can merge the dictionaries naively. 
- all_insn_matches.items(), - all_bb_matches.items(), - all_function_matches.items(), - all_file_matches.items(), - ) - ) - - meta = { - "feature_counts": feature_counts, - "library_functions": library_functions, - } - - return matches, meta - - -def find_call_capabilities( - ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle -) -> Tuple[FeatureSet, MatchResults]: - """ - find matches for the given rules for the given call. - - returns: tuple containing (features for call, match results for call) - """ - # all features found for the call. - features: FeatureSet = collections.defaultdict(set) - - for feature, addr in itertools.chain( - extractor.extract_call_features(ph, th, ch), extractor.extract_global_features() - ): - features[feature].add(addr) - - # matches found at this thread. - _, matches = ruleset.match(Scope.CALL, features, ch.address) - - for rule_name, res in matches.items(): - rule = ruleset[rule_name] - for addr, _ in res: - capa.engine.index_rule_matches(features, rule, [addr]) - - return features, matches - - -def find_thread_capabilities( - ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle, th: ThreadHandle -) -> Tuple[FeatureSet, MatchResults, MatchResults]: - """ - find matches for the given rules within the given thread. - - returns: tuple containing (features for thread, match results for thread, match results for calls) - """ - # all features found within this thread, - # includes features found within calls. - features: FeatureSet = collections.defaultdict(set) - - # matches found at the call scope. - # might be found at different calls, thats ok. 
- call_matches: MatchResults = collections.defaultdict(list) - - for ch in extractor.get_calls(ph, th): - ifeatures, imatches = find_call_capabilities(ruleset, extractor, ph, th, ch) - for feature, vas in ifeatures.items(): - features[feature].update(vas) - - for rule_name, res in imatches.items(): - call_matches[rule_name].extend(res) - - for feature, va in itertools.chain(extractor.extract_thread_features(ph, th), extractor.extract_global_features()): - features[feature].add(va) - - # matches found within this thread. - _, matches = ruleset.match(Scope.THREAD, features, th.address) - - for rule_name, res in matches.items(): - rule = ruleset[rule_name] - for va, _ in res: - capa.engine.index_rule_matches(features, rule, [va]) - - return features, matches, call_matches - - -def find_process_capabilities( - ruleset: RuleSet, extractor: DynamicFeatureExtractor, ph: ProcessHandle -) -> Tuple[MatchResults, MatchResults, MatchResults, int]: - """ - find matches for the given rules within the given process. - - returns: tuple containing (match results for process, match results for threads, match results for calls, number of features) - """ - # all features found within this process, - # includes features found within threads (and calls). - process_features: FeatureSet = collections.defaultdict(set) - - # matches found at the basic threads. - # might be found at different threads, thats ok. - thread_matches: MatchResults = collections.defaultdict(list) - - # matches found at the call scope. - # might be found at different calls, thats ok. 
- call_matches: MatchResults = collections.defaultdict(list) - - for th in extractor.get_threads(ph): - features, tmatches, cmatches = find_thread_capabilities(ruleset, extractor, ph, th) - for feature, vas in features.items(): - process_features[feature].update(vas) - - for rule_name, res in tmatches.items(): - thread_matches[rule_name].extend(res) - - for rule_name, res in cmatches.items(): - call_matches[rule_name].extend(res) - - for feature, va in itertools.chain(extractor.extract_process_features(ph), extractor.extract_global_features()): - process_features[feature].add(va) - - _, process_matches = ruleset.match(Scope.PROCESS, process_features, ph.address) - return process_matches, thread_matches, call_matches, len(process_features) - - -def find_dynamic_capabilities( - ruleset: RuleSet, extractor: DynamicFeatureExtractor, disable_progress=None -) -> Tuple[MatchResults, Any]: - all_process_matches: MatchResults = collections.defaultdict(list) - all_thread_matches: MatchResults = collections.defaultdict(list) - all_call_matches: MatchResults = collections.defaultdict(list) - - feature_counts = rdoc.DynamicFeatureCounts(file=0, processes=()) - - assert isinstance(extractor, DynamicFeatureExtractor) - with redirecting_print_to_tqdm(disable_progress): - with tqdm.contrib.logging.logging_redirect_tqdm(): - pbar = tqdm.tqdm - if disable_progress: - # do not use tqdm to avoid unnecessary side effects when caller intends - # to disable progress completely - def pbar(s, *args, **kwargs): - return s - - processes = list(extractor.get_processes()) - - pb = pbar(processes, desc="matching", unit=" processes", leave=False) - for p in pb: - process_matches, thread_matches, call_matches, feature_count = find_process_capabilities( - ruleset, extractor, p - ) - feature_counts.processes += ( - rdoc.ProcessFeatureCount(address=frz.Address.from_capa(p.address), count=feature_count), - ) - logger.debug("analyzed %s and extracted %d features", p.address, feature_count) - - for 
rule_name, res in process_matches.items(): - all_process_matches[rule_name].extend(res) - for rule_name, res in thread_matches.items(): - all_thread_matches[rule_name].extend(res) - for rule_name, res in call_matches.items(): - all_call_matches[rule_name].extend(res) - - # collection of features that captures the rule matches within process and thread scopes. - # mapping from feature (matched rule) to set of addresses at which it matched. - process_and_lower_features: FeatureSet = collections.defaultdict(set) - for rule_name, results in itertools.chain( - all_process_matches.items(), all_thread_matches.items(), all_call_matches.items() - ): - locations = {p[0] for p in results} - rule = ruleset[rule_name] - capa.engine.index_rule_matches(process_and_lower_features, rule, locations) - - all_file_matches, feature_count = find_file_capabilities(ruleset, extractor, process_and_lower_features) - feature_counts.file = feature_count - - matches = dict( - itertools.chain( - # each rule exists in exactly one scope, - # so there won't be any overlap among these following MatchResults, - # and we can merge the dictionaries naively. 
- all_thread_matches.items(), - all_process_matches.items(), - all_call_matches.items(), - all_file_matches.items(), - ) - ) - - meta = { - "feature_counts": feature_counts, - } - - return matches, meta - - -def find_capabilities( - ruleset: RuleSet, extractor: FeatureExtractor, disable_progress=None, **kwargs -) -> Tuple[MatchResults, Any]: - if isinstance(extractor, StaticFeatureExtractor): - return find_static_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs) - elif isinstance(extractor, DynamicFeatureExtractor): - return find_dynamic_capabilities(ruleset, extractor, disable_progress=disable_progress, **kwargs) - else: - raise ValueError(f"unexpected extractor type: {extractor.__class__.__name__}") - - -def has_rule_with_namespace(rules: RuleSet, capabilities: MatchResults, namespace: str) -> bool: - return any( - rules.rules[rule_name].meta.get("namespace", "").startswith(namespace) for rule_name in capabilities.keys() - ) - - -def is_internal_rule(rule: Rule) -> bool: - return rule.meta.get("namespace", "").startswith("internal/") - - -def is_file_limitation_rule(rule: Rule) -> bool: - return rule.meta.get("namespace", "") == "internal/limitation/file" - - -def has_file_limitation(rules: RuleSet, capabilities: MatchResults, is_standalone=True) -> bool: - file_limitation_rules = list(filter(is_file_limitation_rule, rules.rules.values())) - - for file_limitation_rule in file_limitation_rules: - if file_limitation_rule.name not in capabilities: - continue - - logger.warning("-" * 80) - for line in file_limitation_rule.meta.get("description", "").split("\n"): - logger.warning(" %s", line) - logger.warning(" Identified via rule: %s", file_limitation_rule.name) - if is_standalone: - logger.warning(" ") - logger.warning(" Use -v or -vv if you really want to see the capabilities identified by capa.") - logger.warning("-" * 80) - - # bail on first file limitation - return True - - return False - - def is_supported_format(sample: Path) -> 
bool: """ Return if this is a supported file based on magic header values diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index e715ae863..bb6ab5a18 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -940,6 +940,9 @@ def _extract_subscope_rules_rec(self, statement): for child in statement.get_children(): yield from self._extract_subscope_rules_rec(child) + def is_file_limitation_rule(self) -> bool: + return self.meta.get("namespace", "") == "internal/limitation/file" + def is_subscope_rule(self): return bool(self.meta.get("capa/subscope-rule", False)) diff --git a/scripts/bulk-process.py b/scripts/bulk-process.py index 3e3cdfb2f..8950b8936 100644 --- a/scripts/bulk-process.py +++ b/scripts/bulk-process.py @@ -75,6 +75,7 @@ import capa.main import capa.rules import capa.render.json +import capa.capabilities.common import capa.render.result_document as rd from capa.features.common import OS_AUTO @@ -136,7 +137,7 @@ def get_capa_results(args): "error": f"unexpected error: {e}", } - capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True) meta = capa.main.collect_metadata([], path, format, os_, [], extractor, counts) meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) diff --git a/scripts/capa_as_library.py b/scripts/capa_as_library.py index 7311107a9..611576908 100644 --- a/scripts/capa_as_library.py +++ b/scripts/capa_as_library.py @@ -19,6 +19,7 @@ import capa.render.json import capa.render.utils as rutils import capa.render.default +import capa.capabilities.common import capa.render.result_document as rd import capa.features.freeze.features as frzf from capa.features.common import OS_AUTO, FORMAT_AUTO @@ -175,7 +176,7 @@ def capa_details(rules_path: Path, file_path: Path, output_format="dictionary"): extractor = capa.main.get_extractor( file_path, FORMAT_AUTO, 
OS_AUTO, capa.main.BACKEND_VIV, [], False, disable_progress=True ) - capabilities, counts = capa.main.find_capabilities(rules, extractor, disable_progress=True) + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True) # collect metadata (used only to make rendering more complete) meta = capa.main.collect_metadata([], file_path, FORMAT_AUTO, OS_AUTO, [rules_path], extractor, counts) diff --git a/scripts/lint.py b/scripts/lint.py index 065e694bb..edcf9f563 100644 --- a/scripts/lint.py +++ b/scripts/lint.py @@ -41,6 +41,7 @@ import capa.engine import capa.helpers import capa.features.insn +import capa.capabilities.common from capa.rules import Rule, RuleSet from capa.features.common import OS_AUTO, String, Feature, Substring from capa.render.result_document import RuleMetadata @@ -366,7 +367,7 @@ def get_sample_capabilities(ctx: Context, path: Path) -> Set[str]: nice_path, format_, OS_AUTO, capa.main.BACKEND_VIV, DEFAULT_SIGNATURES, False, disable_progress=True ) - capabilities, _ = capa.main.find_capabilities(ctx.rules, extractor, disable_progress=True) + capabilities, _ = capa.capabilities.common.find_capabilities(ctx.rules, extractor, disable_progress=True) # mypy doesn't seem to be happy with the MatchResults type alias & set(...keys())? # so we ignore a few types here. 
capabilities = set(capabilities.keys()) # type: ignore diff --git a/scripts/profile-time.py b/scripts/profile-time.py index 9acd60ff4..86590a800 100644 --- a/scripts/profile-time.py +++ b/scripts/profile-time.py @@ -54,6 +54,7 @@ import capa.features import capa.features.common import capa.features.freeze +import capa.capabilities.common logger = logging.getLogger("capa.profile") @@ -114,7 +115,7 @@ def main(argv=None): def do_iteration(): capa.perf.reset() - capa.main.find_capabilities(rules, extractor, disable_progress=True) + capa.capabilities.common.find_capabilities(rules, extractor, disable_progress=True) pbar.update(1) samples = timeit.repeat(do_iteration, number=args.number, repeat=args.repeat) diff --git a/scripts/show-capabilities-by-function.py b/scripts/show-capabilities-by-function.py index 509c3a847..421c6c7e1 100644 --- a/scripts/show-capabilities-by-function.py +++ b/scripts/show-capabilities-by-function.py @@ -74,6 +74,7 @@ import capa.render.utils as rutils import capa.render.verbose import capa.features.freeze +import capa.capabilities.common import capa.render.result_document as rd from capa.helpers import get_file_taste from capa.features.common import FORMAT_AUTO @@ -186,12 +187,12 @@ def main(argv=None): capa.helpers.log_unsupported_runtime_error() return -1 - capabilities, counts = capa.main.find_capabilities(rules, extractor) + capabilities, counts = capa.capabilities.common.find_capabilities(rules, extractor) meta = capa.main.collect_metadata(argv, args.sample, format_, args.os, args.rules, extractor, counts) meta.analysis.layout = capa.main.compute_layout(rules, extractor, capabilities) - if capa.main.has_file_limitation(rules, capabilities): + if capa.capabilities.common.has_file_limitation(rules, capabilities): # bail if capa encountered file limitation e.g. a packed binary # do show the output in verbose mode, though. 
if not (args.verbose or args.vverbose or args.json): diff --git a/tests/test_capabilities.py b/tests/test_capabilities.py new file mode 100644 index 000000000..ddc7f6c3f --- /dev/null +++ b/tests/test_capabilities.py @@ -0,0 +1,309 @@ +# -*- coding: utf-8 -*- +# Copyright (C) 2023 Mandiant, Inc. All Rights Reserved. +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: [package root]/LICENSE.txt +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and limitations under the License. +import textwrap + +import capa.capabilities.common + + +def test_match_across_scopes_file_function(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + # this rule should match on a function (0x4073F0) + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: install service + scopes: + static: function + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a:0x4073F0 + features: + - and: + - api: advapi32.OpenSCManagerA + - api: advapi32.CreateServiceA + - api: advapi32.StartServiceA + """ + ) + ), + # this rule should match on a file feature + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: .text section + scopes: + static: file + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a + features: + - section: .text + """ + ) + ), + # this rule should match on earlier rule matches: + # - install service, with function scope + # - .text section, with file scope + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: .text section and install service + scopes: + static: file + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a + 
features: + - and: + - match: install service + - match: .text section + """ + ) + ), + ] + ) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "install service" in capabilities + assert ".text section" in capabilities + assert ".text section and install service" in capabilities + + +def test_match_across_scopes(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + # this rule should match on a basic block (including at least 0x403685) + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: tight loop + scopes: + static: basic block + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a:0x403685 + features: + - characteristic: tight loop + """ + ) + ), + # this rule should match on a function (0x403660) + # based on API, as well as prior basic block rule match + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: kill thread loop + scopes: + static: function + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a:0x403660 + features: + - and: + - api: kernel32.TerminateThread + - api: kernel32.CloseHandle + - match: tight loop + """ + ) + ), + # this rule should match on a file feature and a prior function rule match + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: kill thread program + scopes: + static: file + dynamic: process + examples: + - 9324d1a8ae37a36ae560c37448c9705a + features: + - and: + - section: .text + - match: kill thread loop + """ + ) + ), + ] + ) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "tight loop" in capabilities + assert "kill thread loop" in capabilities + assert "kill thread program" in capabilities + + +def test_subscope_bb_rules(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: test rule + scopes: + static: function + dynamic: process + features: + - 
and: + - basic block: + - characteristic: tight loop + """ + ) + ) + ] + ) + # tight loop at 0x403685 + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "test rule" in capabilities + + +def test_byte_matching(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: byte match test + scopes: + static: function + dynamic: process + features: + - and: + - bytes: ED 24 9E F4 52 A9 07 47 55 8E E1 AB 30 8E 23 61 + """ + ) + ) + ] + ) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "byte match test" in capabilities + + +def test_com_feature_matching(z395eb_extractor): + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: initialize IWebBrowser2 + scopes: + static: basic block + dynamic: unsupported + features: + - and: + - api: ole32.CoCreateInstance + - com/class: InternetExplorer #bytes: 01 DF 02 00 00 00 00 00 C0 00 00 00 00 00 00 46 = CLSID_InternetExplorer + - com/interface: IWebBrowser2 #bytes: 61 16 0C D3 AF CD D0 11 8A 3E 00 C0 4F C9 E2 6E = IID_IWebBrowser2 + """ + ) + ) + ] + ) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z395eb_extractor) + assert "initialize IWebBrowser2" in capabilities + + +def test_count_bb(z9324d_extractor): + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: count bb + namespace: test + scopes: + static: function + dynamic: process + features: + - and: + - count(basic blocks): 1 or more + """ + ) + ) + ] + ) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "count bb" in capabilities + + +def test_instruction_scope(z9324d_extractor): + # .text:004071A4 68 E8 03 00 00 push 3E8h + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name:
push 1000 + namespace: test + scopes: + static: instruction + dynamic: process + features: + - and: + - mnemonic: push + - number: 1000 + """ + ) + ) + ] + ) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "push 1000" in capabilities + assert 0x4071A4 in {result[0] for result in capabilities["push 1000"]} + + +def test_instruction_subscope(z9324d_extractor): + # .text:00406F60 sub_406F60 proc near + # [...] + # .text:004071A4 68 E8 03 00 00 push 3E8h + rules = capa.rules.RuleSet( + [ + capa.rules.Rule.from_yaml( + textwrap.dedent( + """ + rule: + meta: + name: push 1000 on i386 + namespace: test + scopes: + static: function + dynamic: process + features: + - and: + - arch: i386 + - instruction: + - mnemonic: push + - number: 1000 + """ + ) + ) + ] + ) + capabilities, meta = capa.capabilities.common.find_capabilities(rules, z9324d_extractor) + assert "push 1000 on i386" in capabilities + assert 0x406F60 in {result[0] for result in capabilities["push 1000 on i386"]} diff --git a/tests/test_main.py b/tests/test_main.py index 730ac77cf..6d588dda1 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -214,304 +214,6 @@ def test_ruleset(): assert len(rules.call_rules) == 2 -def test_match_across_scopes_file_function(z9324d_extractor): - rules = capa.rules.RuleSet( - [ - # this rule should match on a function (0x4073F0) - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: install service - scopes: - static: function - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a:0x4073F0 - features: - - and: - - api: advapi32.OpenSCManagerA - - api: advapi32.CreateServiceA - - api: advapi32.StartServiceA - """ - ) - ), - # this rule should match on a file feature - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: .text section - scopes: - static: file - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a - features: - - section: .text - """ - ) - 
), - # this rule should match on earlier rule matches: - # - install service, with function scope - # - .text section, with file scope - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: .text section and install service - scopes: - static: file - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a - features: - - and: - - match: install service - - match: .text section - """ - ) - ), - ] - ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) - assert "install service" in capabilities - assert ".text section" in capabilities - assert ".text section and install service" in capabilities - - -def test_match_across_scopes(z9324d_extractor): - rules = capa.rules.RuleSet( - [ - # this rule should match on a basic block (including at least 0x403685) - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: tight loop - scopes: - static: basic block - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a:0x403685 - features: - - characteristic: tight loop - """ - ) - ), - # this rule should match on a function (0x403660) - # based on API, as well as prior basic block rule match - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: kill thread loop - scopes: - static: function - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a:0x403660 - features: - - and: - - api: kernel32.TerminateThread - - api: kernel32.CloseHandle - - match: tight loop - """ - ) - ), - # this rule should match on a file feature and a prior function rule match - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: kill thread program - scopes: - static: file - dynamic: process - examples: - - 9324d1a8ae37a36ae560c37448c9705a - features: - - and: - - section: .text - - match: kill thread loop - """ - ) - ), - ] - ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) - assert "tight loop" in capabilities - assert "kill 
thread loop" in capabilities - assert "kill thread program" in capabilities - - -def test_subscope_bb_rules(z9324d_extractor): - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: test rule - scopes: - static: function - dynamic: process - features: - - and: - - basic block: - - characteristic: tight loop - """ - ) - ) - ] - ) - # tight loop at 0x403685 - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) - assert "test rule" in capabilities - - -def test_byte_matching(z9324d_extractor): - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: byte match test - scopes: - static: function - dynamic: process - features: - - and: - - bytes: ED 24 9E F4 52 A9 07 47 55 8E E1 AB 30 8E 23 61 - """ - ) - ) - ] - ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) - assert "byte match test" in capabilities - - -def test_com_feature_matching(z395eb_extractor): - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: initialize IWebBrowser2 - scopes: - static: basic block - dynamic: unsupported - features: - - and: - - api: ole32.CoCreateInstance - - com/class: InternetExplorer #bytes: 01 DF 02 00 00 00 00 00 C0 00 00 00 00 00 00 46 = CLSID_InternetExplorer - - com/interface: IWebBrowser2 #bytes: 61 16 0C D3 AF CD D0 11 8A 3E 00 C0 4F C9 E2 6E = IID_IWebBrowser2 - """ - ) - ) - ] - ) - capabilities, meta = capa.main.find_capabilities(rules, z395eb_extractor) - assert "initialize IWebBrowser2" in capabilities - - -def test_count_bb(z9324d_extractor): - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: count bb - namespace: test - scopes: - static: function - dynamic: process - features: - - and: - - count(basic blocks): 1 or more - """ - ) - ) - ] - ) - capabilities, meta = 
capa.main.find_capabilities(rules, z9324d_extractor) - assert "count bb" in capabilities - - -def test_instruction_scope(z9324d_extractor): - # .text:004071A4 68 E8 03 00 00 push 3E8h - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: push 1000 - namespace: test - scopes: - static: instruction - dynamic: process - features: - - and: - - mnemonic: push - - number: 1000 - """ - ) - ) - ] - ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) - assert "push 1000" in capabilities - assert 0x4071A4 in {result[0] for result in capabilities["push 1000"]} - - -def test_instruction_subscope(z9324d_extractor): - # .text:00406F60 sub_406F60 proc near - # [...] - # .text:004071A4 68 E8 03 00 00 push 3E8h - rules = capa.rules.RuleSet( - [ - capa.rules.Rule.from_yaml( - textwrap.dedent( - """ - rule: - meta: - name: push 1000 on i386 - namespace: test - scopes: - static: function - dynamic: process - features: - - and: - - arch: i386 - - instruction: - - mnemonic: push - - number: 1000 - """ - ) - ) - ] - ) - capabilities, meta = capa.main.find_capabilities(rules, z9324d_extractor) - assert "push 1000 on i386" in capabilities - assert 0x406F60 in {result[0] for result in capabilities["push 1000 on i386"]} - - def test_fix262(pma16_01_extractor, capsys): path = pma16_01_extractor.path assert capa.main.main([path, "-vv", "-t", "send HTTP request", "-q"]) == 0