Skip to content

Commit

Permalink
Added util and tests to check and convert rule schema from v1 to v2
Browse files Browse the repository at this point in the history
  • Loading branch information
slincoln-systemtwo committed Nov 5, 2024
1 parent dc792c9 commit 05b7a0c
Show file tree
Hide file tree
Showing 3 changed files with 235 additions and 39 deletions.
93 changes: 88 additions & 5 deletions sigmaiq/utils/sigmaiq/sigmaiq_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,78 @@
from typing import Union
from sigma.rule import SigmaRule
from sigma.collection import SigmaCollection
from datetime import datetime
import re


def _is_v1_schema(rule_data: dict) -> bool:
"""Check if the rule uses v1 schema patterns."""
if not isinstance(rule_data, dict):
return False

# Check date format
date_str = rule_data.get('date')
if date_str and '/' in date_str:
return True

# Check modified format
modified_str = rule_data.get('modified')
if modified_str and '/' in modified_str:
return True

# Check tags format
tags = rule_data.get('tags', [])
for tag in tags:
if any(ns in tag for ns in ['attack-', 'attack_', 'cve-', 'detection-']):
return True

# Check related field
related = rule_data.get('related', [])
for rel in related:
if rel.get('type') == 'obsoletes':
return True

return False

def _convert_to_v2_schema(rule_data: dict) -> dict:
"""Convert v1 schema rule to v2 schema."""
rule_data = rule_data.copy()

# Convert date and modified format
if 'date' in rule_data and '/' in rule_data['date']:
try:
date_obj = datetime.strptime(rule_data['date'], '%Y/%m/%d')
rule_data['date'] = date_obj.strftime('%Y-%m-%d')
except ValueError:
pass

if 'modified' in rule_data and '/' in rule_data['modified']:
try:
date_obj = datetime.strptime(rule_data['modified'], '%Y/%m/%d')
rule_data['modified'] = date_obj.strftime('%Y-%m-%d')
except ValueError:
pass

# Convert tags
if 'tags' in rule_data:
new_tags = []
for tag in rule_data['tags']:
# Convert common namespace patterns
tag = tag.replace('attack-', 'attack.')
tag = tag.replace('attack_', 'attack.')
tag = tag.replace('cve-', 'cve.')
tag = tag.replace('detection-', 'detection.')
new_tags.append(tag)
rule_data['tags'] = new_tags

# Convert related field
if 'related' in rule_data:
for rel in rule_data['related']:
if rel.get('type') == 'obsoletes':
rel['type'] = 'obsolete'

return rule_data

def create_sigma_rule_obj(sigma_rule: Union[SigmaRule, SigmaCollection, dict, str, list]):
"""Checks sigma_rule to ensure it's a SigmaRule or SigmaCollection object. It can also be a valid Sigma rule
representation in a dict or yaml str (or list of valid dicts/yaml strs) that can be used with SigmaRule class methods to
Expand All @@ -18,14 +88,27 @@ def create_sigma_rule_obj(sigma_rule: Union[SigmaRule, SigmaCollection, dict, st
:return: SigmaRule or SigmaCollection object as-is or created using SigmaRule classmethods.
"""

if isinstance(sigma_rule, SigmaRule) or isinstance(sigma_rule, SigmaCollection): # We're good
if isinstance(sigma_rule, (SigmaRule, SigmaCollection)):
return sigma_rule
if isinstance(sigma_rule, list): # Try to make collection from list of objects, recursively
if isinstance(sigma_rule, list):
return SigmaCollection([create_sigma_rule_obj(s) for s in sigma_rule])
if isinstance(sigma_rule, dict): # Create one from dict
if isinstance(sigma_rule, dict):
# Check and convert v1 schema if needed
if _is_v1_schema(sigma_rule):

