Skip to content

Commit

Permalink
pep8
Browse files Browse the repository at this point in the history
  • Loading branch information
williballenthin committed Jan 17, 2025
1 parent 9eea3a4 commit 7d409ae
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 34 deletions.
14 changes: 4 additions & 10 deletions capa/capabilities/dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
from dataclasses import dataclass

import capa.perf
import capa.engine
import capa.helpers
import capa.features.freeze as frz
import capa.render.result_document as rdoc
from capa.rules import Scope, RuleSet
Expand Down Expand Up @@ -120,20 +122,13 @@ def next(self, ch: CallHandle, call_features: FeatureSet):
# like arch/os/format.
continue

# Don't update in place!
#
# The address sets are passed into `rule.evaluate()` by reference,
# and ultimately used to populate Result instances.
# So if we update them after Results are collected, then we can't find the locations of matches.
# self.current_features[feature] = self.current_features[feature] - vas
self.current_features[feature] -= vas
if not self.current_features[feature]:
del self.current_features[feature]

# update the deque and set of features with the latest call's worth of features.
self.current_feature_sets.append(call_features)
for feature, vas in call_features.items():
# don't update in place!
self.current_features[feature] |= vas

_, matches = self.ruleset.match(Scope.SEQUENCE, self.current_features, ch.address)
Expand All @@ -156,7 +151,7 @@ def next(self, ch: CallHandle, call_features: FeatureSet):
# see: https://github.com/mandiant/capa/pull/2532#issuecomment-2548508130
for new_rule in newly_encountered_rules:
suppressed_rules -= set(self.ruleset.rules[new_rule].get_dependencies(self.ruleset.rules_by_namespace))

for rule_name, res in matches.items():
if rule_name in suppressed_rules:
continue
Expand All @@ -183,8 +178,7 @@ def find_thread_capabilities(
sequence_matcher = SequenceMatcher(ruleset)

call_count = 0
for ch in extractor.get_calls(ph, th):
call_count += 1
for call_count, ch in enumerate(extractor.get_calls(ph, th)): # noqa: B007
call_capabilities = find_call_capabilities(ruleset, extractor, ph, th, ch)
for feature, vas in call_capabilities.features.items():
features[feature].update(vas)
Expand Down
4 changes: 2 additions & 2 deletions capa/features/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ def __nonzero__(self):
def __str__(self):
# as this object isn't user facing, this formatting is just to help with debugging

lines = []
lines: list[str] = []

def rec(m: "Result", indent: int):
if isinstance(m.statement, capa.engine.Statement):
line = (" " * indent) + str(m.statement.name) + " " + str(m.success)
Expand All @@ -124,7 +125,6 @@ def rec(m: "Result", indent: int):
return "\n".join(lines)



class Feature(abc.ABC): # noqa: B024
# this is an abstract class, since we don't want anyone to instantiate it directly,
# but it doesn't have any abstract methods.
Expand Down
33 changes: 20 additions & 13 deletions capa/render/result_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,15 +406,18 @@ def from_capa(
# like the way a function contains a basic block.
# So when we have a match within a sequence for another sequence, we need to look
# for all the places it might be found.
#
#
# Despite the edge cases (like API hammering), this turns out to be pretty easy:
# collect the most recent match (with the given name) prior to the wanted location.
matches_in_thread = sorted([
(a.id, m) for a, m in rule_matches.items()
if isinstance(a, DynamicCallAddress)
and a.thread == location.thread
and a.id <= location.id
])
matches_in_thread = sorted(
[
(a.id, m)
for a, m in rule_matches.items()
if isinstance(a, DynamicCallAddress)
and a.thread == location.thread
and a.id <= location.id
]
)
_, most_recent_match = matches_in_thread[-1]
children.append(Match.from_capa(rules, capabilities, most_recent_match))

Expand Down Expand Up @@ -466,12 +469,15 @@ def from_capa(
if location in rule_matches:
children.append(Match.from_capa(rules, capabilities, rule_matches[location]))
else:
matches_in_thread = sorted([
(a.id, m) for a, m in rule_matches.items()
if isinstance(a, DynamicCallAddress)
and a.thread == location.thread
and a.id <= location.id
])
matches_in_thread = sorted(
[
(a.id, m)
for a, m in rule_matches.items()
if isinstance(a, DynamicCallAddress)
and a.thread == location.thread
and a.id <= location.id
]
)
_, most_recent_match = matches_in_thread[-1]
children.append(Match.from_capa(rules, capabilities, most_recent_match))
else:
Expand Down Expand Up @@ -523,6 +529,7 @@ def __str__(self):
# as this object isn't user facing, this formatting is just to help with debugging

lines = []

def rec(m: "Match", indent: int):
if isinstance(m.node, StatementNode):
line = (" " * indent) + str(m.node.statement.type) + " " + str(m.success)
Expand Down
1 change: 1 addition & 0 deletions capa/render/verbose.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
from capa.engine import MatchResults
from capa.render.utils import Console


def format_address(address: frz.Address) -> str:
if address.type == frz.AddressType.ABSOLUTE:
assert isinstance(address.value, int)
Expand Down
4 changes: 2 additions & 2 deletions capa/render/vverbose.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ def collect_sequence_locations(
yield from collect_sequence_locations(child, child_mode)
elif isinstance(match.node.statement, rd.RangeStatement):
for location in match.locations:
if location.type not in (frz.AddressType.CALL, ):
if location.type not in (frz.AddressType.CALL,):
continue
if mode == MODE_FAILURE:
continue
Expand All @@ -340,7 +340,7 @@ def collect_sequence_locations(
yield from collect_sequence_locations(child, mode)
elif isinstance(match.node, rd.FeatureNode):
for location in match.locations:
if location.type not in (frz.AddressType.CALL, ):
if location.type not in (frz.AddressType.CALL,):
continue
if mode == MODE_FAILURE:
continue
Expand Down
1 change: 1 addition & 0 deletions capa/rules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,7 @@ def rec(statement):
# but, namespaces tend to use `-` while rule names use ` `. so, unlikely, but possible.
if statement.value in namespaces:
# matches a namespace, so take precedence and don't even check rule names.
assert isinstance(statement.value, str)
deps.update(r.name for r in namespaces[statement.value])
else:
# not a namespace, assume it's a rule name.
Expand Down
19 changes: 12 additions & 7 deletions tests/test_dynamic_sequence_scope.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
# Copyright (C) 2024 Mandiant, Inc. All Rights Reserved.
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at: [package root]/LICENSE.txt
# Unless required by applicable law or agreed to in writing, software distributed under the License
# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# tests/data/dynamic/cape/v2.2/0000a65749f5902c4d82ffa701198038f0b4870b00a27cfca109f8f933476d82.json.gz
#
Expand Down

0 comments on commit 7d409ae

Please sign in to comment.