Skip to content

Commit

Permalink
[mandiant] Catch exception in intel generation
Browse files Browse the repository at this point in the history
  • Loading branch information
SamuelHassine committed Jun 16, 2022
1 parent a1ed484 commit d2c7222
Showing 1 changed file with 158 additions and 131 deletions.
289 changes: 158 additions & 131 deletions external-import/mandiant/src/mandiant.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,35 +159,42 @@ def _import_actor(self, work_id, current_state):
if result is not None and len(result["threat-actors"]) > 0:
actors = []
for actor in result["threat-actors"]:
if self.mandiant_threat_actor_as_intrusion_set:
stix_actor = stix2.IntrusionSet(
id=actor["id"].replace("threat-actor", "intrusion-set"),
name=self._redacted_as_none("name", actor),
description=self._redacted_as_none("description", actor),
modified=self._redacted_as_none("last_updated", actor),
aliases=self._redacted_as_none("aliases", actor),
confidence=self.helper.connect_confidence_level,
created_by_ref=self.identity["standard_id"],
object_marking_refs=[
stix2.TLP_AMBER.get("id"),
self.marking["standard_id"],
],
)
else:
stix_actor = stix2.ThreatActor(
id=actor["id"],
name=self._redacted_as_none("name", actor),
description=self._redacted_as_none("description", actor),
modified=self._redacted_as_none("last_updated", actor),
aliases=self._redacted_as_none("aliases", actor),
confidence=self.helper.connect_confidence_level,
created_by_ref=self.identity["standard_id"],
object_marking_refs=[
stix2.TLP_AMBER.get("id"),
self.marking["standard_id"],
],
)
actors.append(stix_actor)
try:
if self.mandiant_threat_actor_as_intrusion_set:
stix_actor = stix2.IntrusionSet(
id=actor["id"].replace("threat-actor", "intrusion-set"),
name=self._redacted_as_none("name", actor),
description=self._redacted_as_none(
"description", actor
),
modified=self._redacted_as_none("last_updated", actor),
aliases=self._redacted_as_none("aliases", actor),
confidence=self.helper.connect_confidence_level,
created_by_ref=self.identity["standard_id"],
object_marking_refs=[
stix2.TLP_AMBER.get("id"),
self.marking["standard_id"],
],
)
else:
stix_actor = stix2.ThreatActor(
id=actor["id"],
name=self._redacted_as_none("name", actor),
description=self._redacted_as_none(
"description", actor
),
modified=self._redacted_as_none("last_updated", actor),
aliases=self._redacted_as_none("aliases", actor),
confidence=self.helper.connect_confidence_level,
created_by_ref=self.identity["standard_id"],
object_marking_refs=[
stix2.TLP_AMBER.get("id"),
self.marking["standard_id"],
],
)
actors.append(stix_actor)
except Exception as e:
self.helper.log_error(str(e))
self.helper.send_stix2_bundle(
stix2.Bundle(
objects=actors,
Expand Down Expand Up @@ -216,21 +223,24 @@ def _import_malware(self, work_id, current_state):
if result is not None and len(result["malware"]) > 0:
malwares = []
for malware in result["malware"]:
stix_malware = stix2.Malware(
id=malware["id"],
is_family=True,
name=self._redacted_as_none("name", malware),
description=self._redacted_as_none("description", malware),
modified=self._redacted_as_none("last_updated", malware),
aliases=self._redacted_as_none("aliases", malware),
confidence=self.helper.connect_confidence_level,
created_by_ref=self.identity["standard_id"],
object_marking_refs=[
stix2.TLP_AMBER.get("id"),
self.marking["standard_id"],
],
)
malwares.append(stix_malware)
try:
stix_malware = stix2.Malware(
id=malware["id"],
is_family=True,
name=self._redacted_as_none("name", malware),
description=self._redacted_as_none("description", malware),
modified=self._redacted_as_none("last_updated", malware),
aliases=self._redacted_as_none("aliases", malware),
confidence=self.helper.connect_confidence_level,
created_by_ref=self.identity["standard_id"],
object_marking_refs=[
stix2.TLP_AMBER.get("id"),
self.marking["standard_id"],
],
)
malwares.append(stix_malware)
except Exception as e:
self.helper.log_error(str(e))
self.helper.send_stix2_bundle(
stix2.Bundle(
objects=malwares,
Expand Down Expand Up @@ -263,46 +273,51 @@ def _import_vulnerability(self, work_id, current_state):
if result is not None and len(result["vulnerability"]) > 0:
vulnerabilities = []
for vulnerability in result["vulnerability"]:
custom_properties = {}
if (
"common_vulnerability_scores" in vulnerability
and "v3.1" in vulnerability["common_vulnerability_scores"]
):
score = vulnerability["common_vulnerability_scores"]["v3.1"]
custom_properties = {
"x_opencti_base_score": self._redacted_as_none(
"base_score", score
),
"x_opencti_attack_vector": self._redacted_as_none(
"attack_vector", score
),
"x_opencti_integrity_impact": self._redacted_as_none(
"integrity_impact", score
),
"x_opencti_availability_impact": self._redacted_as_none(
"availability_impact", score
try:
custom_properties = {}
if (
"common_vulnerability_scores" in vulnerability
and "v3.1" in vulnerability["common_vulnerability_scores"]
):
score = vulnerability["common_vulnerability_scores"]["v3.1"]
custom_properties = {
"x_opencti_base_score": self._redacted_as_none(
"base_score", score
),
"x_opencti_attack_vector": self._redacted_as_none(
"attack_vector", score
),
"x_opencti_integrity_impact": self._redacted_as_none(
"integrity_impact", score
),
"x_opencti_availability_impact": self._redacted_as_none(
"availability_impact", score
),
"x_opencti_confidentiality_impact": self._redacted_as_none(
"confidentiality_impact", score
),
}
stix_vulnerability = stix2.Vulnerability(
id=vulnerability["id"],
name=self._redacted_as_none("cve_id", vulnerability),
description=self._redacted_as_none(
"description", vulnerability
),
"x_opencti_confidentiality_impact": self._redacted_as_none(
"confidentiality_impact", score
created=self._redacted_as_none(
"publish_date", vulnerability
),
}
stix_vulnerability = stix2.Vulnerability(
id=vulnerability["id"],
name=self._redacted_as_none("cve_id", vulnerability),
description=self._redacted_as_none(
"description", vulnerability
),
created=self._redacted_as_none("publish_date", vulnerability),
confidence=self.helper.connect_confidence_level,
created_by_ref=self.identity["standard_id"],
object_marking_refs=[
stix2.TLP_AMBER.get("id"),
self.marking["standard_id"],
],
allow_custom=True,
custom_properties=custom_properties,
)
vulnerabilities.append(stix_vulnerability)
confidence=self.helper.connect_confidence_level,
created_by_ref=self.identity["standard_id"],
object_marking_refs=[
stix2.TLP_AMBER.get("id"),
self.marking["standard_id"],
],
allow_custom=True,
custom_properties=custom_properties,
)
vulnerabilities.append(stix_vulnerability)
except Exception as e:
self.helper.log_error(str(e))
self.helper.send_stix2_bundle(
stix2.Bundle(
objects=vulnerabilities,
Expand Down Expand Up @@ -336,55 +351,67 @@ def _import_indicator(self, work_id, current_state):
if result is not None and len(result["indicators"]) > 0:
indicators = []
for indicator in result["indicators"]:
pattern = None
type = None
if indicator["type"] == "ipv4":
pattern = "[ipv4-addr:value = '" + indicator["value"] + "']"
type = "IPv4-Addr"
elif indicator["type"] == "ipv6":
pattern = "[ipv6-addr:value = '" + indicator["value"] + "']"
type = "IPv6-Addr"
elif indicator["type"] == "fqdn":
pattern = "[domain-name:value = '" + indicator["value"] + "']"
type = "Domain-Name"
elif indicator["type"] == "url":
pattern = "[url:value = '" + indicator["value"] + "']"
type = "Url"
elif indicator["type"] == "md5":
pattern = "[file:hashes.MD5 = '" + indicator["value"] + "']"
type = "File"
elif indicator["type"] == "sha1":
pattern = "[file:hashes.SHA-1 = '" + indicator["value"] + "']"
type = "File"
elif indicator["type"] == "sha-256":
pattern = "[file:hashes.SHA-256 = '" + indicator["value"] + "']"
type = "File"
if pattern is not None:
stix_indicator = stix2.Indicator(
id=Indicator.generate_id(pattern),
pattern=pattern,
pattern_type="stix",
allow_custom=True,
name=self._redacted_as_none("value", indicator)
if self._redacted_as_none("value", indicator) is not None
else pattern,
description=self._redacted_as_none(
"description", indicator
),
created=self._redacted_as_none("first_seen", indicator),
modified=self._redacted_as_none("last_updated", indicator),
confidence=self.helper.connect_confidence_level,
created_by_ref=self.identity["standard_id"],
object_marking_refs=[
stix2.TLP_AMBER.get("id"),
self.marking["standard_id"],
],
custom_properties={
"x_opencti_main_observable_type": type,
"x_opencti_create_observables": True,
},
)
indicators.append(stix_indicator)
try:
pattern = None
type = None
if indicator["type"] == "ipv4":
pattern = "[ipv4-addr:value = '" + indicator["value"] + "']"
type = "IPv4-Addr"
elif indicator["type"] == "ipv6":
pattern = "[ipv6-addr:value = '" + indicator["value"] + "']"
type = "IPv6-Addr"
elif indicator["type"] == "fqdn":
pattern = (
"[domain-name:value = '" + indicator["value"] + "']"
)
type = "Domain-Name"
elif indicator["type"] == "url":
pattern = "[url:value = '" + indicator["value"] + "']"
type = "Url"
elif indicator["type"] == "md5":
pattern = "[file:hashes.MD5 = '" + indicator["value"] + "']"
type = "File"
elif indicator["type"] == "sha1":
pattern = (
"[file:hashes.SHA-1 = '" + indicator["value"] + "']"
)
type = "File"
elif indicator["type"] == "sha-256":
pattern = (
"[file:hashes.SHA-256 = '" + indicator["value"] + "']"
)
type = "File"
if pattern is not None:
stix_indicator = stix2.Indicator(
id=Indicator.generate_id(pattern),
pattern=pattern,
pattern_type="stix",
allow_custom=True,
name=self._redacted_as_none("value", indicator)
if self._redacted_as_none("value", indicator)
is not None
else pattern,
description=self._redacted_as_none(
"description", indicator
),
created=self._redacted_as_none("first_seen", indicator),
modified=self._redacted_as_none(
"last_updated", indicator
),
confidence=self.helper.connect_confidence_level,
created_by_ref=self.identity["standard_id"],
object_marking_refs=[
stix2.TLP_AMBER.get("id"),
self.marking["standard_id"],
],
custom_properties={
"x_opencti_main_observable_type": type,
"x_opencti_create_observables": True,
},
)
indicators.append(stix_indicator)
except Exception as e:
self.helper.log_error(str(e))
self.helper.send_stix2_bundle(
stix2.Bundle(
objects=indicators,
Expand Down

0 comments on commit d2c7222

Please sign in to comment.