sigma_rule = _convert_to_v2_schema(sigma_rule)
return SigmaRule.from_dict(sigma_rule)
if isinstance(sigma_rule, str): # from YAML str
return SigmaRule.from_yaml(sigma_rule)
if isinstance(sigma_rule, str):
# For YAML strings, we need to parse to dict first
try:
import yaml
rule_dict = yaml.safe_load(sigma_rule)
if _is_v1_schema(rule_dict):
rule_dict = _convert_to_v2_schema(rule_dict)
return SigmaRule.from_dict(rule_dict)
except Exception as e:
print(e)
return SigmaRule.from_yaml(sigma_rule)
raise TypeError(
f"Invalid type '{type(sigma_rule)}' for `sigma_rule`. "
f"Use a SigmaRule, SigmaCollection, dict, str, or list of these types instead."
Expand Down
53 changes: 27 additions & 26 deletions tests/test_backend_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,32 +41,33 @@ def sigma_rule():

@pytest.fixture
def sigma_rule_dict():
"""Fixture for a PySigma SigmaRule object converted to a dict"""
return SigmaRule.from_yaml(
"""
title: Test Rule
id: 12345678-abcd-abcd-1234-1234567890ab
status: test
description: A Test Sigma Rule
author: AttackIQ
date: 2023-01-01
modified: 2023-01-02
tags:
- attack.t1003
- attack.t1003.001
- attack.credential_access
logsource:
category: process_creation
product: windows
detection:
sel:
CommandLine: valueA
condition: sel
falsepositives:
- None
level: high
"""
).to_dict()
"""Fixture for a basic Sigma rule dict"""
return {
"title": "Test Rule",
"id": "12345678-abcd-abcd-1234-1234567890ab",
"status": "test",
"description": "A Test Sigma Rule",
"author": "AttackIQ",
"date": "2023-01-01",
"modified": "2023-01-02",
"tags": [
"attack.t1003",
"attack.t1003.001",
"attack.credential_access"
],
"logsource": {
"category": "process_creation",
"product": "windows"
},
"detection": {
"sel": {
"CommandLine": "valueA"
},
"condition": "sel"
},
"falsepositives": ["None"],
"level": "high"
}


@pytest.fixture
Expand Down
128 changes: 120 additions & 8 deletions tests/test_sigmaiq_utils.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,158 @@
import datetime
import pytest
from sigmaiq.utils.sigmaiq.sigmaiq_utils import create_sigma_rule_obj
import yaml
from sigmaiq.utils.sigmaiq.sigmaiq_utils import (create_sigma_rule_obj,
_is_v1_schema,
_convert_to_v2_schema)

# Fixtures
# Existing fixtures
from tests.test_backend_factory import sigma_rule, sigma_rule_yaml_str, sigma_rule_dict, sigma_collection
from sigma.rule import SigmaRule
from sigma.collection import SigmaCollection

# New fixtures for schema conversion tests
@pytest.fixture
def v1_rule_data():
return {
"id": "12345678-abcd-abcd-1234-1234567890ab",
"title": "Test Rule",
"date": "2023/04/15",
"tags": [
"attack.execution",
"attack_persistence",
"cve.2023.1234",
"detection.threat_hunting"
],
"related": [
{"type": "obsoletes", "id": "12345678-abcd-abcd-1234-1234567890ab"}
],
"modified": "2023/04/15",
"logsource": {"category": "process_creation", "product": "windows"},
"detection": {
"selection_img": {"Image|endswith": "\\regedit.exe", "OriginalFileName": "REGEDIT.EXE"},
"condition": "all of selection_* and not all of filter_*"
}
}

@pytest.fixture
def v2_rule_data():
return {
"id": "12345678-abcd-abcd-1234-1234567890ab",
"title": "Test Rule",
"date": "2023-04-15",
"tags": [
"attack.execution",
"attack.persistence",
"cve.2023.1234",
"detection.threat_hunting"
],
"related": [
{"type": "obsolete", "id": "12345678-abcd-abcd-1234-1234567890ab"}
],
"logsource": {"category": "process_creation", "product": "windows"},
"detection": {
"selection_img": {"Image|endswith": "\\regedit.exe", "OriginalFileName": "REGEDIT.EXE"},
"condition": "all of selection_* and not all of filter_*"
}
}

# Existing tests
def test_create_sigma_rule_obj_sigma_rule(sigma_rule):
"""Tests creating a SigmaRule object from a SigmaRule, aka just return the rule"""
sigma_rule = sigma_rule
print(type(sigma_rule))
assert isinstance(create_sigma_rule_obj(sigma_rule), SigmaRule)


def test_create_sigma_rule_obj_sigma_collection(sigma_collection):
"""Tests creating a SigmaRule object from a SigmaCollection, aka just return the collection"""
assert isinstance(create_sigma_rule_obj(sigma_collection), SigmaCollection)


def test_create_sigma_rule_obj_sigma_rule_yaml_str(sigma_rule_yaml_str):
"""Tests creating a SigmaRule object from a valid SigmaRule YAML str"""
assert isinstance(create_sigma_rule_obj(sigma_rule_yaml_str), SigmaRule)


def test_create_sigma_rule_obj_sigma_rule_dict(sigma_rule_dict):
"""Tests creating a SigmaRule object from a valid SigmaRule dict"""
assert isinstance(create_sigma_rule_obj(sigma_rule_dict), SigmaRule)


def test_create_sigma_rule_obj_invalid_type():
"""Tests creating a SigmaRule object from an invalid type"""
with pytest.raises(TypeError):
create_sigma_rule_obj(1) # Invalid type


def test_create_sigma_rule_obj_invalid_type_list():
"""Tests creating a SigmaRule object from an invalid type list"""
with pytest.raises(TypeError):
create_sigma_rule_obj([1]) # Invalid type list


def test_create_sigma_rule_objsigma_rule_list(sigma_rule, sigma_rule_yaml_str):
"""Tests creating a SigmaRule objects from a list"""
assert isinstance(create_sigma_rule_obj([sigma_rule, sigma_rule_yaml_str]), SigmaCollection)

# New schema conversion tests
class TestSchemaDetection:
"""Tests for v1 schema detection"""
def test_v1_date_detection(self):
assert _is_v1_schema({"date": "2023/04/15"})
assert not _is_v1_schema({"date": "2023-04-15"})

def test_v1_tags_detection(self):
assert _is_v1_schema({"tags": ["attack-execution"]})
assert _is_v1_schema({"tags": ["attack-persistence"]})
assert not _is_v1_schema({"tags": ["attack.execution"]})

def test_v1_related_detection(self):
assert _is_v1_schema({"related": [{"type": "obsoletes"}]})
assert not _is_v1_schema({"related": [{"type": "obsolete"}]})

def test_non_dict_input(self):
assert not _is_v1_schema(None)
assert not _is_v1_schema([])
assert not _is_v1_schema("string")

class TestSchemaConversion:
"""Tests for v1 to v2 schema conversion"""
def test_date_conversion(self, v1_rule_data, v2_rule_data):
converted = _convert_to_v2_schema(v1_rule_data)
assert converted["date"] == v2_rule_data["date"]

def test_tags_conversion(self, v1_rule_data, v2_rule_data):
converted = _convert_to_v2_schema(v1_rule_data)
assert converted["tags"] == v2_rule_data["tags"]

def test_related_conversion(self, v1_rule_data, v2_rule_data):
converted = _convert_to_v2_schema(v1_rule_data)
assert converted["related"] == v2_rule_data["related"]

def test_invalid_date_handling(self):
rule_data = {"date": "invalid/date/format"}
converted = _convert_to_v2_schema(rule_data)
assert converted["date"] == "invalid/date/format" # Should preserve invalid date

def test_missing_fields_handling(self):
rule_data = {"title": "Test Rule"} # No convertible fields
converted = _convert_to_v2_schema(rule_data)
assert converted == rule_data # Should return unchanged

class TestSchemaConversionIntegration:
"""Integration tests for schema conversion with create_sigma_rule_obj"""
def test_dict_conversion(self, v1_rule_data):
rule = create_sigma_rule_obj(v1_rule_data)
assert isinstance(rule, SigmaRule)
assert rule.date and isinstance(rule.date, datetime.date)
assert rule.tags and all("." in str(tag) for tag in rule.tags if "attack" in str(tag))

def test_yaml_string_conversion(self, v1_rule_data):
yaml_str = yaml.dump(v1_rule_data)
rule = create_sigma_rule_obj(yaml_str)
assert isinstance(rule, SigmaRule)
assert rule.date and isinstance(rule.date, datetime.date)
assert rule.tags and all("." in str(tag) for tag in rule.tags if "attack" in str(tag))

def test_list_conversion(self, v1_rule_data):
rules = create_sigma_rule_obj([v1_rule_data, v1_rule_data])
assert isinstance(rules, SigmaCollection)
assert len(rules.rules) == 2
for rule in rules.rules:
assert rule.date and isinstance(rule.date, datetime.date)
assert rule.tags and all("." in str(tag) for tag in rule.tags if "attack" in str(tag))

0 comments on commit 05b7a0c

Please sign in to comment.