From 7d409aed227d6faf314829f9bab99a9d90de98e1 Mon Sep 17 00:00:00 2001 From: Willi Ballenthin Date: Thu, 16 Jan 2025 15:47:01 +0000 Subject: [PATCH] pep8 --- capa/capabilities/dynamic.py | 14 ++++-------- capa/features/common.py | 4 ++-- capa/render/result_document.py | 33 +++++++++++++++++----------- capa/render/verbose.py | 1 + capa/render/vverbose.py | 4 ++-- capa/rules/__init__.py | 1 + tests/test_dynamic_sequence_scope.py | 19 ++++++++++------ 7 files changed, 42 insertions(+), 34 deletions(-) diff --git a/capa/capabilities/dynamic.py b/capa/capabilities/dynamic.py index c1d202c2b..7d3eb9d60 100644 --- a/capa/capabilities/dynamic.py +++ b/capa/capabilities/dynamic.py @@ -19,6 +19,8 @@ from dataclasses import dataclass import capa.perf +import capa.engine +import capa.helpers import capa.features.freeze as frz import capa.render.result_document as rdoc from capa.rules import Scope, RuleSet @@ -120,12 +122,6 @@ def next(self, ch: CallHandle, call_features: FeatureSet): # like arch/os/format. continue - # Don't update in place! - # - # The address sets are passed into `rule.evaluate()` by reference, - # and ultimately used to populate Result instances. - # So if we update them after Results are collected, then we can't find the locations of matches. - # self.current_features[feature] = self.current_features[feature] - vas self.current_features[feature] -= vas if not self.current_features[feature]: del self.current_features[feature] @@ -133,7 +129,6 @@ def next(self, ch: CallHandle, call_features: FeatureSet): # update the deque and set of features with the latest call's worth of features. self.current_feature_sets.append(call_features) for feature, vas in call_features.items(): - # don't update in place! self.current_features[feature] |= vas _, matches = self.ruleset.match(Scope.SEQUENCE, self.current_features, ch.address) @@ -156,7 +151,7 @@ def next(self, ch: CallHandle, call_features: FeatureSet): # see: https://github.com/mandiant/capa/pull/2532#issuecomment-2548508130 for new_rule in newly_encountered_rules: suppressed_rules -= set(self.ruleset.rules[new_rule].get_dependencies(self.ruleset.rules_by_namespace)) - + for rule_name, res in matches.items(): if rule_name in suppressed_rules: continue @@ -183,8 +178,7 @@ def find_thread_capabilities( sequence_matcher = SequenceMatcher(ruleset) call_count = 0 - for ch in extractor.get_calls(ph, th): - call_count += 1 + for call_count, ch in enumerate(extractor.get_calls(ph, th)): # noqa: B007 call_capabilities = find_call_capabilities(ruleset, extractor, ph, th, ch) for feature, vas in call_capabilities.features.items(): features[feature].update(vas) diff --git a/capa/features/common.py b/capa/features/common.py index 674400a4e..44d42cceb 100644 --- a/capa/features/common.py +++ b/capa/features/common.py @@ -108,7 +108,8 @@ def __nonzero__(self): def __str__(self): # as this object isn't user facing, this formatting is just to help with debugging - lines = [] + lines: list[str] = [] + def rec(m: "Result", indent: int): if isinstance(m.statement, capa.engine.Statement): line = (" " * indent) + str(m.statement.name) + " " + str(m.success) @@ -124,7 +125,6 @@ def rec(m: "Result", indent: int): return "\n".join(lines) - class Feature(abc.ABC): # noqa: B024 # this is an abstract class, since we don't want anyone to instantiate it directly, # but it doesn't have any abstract methods. diff --git a/capa/render/result_document.py b/capa/render/result_document.py index b2b35c00d..d1bce9add 100644 --- a/capa/render/result_document.py +++ b/capa/render/result_document.py @@ -406,15 +406,18 @@ def from_capa( # like the way a function contains a basic block. # So when we have a match within a sequence for another sequence, we need to look # for all the places it might be found. - # + # # Despite the edge cases (like API hammering), this turns out to be pretty easy: # collect the most recent match (with the given name) prior to the wanted location. - matches_in_thread = sorted([ - (a.id, m) for a, m in rule_matches.items() - if isinstance(a, DynamicCallAddress) - and a.thread == location.thread - and a.id <= location.id - ]) + matches_in_thread = sorted( + [ + (a.id, m) + for a, m in rule_matches.items() + if isinstance(a, DynamicCallAddress) + and a.thread == location.thread + and a.id <= location.id + ] + ) _, most_recent_match = matches_in_thread[-1] children.append(Match.from_capa(rules, capabilities, most_recent_match)) @@ -466,12 +469,15 @@ def from_capa( if location in rule_matches: children.append(Match.from_capa(rules, capabilities, rule_matches[location])) else: - matches_in_thread = sorted([ - (a.id, m) for a, m in rule_matches.items() - if isinstance(a, DynamicCallAddress) - and a.thread == location.thread - and a.id <= location.id - ]) + matches_in_thread = sorted( + [ + (a.id, m) + for a, m in rule_matches.items() + if isinstance(a, DynamicCallAddress) + and a.thread == location.thread + and a.id <= location.id + ] + ) _, most_recent_match = matches_in_thread[-1] children.append(Match.from_capa(rules, capabilities, most_recent_match)) else: @@ -523,6 +529,7 @@ def __str__(self): # as this object isn't user facing, this formatting is just to help with debugging lines = [] + def rec(m: "Match", indent: int): if isinstance(m.node, StatementNode): line = (" " * indent) + str(m.node.statement.type) + " " + str(m.success) diff --git a/capa/render/verbose.py b/capa/render/verbose.py index 81bf93e05..a89db5039 100644 --- a/capa/render/verbose.py +++ b/capa/render/verbose.py @@ -43,6 +43,7 @@ from capa.engine import MatchResults from capa.render.utils import Console + def format_address(address: frz.Address) -> str: if address.type == frz.AddressType.ABSOLUTE: assert isinstance(address.value, int) diff --git a/capa/render/vverbose.py b/capa/render/vverbose.py index c7e376155..ad8ff4964 100644 --- a/capa/render/vverbose.py +++ b/capa/render/vverbose.py @@ -330,7 +330,7 @@ def collect_sequence_locations( yield from collect_sequence_locations(child, child_mode) elif isinstance(match.node.statement, rd.RangeStatement): for location in match.locations: - if location.type not in (frz.AddressType.CALL, ): + if location.type not in (frz.AddressType.CALL,): continue if mode == MODE_FAILURE: continue @@ -340,7 +340,7 @@ def collect_sequence_locations( yield from collect_sequence_locations(child, mode) elif isinstance(match.node, rd.FeatureNode): for location in match.locations: - if location.type not in (frz.AddressType.CALL, ): + if location.type not in (frz.AddressType.CALL,): continue if mode == MODE_FAILURE: continue diff --git a/capa/rules/__init__.py b/capa/rules/__init__.py index 2ca4e0bc6..907ae9be4 100644 --- a/capa/rules/__init__.py +++ b/capa/rules/__init__.py @@ -897,6 +897,7 @@ def rec(statement): # but, namespaces tend to use `-` while rule names use ` `. so, unlikely, but possible. if statement.value in namespaces: # matches a namespace, so take precedence and don't even check rule names. + assert isinstance(statement.value, str) deps.update(r.name for r in namespaces[statement.value]) else: # not a namespace, assume it's a rule name. diff --git a/tests/test_dynamic_sequence_scope.py b/tests/test_dynamic_sequence_scope.py index 31531d79a..b4c7b4bb4 100644 --- a/tests/test_dynamic_sequence_scope.py +++ b/tests/test_dynamic_sequence_scope.py @@ -1,11 +1,16 @@ -# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved. +# Copyright 2022 Google LLC +# # Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at: [package root]/LICENSE.txt -# Unless required by applicable law or agreed to in writing, software distributed under the License -# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and limitations under the License. - +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. # tests/data/dynamic/cape/v2.2/0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json.gz #