From 144b993b12370897e1c0b014882c968296662556 Mon Sep 17 00:00:00 2001 From: izar Date: Tue, 24 Oct 2023 13:52:00 -0400 Subject: [PATCH] Issue #222: fixed Findings creating spuriois Elements --- pytm/pytm.py | 94 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 55 insertions(+), 39 deletions(-) diff --git a/pytm/pytm.py b/pytm/pytm.py index 0de9a10..e2497db 100644 --- a/pytm/pytm.py +++ b/pytm/pytm.py @@ -242,15 +242,16 @@ def __ne__(self, other): def __str__(self): return ", ".join(sorted(set(d.name for d in self))) + class varControls(var): def __set__(self, instance, value): if not isinstance(value, Controls): raise ValueError( - "expecting an Controls " - "value, got a {}".format(type(value)) + "expecting an Controls " "value, got a {}".format(type(value)) ) super().__set__(instance, value) + class Action(Enum): """Action taken when validating a threat model.""" @@ -442,18 +443,25 @@ def _apply_defaults(flows, data): e._safeset("dstPort", e.sink.port) if hasattr(e.sink.controls, "isEncrypted"): e.controls._safeset("isEncrypted", e.sink.controls.isEncrypted) - e.controls._safeset("authenticatesDestination", e.source.controls.authenticatesDestination) - e.controls._safeset("checksDestinationRevocation", e.source.controls.checksDestinationRevocation) + e.controls._safeset( + "authenticatesDestination", e.source.controls.authenticatesDestination + ) + e.controls._safeset( + "checksDestinationRevocation", e.source.controls.checksDestinationRevocation + ) for d in e.data: if d.isStored: if hasattr(e.sink.controls, "isEncryptedAtRest"): for d in e.data: - d._safeset("isDestEncryptedAtRest", e.sink.controls.isEncryptedAtRest) + d._safeset( + "isDestEncryptedAtRest", e.sink.controls.isEncryptedAtRest + ) if hasattr(e.source, "isEncryptedAtRest"): for d in e.data: d._safeset( - "isSourceEncryptedAtRest", e.source.controls.isEncryptedAtRest + "isSourceEncryptedAtRest", + e.source.controls.isEncryptedAtRest, ) if d.credentialsLife != Lifetime.NONE and not d.isCredentials: d._safeset("isCredentials", True) @@ -522,28 +530,26 @@ def _describe_classes(classes): def _list_elements(): """List all elements which can be used in a threat model with the corresponding description""" + def all_subclasses(cls): """Get all sub classes of a class""" subclasses = set(cls.__subclasses__()) - return subclasses.union( - (s for c in subclasses for s in all_subclasses(c))) + return subclasses.union((s for c in subclasses for s in all_subclasses(c))) def print_components(cls_list): elements = sorted(cls_list, key=lambda c: c.__name__) max_len = max((len(e.__name__) for e in elements)) for sc in elements: - doc = sc.__doc__ if sc.__doc__ is not None else '' - print(f'{sc.__name__:<{max_len}} -- {doc}') - #print all elements - print('Elements:') + doc = sc.__doc__ if sc.__doc__ is not None else "" + print(f"{sc.__name__:<{max_len}} -- {doc}") + + # print all elements + print("Elements:") print_components(all_subclasses(Element)) # Print Attributes - print('\nAtributes:') - print_components( - all_subclasses(OrderedEnum) | {Data, Action, Lifetime} - ) - + print("\nAtributes:") + print_components(all_subclasses(OrderedEnum) | {Data, Action, Lifetime}) def _get_elements_and_boundaries(flows): @@ -661,7 +667,7 @@ def __init__( if args: element = args[0] else: - element = kwargs.pop("element", Element("invalid")) + element = kwargs.pop("element", Element("created from Finding")) self.target = element.name self.element = element @@ -683,6 +689,7 @@ def __init__( threat_id = kwargs.get("threat_id", None) for f in element.overrides: + # we override on threat_id, which is if f.threat_id != threat_id: continue for i in dir(f.__class__): @@ -729,7 +736,14 @@ class TM: _data = [] _threatsExcluded = [] _sf = None - _duplicate_ignored_attrs = "name", "note", "order", "response", "responseTo", "controls" + _duplicate_ignored_attrs = ( + "name", + "note", + "order", + "response", + "responseTo", + "controls", + ) name = varString("", required=True, doc="Model name") description = varString("", required=True, doc="Model description") threatsFile = varString( @@ -877,7 +891,7 @@ def _check_duplicates(self, flows): left_controls_attrs = left.controls._attr_values() right_controls_attrs = right.controls._attr_values() - #for a in self._duplicate_ignored_attrs: + # for a in self._duplicate_ignored_attrs: # del left_controls_attrs[a], right_controls_attrs[a] if left_controls_attrs != right_controls_attrs: continue @@ -885,7 +899,6 @@ def _check_duplicates(self, flows): right._is_drawn = True continue - raise ValueError( "Duplicate Dataflow found between {} and {}: " "{} is same as {}".format( @@ -1087,7 +1100,6 @@ def _stale(self, days): print(f"Checking for code {days} days older than this model.") for e in TM._elements: - for src in e.sourceFiles: try: src_mtime = datetime.fromtimestamp( @@ -1164,6 +1176,7 @@ def get_table(self, db, klass): ] return db.define_table(name, fields) + class Controls: """Controls implemented by/on and Element""" @@ -1256,7 +1269,6 @@ def _attr_values(self): result[i] = value return result - def _safeset(self, attr, value): try: setattr(self, attr, value) @@ -1264,7 +1276,6 @@ def _safeset(self, attr, value): pass - class Element: """A generic element""" @@ -1303,7 +1314,8 @@ def __init__(self, name, **kwargs): self.controls = Controls() self.uuid = uuid.UUID(int=random.getrandbits(128)) self._is_drawn = False - TM._elements.append(self) + if self.name != "created from Finding": + TM._elements.append(self) def __repr__(self): return "<{0}.{1}({2}) at {3}>".format( @@ -1607,7 +1619,7 @@ class Datastore(Asset): * FILE_SYSTEM - files on a file system * SQL - A SQL Database * LDAP - An LDAP Server -* AWS_S3 - An S3 Bucket within AWS""" +* AWS_S3 - An S3 Bucket within AWS""", ) def __init__(self, name, **kwargs): @@ -1874,27 +1886,29 @@ def serialize(obj, nested=False): result[i.lstrip("_")] = value return result + def encode_element_threat_data(obj): """Used to html encode threat data from a list of Elements""" encoded_elements = [] - if (type(obj) is not list): - raise ValueError("expecting a list value, got a {}".format(type(value))) + if type(obj) is not list: + raise ValueError("expecting a list value, got a {}".format(type(obj))) for o in obj: - c = copy.deepcopy(o) - for a in o._attr_values(): - if (a == "findings"): - encoded_findings = encode_threat_data(o.findings) - c._safeset("findings", encoded_findings) + c = copy.deepcopy(o) + for a in o._attr_values(): + if a == "findings": + encoded_findings = encode_threat_data(o.findings) + c._safeset("findings", encoded_findings) else: - v = getattr(o, a) - if (type(v) is not list or (type(v) is list and len(v) != 0)): - c._safeset(a, v) - - encoded_elements.append(c) + v = getattr(o, a) + if type(v) is not list or (type(v) is list and len(v) != 0): + c._safeset(a, v) + + encoded_elements.append(c) return encoded_elements + def encode_threat_data(obj): """Used to html encode threat data from a list of threats or findings""" encoded_threat_data = [] @@ -1955,7 +1969,9 @@ def get_args(): "--describe", help="describe the properties available for a given element" ) _parser.add_argument( - "--list-elements", action="store_true", help="list all elements which can be part of a threat model" + "--list-elements", + action="store_true", + help="list all elements which can be part of a threat model", ) _parser.add_argument("--json", help="output a JSON file") _parser.add_argument(