From d2c722205c347e6b20f26ce76018c1c9f6fb6033 Mon Sep 17 00:00:00 2001 From: Samuel Hassine Date: Thu, 16 Jun 2022 08:50:43 +0200 Subject: [PATCH] [mandiant] Catch exception in intel generation --- external-import/mandiant/src/mandiant.py | 289 +++++++++++++---------- 1 file changed, 158 insertions(+), 131 deletions(-) diff --git a/external-import/mandiant/src/mandiant.py b/external-import/mandiant/src/mandiant.py index 533d2acdd4..eb91a0bc73 100644 --- a/external-import/mandiant/src/mandiant.py +++ b/external-import/mandiant/src/mandiant.py 
@@ -159,35 +159,42 @@ def _import_actor(self, work_id, current_state): if result is not None and len(result["threat-actors"]) > 0: actors = [] for actor in result["threat-actors"]: - if self.mandiant_threat_actor_as_intrusion_set: - stix_actor = stix2.IntrusionSet( - id=actor["id"].replace("threat-actor", "intrusion-set"), - name=self._redacted_as_none("name", actor), - description=self._redacted_as_none("description", actor), - modified=self._redacted_as_none("last_updated", actor), - aliases=self._redacted_as_none("aliases", actor), - confidence=self.helper.connect_confidence_level, - created_by_ref=self.identity["standard_id"], - object_marking_refs=[ - stix2.TLP_AMBER.get("id"), - self.marking["standard_id"], - ], - ) - else: - stix_actor = stix2.ThreatActor( - id=actor["id"], - name=self._redacted_as_none("name", actor), - description=self._redacted_as_none("description", actor), - modified=self._redacted_as_none("last_updated", actor), - aliases=self._redacted_as_none("aliases", actor), - confidence=self.helper.connect_confidence_level, - created_by_ref=self.identity["standard_id"], - object_marking_refs=[ - stix2.TLP_AMBER.get("id"), - self.marking["standard_id"], - ], - ) - actors.append(stix_actor) + try: + if self.mandiant_threat_actor_as_intrusion_set: + stix_actor = stix2.IntrusionSet( + id=actor["id"].replace("threat-actor", "intrusion-set"), + name=self._redacted_as_none("name", actor), + description=self._redacted_as_none( + "description", actor + ), + modified=self._redacted_as_none("last_updated", actor), + aliases=self._redacted_as_none("aliases", actor), + confidence=self.helper.connect_confidence_level, + created_by_ref=self.identity["standard_id"], + object_marking_refs=[ + stix2.TLP_AMBER.get("id"), + self.marking["standard_id"], + ], + ) + else: + stix_actor = stix2.ThreatActor( + id=actor["id"], + name=self._redacted_as_none("name", actor), + description=self._redacted_as_none( + "description", actor + ), + 
modified=self._redacted_as_none("last_updated", actor), + aliases=self._redacted_as_none("aliases", actor), + confidence=self.helper.connect_confidence_level, + created_by_ref=self.identity["standard_id"], + object_marking_refs=[ + stix2.TLP_AMBER.get("id"), + self.marking["standard_id"], + ], + ) + actors.append(stix_actor) + except Exception as e: + self.helper.log_error(str(e)) self.helper.send_stix2_bundle( stix2.Bundle( objects=actors, @@ -216,21 +223,24 @@ def _import_malware(self, work_id, current_state): if result is not None and len(result["malware"]) > 0: malwares = [] for malware in result["malware"]: - stix_malware = stix2.Malware( - id=malware["id"], - is_family=True, - name=self._redacted_as_none("name", malware), - description=self._redacted_as_none("description", malware), - modified=self._redacted_as_none("last_updated", malware), - aliases=self._redacted_as_none("aliases", malware), - confidence=self.helper.connect_confidence_level, - created_by_ref=self.identity["standard_id"], - object_marking_refs=[ - stix2.TLP_AMBER.get("id"), - self.marking["standard_id"], - ], - ) - malwares.append(stix_malware) + try: + stix_malware = stix2.Malware( + id=malware["id"], + is_family=True, + name=self._redacted_as_none("name", malware), + description=self._redacted_as_none("description", malware), + modified=self._redacted_as_none("last_updated", malware), + aliases=self._redacted_as_none("aliases", malware), + confidence=self.helper.connect_confidence_level, + created_by_ref=self.identity["standard_id"], + object_marking_refs=[ + stix2.TLP_AMBER.get("id"), + self.marking["standard_id"], + ], + ) + malwares.append(stix_malware) + except Exception as e: + self.helper.log_error(str(e)) self.helper.send_stix2_bundle( stix2.Bundle( objects=malwares, 
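Review bot: The new `except Exception as e:` around the actor construction catches every error, including programming mistakes such as a misspelled attribute on `self`, and `self.helper.log_error(str(e))` records neither which actor failed nor a traceback — for a `KeyError` on `actor["id"]` the logged text is just `'id'`. If the intent is to tolerate malformed records, catch what a bad record actually raises and name the record, e.g. `except (KeyError, stix2.exceptions.STIXError) as e:` followed by `self.helper.log_error(f"Cannot build threat actor {actor.get('id')}: {e!r}")`; the `self.helper.connect_confidence_level` and `self.identity["standard_id"]` lookups are identical on every iteration, so a failure there is a configuration problem that should not be masked once per item. The same catch-all is introduced in the `_import_malware`, `_import_vulnerability` and `_import_indicator` hunks below. `_import_actor` starts before this hunk and its `send_stix2_bundle` call continues past it.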
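Review bot: With the per-item `try`/`except` in the malware loop, `malwares` can be empty when every record fails, yet the following `self.helper.send_stix2_bundle(stix2.Bundle(objects=malwares, ...))` still runs; the earlier `len(result["malware"]) > 0` guard no longer guarantees there is anything to send. Guarding the send with `if malwares:` keeps a run of bad records from producing an empty bundle (what `send_stix2_bundle` does with one is not visible here, and any state update that presumably follows the send should still run). The same applies to `actors`, `vulnerabilities` and `indicators` in the other three hunks. `_import_malware` starts before this hunk and continues after it.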
@@ -263,46 +273,51 @@ def _import_vulnerability(self, work_id, current_state): if result is not None and len(result["vulnerability"]) > 0: vulnerabilities = [] for vulnerability in result["vulnerability"]: - custom_properties = {} - if ( - "common_vulnerability_scores" in vulnerability - and "v3.1" in vulnerability["common_vulnerability_scores"] - ): - score = vulnerability["common_vulnerability_scores"]["v3.1"] - custom_properties = { - "x_opencti_base_score": self._redacted_as_none( - "base_score", score - ), - "x_opencti_attack_vector": self._redacted_as_none( - "attack_vector", score - ), - "x_opencti_integrity_impact": self._redacted_as_none( - "integrity_impact", score - ), - "x_opencti_availability_impact": self._redacted_as_none( - "availability_impact", score + try: + custom_properties = {} + if ( + "common_vulnerability_scores" in vulnerability + and "v3.1" in vulnerability["common_vulnerability_scores"] + ): + score = vulnerability["common_vulnerability_scores"]["v3.1"] + custom_properties = { + "x_opencti_base_score": self._redacted_as_none( + "base_score", score + ), + "x_opencti_attack_vector": self._redacted_as_none( + "attack_vector", score + ), + "x_opencti_integrity_impact": self._redacted_as_none( + "integrity_impact", score + ), + "x_opencti_availability_impact": self._redacted_as_none( + "availability_impact", score + ), + "x_opencti_confidentiality_impact": self._redacted_as_none( + "confidentiality_impact", score + ), + } + stix_vulnerability = stix2.Vulnerability( + id=vulnerability["id"], + name=self._redacted_as_none("cve_id", vulnerability), + description=self._redacted_as_none( + "description", vulnerability ), - "x_opencti_confidentiality_impact": self._redacted_as_none( - "confidentiality_impact", score + created=self._redacted_as_none( + "publish_date", vulnerability ), - } - stix_vulnerability = stix2.Vulnerability( - id=vulnerability["id"], - name=self._redacted_as_none("cve_id", vulnerability), - description=self._redacted_as_none( 
- "description", vulnerability - ), - created=self._redacted_as_none("publish_date", vulnerability), - confidence=self.helper.connect_confidence_level, - created_by_ref=self.identity["standard_id"], - object_marking_refs=[ - stix2.TLP_AMBER.get("id"), - self.marking["standard_id"], - ], - allow_custom=True, - custom_properties=custom_properties, - ) - vulnerabilities.append(stix_vulnerability) + confidence=self.helper.connect_confidence_level, + created_by_ref=self.identity["standard_id"], + object_marking_refs=[ + stix2.TLP_AMBER.get("id"), + self.marking["standard_id"], + ], + allow_custom=True, + custom_properties=custom_properties, + ) + vulnerabilities.append(stix_vulnerability) + except Exception as e: + self.helper.log_error(str(e)) self.helper.send_stix2_bundle( stix2.Bundle( objects=vulnerabilities, 
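Review bot: The `if "common_vulnerability_scores" in vulnerability and "v3.1" in vulnerability["common_vulnerability_scores"]:` test now inside the `try` reads the same key twice and indexes a third time on the next line to bind `score`; a single lookup is clearer, `score = vulnerability.get("common_vulnerability_scores", {}).get("v3.1")` followed by `if score is not None:`. That differs from the current code only when the API returns an explicit `null` under `v3.1`, which the current code would hand to `_redacted_as_none` as `score`; confirm that case is not relied on. `_import_vulnerability` starts before this hunk and continues after it.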
@@ -336,55 +351,67 @@ def _import_indicator(self, work_id, current_state): if result is not None and len(result["indicators"]) > 0: indicators = [] for indicator in result["indicators"]: - pattern = None - type = None - if indicator["type"] == "ipv4": - pattern = "[ipv4-addr:value = '" + indicator["value"] + "']" - type = "IPv4-Addr" - elif indicator["type"] == "ipv6": - pattern = "[ipv6-addr:value = '" + indicator["value"] + "']" - type = "IPv6-Addr" - elif indicator["type"] == "fqdn": - pattern = "[domain-name:value = '" + indicator["value"] + "']" - type = "Domain-Name" - elif indicator["type"] == "url": - pattern = "[url:value = '" + indicator["value"] + "']" - type = "Url" - elif indicator["type"] == "md5": - pattern = "[file:hashes.MD5 = '" + indicator["value"] + "']" - type = "File" - elif indicator["type"] == "sha1": - pattern = "[file:hashes.SHA-1 = '" + indicator["value"] + "']" - type = "File" - elif indicator["type"] == "sha-256": - pattern = "[file:hashes.SHA-256 = '" + indicator["value"] + "']" - type = "File" - if pattern is not None: - stix_indicator = stix2.Indicator( - id=Indicator.generate_id(pattern), - pattern=pattern, - pattern_type="stix", - allow_custom=True, - name=self._redacted_as_none("value", indicator) - if self._redacted_as_none("value", indicator) is not None - else pattern, - description=self._redacted_as_none( - "description", indicator - ), - created=self._redacted_as_none("first_seen", indicator), - modified=self._redacted_as_none("last_updated", indicator), - confidence=self.helper.connect_confidence_level, - created_by_ref=self.identity["standard_id"], - object_marking_refs=[ - stix2.TLP_AMBER.get("id"), - self.marking["standard_id"], - ], - custom_properties={ - "x_opencti_main_observable_type": type, - "x_opencti_create_observables": True, - }, - ) - indicators.append(stix_indicator) + try: + pattern = None + type = None + if indicator["type"] == "ipv4": + pattern = "[ipv4-addr:value = '" + indicator["value"] + "']" + type = 
"IPv4-Addr" + elif indicator["type"] == "ipv6": + pattern = "[ipv6-addr:value = '" + indicator["value"] + "']" + type = "IPv6-Addr" + elif indicator["type"] == "fqdn": + pattern = ( + "[domain-name:value = '" + indicator["value"] + "']" + ) + type = "Domain-Name" + elif indicator["type"] == "url": + pattern = "[url:value = '" + indicator["value"] + "']" + type = "Url" + elif indicator["type"] == "md5": + pattern = "[file:hashes.MD5 = '" + indicator["value"] + "']" + type = "File" + elif indicator["type"] == "sha1": + pattern = ( + "[file:hashes.SHA-1 = '" + indicator["value"] + "']" + ) + type = "File" + elif indicator["type"] == "sha-256": + pattern = ( + "[file:hashes.SHA-256 = '" + indicator["value"] + "']" + ) + type = "File" + if pattern is not None: + stix_indicator = stix2.Indicator( + id=Indicator.generate_id(pattern), + pattern=pattern, + pattern_type="stix", + allow_custom=True, + name=self._redacted_as_none("value", indicator) + if self._redacted_as_none("value", indicator) + is not None + else pattern, + description=self._redacted_as_none( + "description", indicator + ), + created=self._redacted_as_none("first_seen", indicator), + modified=self._redacted_as_none( + "last_updated", indicator + ), + confidence=self.helper.connect_confidence_level, + created_by_ref=self.identity["standard_id"], + object_marking_refs=[ + stix2.TLP_AMBER.get("id"), + self.marking["standard_id"], + ], + custom_properties={ + "x_opencti_main_observable_type": type, + "x_opencti_create_observables": True, + }, + ) + indicators.append(stix_indicator) + except Exception as e: + self.helper.log_error(str(e)) self.helper.send_stix2_bundle( stix2.Bundle( objects=indicators,
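Review bot: The STIX patterns in the indicator loop are still built by concatenating `indicator["value"]` straight into a single-quoted literal, so a value containing `'` or `\` produces a pattern that is invalid under the STIX patterning grammar, and an unescaped quote terminates the literal early so the remainder of the value is read as pattern syntax; with the new `except Exception` such an indicator is now logged and silently dropped instead of failing the run. STIX string literals escape backslash and single quote with a backslash, so quote the value through one small helper and use it in all seven branches. The local name `type` also shadows the builtin; `observable_type` reads better. `_import_indicator` starts before this hunk and continues after it.
```python
def _pattern_literal(self, value):
    """Return *value* as a single-quoted STIX pattern string literal."""
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"

# in _import_indicator, one branch per indicator type as before:
if indicator["type"] == "ipv4":
    pattern = "[ipv4-addr:value = " + self._pattern_literal(indicator["value"]) + "]"
    observable_type = "IPv4-Addr"
# … unchanged: the ipv6/fqdn/url/md5/sha1/sha-256 branches, rewritten the same way …
```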