Add the ability to select which functions or processes you wish to extract capabilities from #2156

Merged
merged 46 commits into from
Aug 20, 2024
Changes from 29 commits
Commits
38c6623
initial commit
yelhamer Jun 19, 2024
154afe1
test_capabilities.py: add tests
yelhamer Jun 19, 2024
1ae174b
CHANGELOG.md: update changelog
yelhamer Jun 19, 2024
acd69a3
usage.md: updated documentation
yelhamer Jun 19, 2024
3aaae2e
main.py: use input_format instead of file_extractors to determine ana…
yelhamer Jun 19, 2024
f7c43e9
fix linting
yelhamer Jun 19, 2024
b7e345d
apply flake8 suggestions
yelhamer Jun 19, 2024
8c8321b
main.py: Use Optional typehint
yelhamer Jun 19, 2024
1642e7e
main.py: bugfix for return instead of raise
yelhamer Jun 19, 2024
090ade5
main.py: add errorcode for invalid input format
yelhamer Jun 19, 2024
8e8e0ec
Function/Process filtering: use a function to filter
yelhamer Jun 20, 2024
1d52600
Function/Process filtering: ignore mypy errors for method reassignment
yelhamer Jun 20, 2024
d78272f
function/proc filtering tests: use a copy of the extractor in order t…
yelhamer Jun 20, 2024
e3071f8
Extractor Filters: wrap classes and overwrite __class__ instead of us…
yelhamer Jun 21, 2024
c2058bf
Extractor Filters: fix mypy errors
yelhamer Jun 21, 2024
c54bafc
function/proc filtering: overwrite __instancecheck__() for extractor …
yelhamer Jun 21, 2024
fe9f332
base_extractor: update FeatureExtractor type to include filters
yelhamer Jun 21, 2024
b329f3f
capa/loader.py: update assert_never() for mypy
yelhamer Jun 21, 2024
1a79591
capa/loader.py: use tuple in isinstance() for flake8
yelhamer Jun 21, 2024
d2c19cd
Update capa/main.py
yelhamer Jul 16, 2024
f048678
Merge branch 'master' into yelhamer-filtered-scopes
yelhamer Aug 15, 2024
02ce318
process/function filtering: override extractor object method
yelhamer Aug 15, 2024
38e3ab1
function/process filtering: ignore method reassignment type errors
yelhamer Aug 15, 2024
c91580b
process/function filtering: use --restrict-to-{processes/functions} f…
yelhamer Aug 15, 2024
5dc562d
process/functions filtering: make `apply_extractor_filters()` extensible
yelhamer Aug 15, 2024
5300f4a
process/functions filtering: use list comprehension instead of map
yelhamer Aug 15, 2024
d6cf34a
process/functions filtering: use set comprehension instead of set()
yelhamer Aug 15, 2024
e4836e5
capa/main.py: fix mypy issues
yelhamer Aug 15, 2024
9243334
Merge branch 'master' into yelhamer-filtered-scopes
yelhamer Aug 19, 2024
1168996
Update CHANGELOG.md: typo
yelhamer Aug 19, 2024
2f00b7f
Update capa/main.py
yelhamer Aug 19, 2024
79f3097
Update capa/main.py
yelhamer Aug 19, 2024
9ce2a3c
Update doc/usage.md
yelhamer Aug 19, 2024
28e274f
Update doc/usage.md
yelhamer Aug 19, 2024
fa61273
update changelog
yelhamer Aug 20, 2024
0640ba9
Update capa/features/extractors/base_extractor.py
yelhamer Aug 20, 2024
b693aa0
base_extractor.py: rename variable
yelhamer Aug 20, 2024
10a26a8
base_extractor.py: update comments
yelhamer Aug 20, 2024
b0d8071
main.py: add FilterConfig type
yelhamer Aug 20, 2024
ac50103
main.py: add asserts for checking filters are not empty
yelhamer Aug 20, 2024
a194a13
main.py: remove unused Set import
yelhamer Aug 20, 2024
e80f474
main.py: move filters extractor into get_extractor_from_cli() routine
yelhamer Aug 20, 2024
88d9d67
doc/usage.md: update usage according to reviews
yelhamer Aug 20, 2024
c30a10a
Update capa/features/extractors/base_extractor.py
yelhamer Aug 20, 2024
150d6f0
Update capa/features/extractors/base_extractor.py
yelhamer Aug 20, 2024
3aefa76
Update doc/usage.md
yelhamer Aug 20, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master (unreleased)

### New Features
- Add the ability to select which specific functions or processes to analyze @yelhamer
- webui: explore capa analysis results in a web-based UI online and offline #2224 @s-ff
- support analyzing DRAKVUF traces #2143 @yelhamer
Expand Down
12 changes: 12 additions & 0 deletions capa/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,15 @@ class UnsupportedOSError(ValueError):

class EmptyReportError(ValueError):
    pass


class InvalidArgument(ValueError):
    pass


class NonExistantFunctionError(ValueError):
    pass


class NonExistantProcessError(ValueError):
    pass
30 changes: 29 additions & 1 deletion capa/features/extractors/base_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
import abc
import hashlib
import dataclasses
from typing import Any, Dict, Tuple, Union, Iterator
from copy import copy
from types import MethodType
from typing import Any, Set, Dict, Tuple, Union, Iterator
from dataclasses import dataclass

# TODO(williballenthin): use typing.TypeAlias directly when Python 3.9 is deprecated
Expand Down Expand Up @@ -296,6 +298,19 @@ def extract_insn_features(
raise NotImplementedError()


def FunctionFilter(extractor: StaticFeatureExtractor, functions: Set) -> StaticFeatureExtractor:
    get_functions = extractor.get_functions  # fetch original get_functions()

    def filtered_get_functions(self):
        yield from (f for f in get_functions() if f.address in functions)

    # make a copy of the extractor before decorating the get_functions() method
    new_extractor = copy(extractor)
    new_extractor.get_functions = MethodType(filtered_get_functions, extractor)  # type: ignore

    return new_extractor


@dataclass
class ProcessHandle:
"""
Expand Down Expand Up @@ -467,4 +482,17 @@ def get_call_name(self, ph: ProcessHandle, th: ThreadHandle, ch: CallHandle) ->
raise NotImplementedError()


def ProcessFilter(extractor: DynamicFeatureExtractor, processes: Set) -> DynamicFeatureExtractor:
    get_processes = extractor.get_processes  # fetch original get_processes()

    def filtered_get_processes(self):
        yield from (f for f in get_processes() if f.address.pid in processes)

    # make a copy of the extractor before decorating the get_processes() method
    new_extractor = copy(extractor)
    new_extractor.get_processes = MethodType(filtered_get_processes, extractor)  # type: ignore

    return new_extractor


FeatureExtractor: TypeAlias = Union[StaticFeatureExtractor, DynamicFeatureExtractor]
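
The two filter helpers in this file share one pattern: take a shallow copy of the extractor, then rebind a single enumeration method on the copy so that it yields only the selected items. A minimal, self-contained sketch of that pattern follows. It assumes a hypothetical `ToyExtractor` class and a hypothetical `restrict_functions` helper, neither of which is part of capa, and it yields plain integer addresses rather than capa's function handles.

```python
from copy import copy
from types import MethodType
from typing import Iterable, Iterator, Set


class ToyExtractor:
    """Hypothetical stand-in for a feature extractor; not capa's real class."""

    def __init__(self, addresses: Iterable[int]):
        self.addresses = list(addresses)

    def get_functions(self) -> Iterator[int]:
        yield from self.addresses


def restrict_functions(extractor: ToyExtractor, wanted: Set[int]) -> ToyExtractor:
    # keep a reference to the original bound method before overriding it
    original = extractor.get_functions

    def filtered(self) -> Iterator[int]:
        yield from (addr for addr in original() if addr in wanted)

    # shallow-copy so the caller's extractor keeps its unfiltered behavior
    restricted = copy(extractor)
    # the instance attribute shadows the class-level method on the copy only
    restricted.get_functions = MethodType(filtered, restricted)
    return restricted


full = ToyExtractor([0x401000, 0x4019C0, 0x401CD0])
only = restrict_functions(full, {0x4019C0})
print([hex(a) for a in only.get_functions()])  # ['0x4019c0']
print(len(list(full.get_functions())))  # 3
```

Because the override is stored as an instance attribute on the copy, the original object still enumerates everything, which is what lets a test fixture be reused after filtering.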
73 changes: 70 additions & 3 deletions capa/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import textwrap
import contextlib
from types import TracebackType
from typing import Any, Dict, List, Optional
from typing import Any, Set, Dict, List, Optional
from pathlib import Path

import colorama
Expand Down Expand Up @@ -62,6 +62,7 @@
    log_unsupported_drakvuf_report_error,
)
from capa.exceptions import (
    InvalidArgument,
    EmptyReportError,
    UnsupportedOSError,
    UnsupportedArchError,
Expand All @@ -83,9 +84,17 @@
    FORMAT_FREEZE,
    FORMAT_RESULT,
    FORMAT_DRAKVUF,
    STATIC_FORMATS,
    DYNAMIC_FORMATS,
)
from capa.capabilities.common import find_capabilities, has_file_limitation, find_file_capabilities
from capa.features.extractors.base_extractor import FeatureExtractor, StaticFeatureExtractor, DynamicFeatureExtractor
from capa.features.extractors.base_extractor import (
    ProcessFilter,
    FunctionFilter,
    FeatureExtractor,
    StaticFeatureExtractor,
    DynamicFeatureExtractor,
)

RULES_PATH_DEFAULT_STRING = "(embedded rules)"
SIGNATURES_PATH_DEFAULT_STRING = "(embedded signatures)"
Expand All @@ -106,6 +115,8 @@
E_MISSING_CAPE_DYNAMIC_ANALYSIS = 22
E_EMPTY_REPORT = 23
E_UNSUPPORTED_GHIDRA_EXECUTION_MODE = 24
E_INVALID_INPUT_FORMAT = 25
E_INVALID_FEATURE_EXTRACTOR = 26

logger = logging.getLogger("capa")

Expand Down Expand Up @@ -276,6 +287,22 @@ def install_common_args(parser, wanted=None):
            help=f"select backend, {backend_help}",
        )

    if "restrict-to-functions" in wanted:
        parser.add_argument(
            "--restrict-to-functions",
            type=lambda s: s.replace(" ", "").split(","),
            default=[],
            help="provide a list of comma-separated functions to analyze (static analysis).",
        )

    if "restrict-to-processes" in wanted:
        parser.add_argument(
            "--restrict-to-processes",
            type=lambda s: s.replace(" ", "").split(","),
            default=[],
            help="provide a list of comma-separated processes to analyze (dynamic analysis).",
        )

    if "os" in wanted:
        oses = [
            (OS_AUTO, "detect OS automatically - default"),
Expand Down Expand Up @@ -780,6 +807,28 @@ def get_extractor_from_cli(args, input_format: str, backend: str) -> FeatureExtr
raise ShouldExitError(E_CORRUPT_FILE) from e


def get_extractor_filters_from_cli(args, input_format) -> Dict[str, Set]:
    if input_format in STATIC_FORMATS:

Collaborator: we may want to factor out the is_static_analysis into its own routine, though it's not required in this PR.

        if args.restrict_to_processes:
            raise InvalidArgument("Cannot filter processes with static analysis.")
        return {"functions": {int(addr, 0) for addr in args.restrict_to_functions}}
    elif input_format in DYNAMIC_FORMATS:
        if args.restrict_to_functions:
            raise InvalidArgument("Cannot filter functions with dynamic analysis.")
        return {"processes": {int(pid, 0) for pid in args.restrict_to_processes}}
    else:
        raise ShouldExitError(E_INVALID_INPUT_FORMAT)


def apply_extractor_filters(extractor: FeatureExtractor, extractor_filters: Dict[str, Set]):
    if isinstance(extractor, StaticFeatureExtractor):
        return FunctionFilter(extractor, extractor_filters["functions"])
    elif isinstance(extractor, DynamicFeatureExtractor):
        return ProcessFilter(extractor, extractor_filters["processes"])
    else:
        raise ShouldExitError(E_INVALID_FEATURE_EXTRACTOR)


def main(argv: Optional[List[str]] = None):
    if sys.version_info < (3, 8):
        raise UnsupportedRuntimeError("This version of capa can only be used with Python 3.8+")
Expand Down Expand Up @@ -819,14 +868,28 @@ def main(argv: Optional[List[str]] = None):
    parser = argparse.ArgumentParser(
        description=desc, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter
    )
    install_common_args(parser, {"input_file", "format", "backend", "os", "signatures", "rules", "tag"})
    install_common_args(
        parser,
        {
            "input_file",
            "format",
            "backend",
            "os",
            "signatures",
            "rules",
            "tag",
            "restrict-to-functions",
            "restrict-to-processes",
        },
    )
    parser.add_argument("-j", "--json", action="store_true", help="emit JSON instead of text")
    args = parser.parse_args(args=argv)

    try:
        handle_common_args(args)
        ensure_input_exists_from_cli(args)
        input_format = get_input_format_from_cli(args)
        extractor_filters = get_extractor_filters_from_cli(args, input_format)
        rules = get_rules_from_cli(args)
        file_extractors = get_file_extractors_from_cli(args, input_format)
        found_file_limitation = find_file_limitations_from_cli(args, rules, file_extractors)
Expand Down Expand Up @@ -857,6 +920,10 @@ def main(argv: Optional[List[str]] = None):
        except ShouldExitError as e:
            return e.status_code

        if any(extractor_filters.values()):
            # if the user specified any extractor filters, apply them here.
            extractor = apply_extractor_filters(extractor, extractor_filters)

        capabilities, counts = find_capabilities(rules, extractor, disable_progress=args.quiet)

        meta = capa.loader.collect_metadata(argv, args.input_file, input_format, os_, args.rules, extractor, counts)
Expand Down
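
The argument handling in this file can be hard to follow in a flattened diff, so here is a rough, self-contained sketch of the same flow: split the comma-separated value, convert each item with `int(x, 0)`, and reject the flag that does not match the analysis flavor. The helper names `comma_separated`, `build_parser`, and `to_filters` are hypothetical, as is the `is_static` parameter, which stands in for the `STATIC_FORMATS` / `DYNAMIC_FORMATS` membership checks; `ValueError` stands in for capa's `InvalidArgument`.

```python
import argparse
from typing import Dict, List, Set


def comma_separated(value: str) -> List[str]:
    # mirror the lambda in the diff: drop spaces, then split on commas
    return value.replace(" ", "").split(",")


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser()
    parser.add_argument("--restrict-to-functions", type=comma_separated, default=[])
    parser.add_argument("--restrict-to-processes", type=comma_separated, default=[])
    return parser


def to_filters(args: argparse.Namespace, is_static: bool) -> Dict[str, Set[int]]:
    # hypothetical simplification: the caller states the analysis flavor directly
    if is_static:
        if args.restrict_to_processes:
            raise ValueError("Cannot filter processes with static analysis.")
        # base 0 lets int() accept both "0x4019C0" and "4200000"
        return {"functions": {int(a, 0) for a in args.restrict_to_functions}}
    if args.restrict_to_functions:
        raise ValueError("Cannot filter functions with dynamic analysis.")
    return {"processes": {int(p, 0) for p in args.restrict_to_processes}}


args = build_parser().parse_args(["--restrict-to-functions", "0x4019C0, 0x401CD0"])
filters = to_filters(args, is_static=True)
print(sorted(hex(a) for a in filters["functions"]))  # ['0x4019c0', '0x401cd0']
```

One consequence of `int(x, 0)` worth keeping in mind: an empty item (for example from a trailing comma) or a decimal value with a leading zero such as `0123` raises `ValueError`, so in this sketch such input fails at conversion time rather than being silently ignored.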
11 changes: 10 additions & 1 deletion doc/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,20 @@ Use the `-t` option to run rules with the given metadata value (see the rule fie
For example, `capa -t [email protected]` runs rules that reference Willi's email address (probably as the author), or
`capa -t communication` runs rules with the namespace `communication`.

### only analyze selected functions
Use the `--restrict-to-functions` option to extract capabilities from only a selected set of functions.
For example, `capa sample.exe --restrict-to-functions 0x4019C0,0x401CD0` will only extract the capabilities in the functions found at
addresses 0x401CD0 and 0x4019C0.

### only analyze selected processes
Use the `--restrict-to-processes` option to extract capabilities from only a selected set of processes.
For example, `capa report.log --restrict-to-processes 3888,3214,4299` will extract capabilities only from the processes 3888, 3214, and 4299.

### IDA Pro plugin: capa explorer
Please check out the [capa explorer documentation](/capa/ida/plugin/README.md).

### save time by reusing .viv files
Set the environment variable `CAPA_SAVE_WORKSPACE` to instruct the underlying analysis engine to
cache its intermediate results to the file system. For example, vivisect will create `.viv` files.
Subsequently, capa may run faster when reprocessing the same input file.
This is particularly useful during rule development as you repeatedly test a rule against a known sample.
32 changes: 32 additions & 0 deletions tests/test_capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import textwrap

import capa.capabilities.common
from capa.features.extractors.base_extractor import FunctionFilter


def test_match_across_scopes_file_function(z9324d_extractor):
Expand Down Expand Up @@ -174,6 +175,37 @@ def test_subscope_bb_rules(z9324d_extractor):
assert "test rule" in capabilities


def test_match_specific_functions(z9324d_extractor):
    rules = capa.rules.RuleSet(
        [
            capa.rules.Rule.from_yaml(
                textwrap.dedent(
                    """
                    rule:
                        meta:
                            name: receive data
                            scopes:
                                static: function
                                dynamic: call
                            examples:
                                - 9324d1a8ae37a36ae560c37448c9705a:0x401CD0
                        features:
                            - or:
                                - api: recv
                    """
                )
            )
        ]
    )
    extractor = FunctionFilter(z9324d_extractor, {0x4019C0})
    capabilities, meta = capa.capabilities.common.find_capabilities(rules, extractor)
    matches = capabilities["receive data"]
    # test that we received only one match
    assert len(matches) == 1
    # and that this match is from the specified function
    assert matches[0][0] == 0x4019C0


def test_byte_matching(z9324d_extractor):
    rules = capa.rules.RuleSet(
        [
Expand Down