Skip to content

Commit

Permalink
verify user-selected language and warn, include language version in results
Browse files Browse the repository at this point in the history
  • Loading branch information
mr-tz committed Nov 12, 2023
1 parent b40b853 commit 801559a
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 96 deletions.
59 changes: 29 additions & 30 deletions floss/language/identify.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import re
from enum import Enum
from typing import Iterable
from typing import Tuple, Iterable
from pathlib import Path

import pefile
Expand All @@ -14,38 +14,43 @@
logger = floss.logging_.getLogger(__name__)


VERSION_UNKNOWN_OR_NA = "version unknown"


class Language(Enum):
    """
    Languages FLOSS distinguishes for language-specific string extraction.

    The member values are the strings accepted by the `--language` command
    line option and stored in the result metadata.
    """

    GO = "go"
    RUST = "rust"
    DOTNET = "dotnet"
    # no language was identified / the user made no selection (auto-detect)
    UNKNOWN = "unknown"
    # the user disabled language-specific handling via `--language none`
    DISABLED = "none"


def identify_language_and_version(sample: Path, static_strings: Iterable[StaticString]) -> Tuple[Language, str]:
    """
    Identify the language of the given binary and, where available, its version.

    The Rust check works on the already extracted static strings, so it runs
    first and does not require the sample to be a valid PE file. The Go and
    .NET checks need the sample parsed as a PE file.

    Args:
        sample: path to the binary to inspect.
        static_strings: static strings previously extracted from the sample.

    Returns:
        A (language, version) tuple. The version is VERSION_UNKNOWN_OR_NA when
        the sample is not a valid PE file, is a .NET binary, or no language
        was identified.
    """
    is_rust, version = get_if_rust_and_version(static_strings)
    if is_rust:
        logger.info("Rust binary found with version: %s", version)
        return Language.RUST, version

    # Open the file as PE for further checks
    try:
        pe = pefile.PE(str(sample))
    except pefile.PEFormatError as err:
        logger.debug(f"NOT a valid PE file: {err}")
        return Language.UNKNOWN, VERSION_UNKNOWN_OR_NA

    try:
        is_go, version = get_if_go_and_version(pe)
        if is_go:
            logger.info("Go binary found with version %s", version)
            return Language.GO, version

        if is_dotnet_bin(pe):
            return Language.DOTNET, VERSION_UNKNOWN_OR_NA

        return Language.UNKNOWN, VERSION_UNKNOWN_OR_NA
    finally:
        # release the file data held by pefile once all checks are done
        pe.close()


def is_rust_bin(static_strings: Iterable[StaticString]) -> bool:
def get_if_rust_and_version(static_strings: Iterable[StaticString]) -> Tuple[bool, str]:
"""
Check if the binary given is compiled with Rust compiler or not
Return if the binary given is compiled with Rust compiler and its version
reference: https://github.com/mandiant/flare-floss/issues/766
"""

Expand All @@ -62,19 +67,17 @@ def is_rust_bin(static_strings: Iterable[StaticString]) -> bool:
matches = regex_hash.search(string)
if matches and matches["hash"] in rust_commit_hash.keys():
version = rust_commit_hash[matches["hash"]]
logger.info("Rust binary found with version: %s", version)
return True
return True, version
if regex_version.search(string):
logger.info("Rust binary found with version: %s", string)
return True
return True, string

return False
return False, VERSION_UNKNOWN_OR_NA


def is_go_bin(pe: pefile.PE) -> bool:
def get_if_go_and_version(pe: pefile.PE) -> Tuple[bool, str]:
"""
Check if the binary given is compiled with Go compiler or not
it checks the magic header of the pclntab structure -pcHeader-
Return if the binary given is compiled with Go compiler and its version
this checks the magic header of the pclntab structure -pcHeader-
the magic values varies through the version
reference:
https://github.com/0xjiayu/go_parser/blob/865359c297257e00165beb1683ef6a679edc2c7f/pclntbl.py#L46
Expand All @@ -101,11 +104,9 @@ def is_go_bin(pe: pefile.PE) -> bool:
if magic in section_data:
pclntab_va = section_data.index(magic) + section_va
if verify_pclntab(section, pclntab_va):
logger.info("Go binary found with version %s", get_go_version(magic))
return True
return True, get_go_version(magic)

# if not found, search in all the available sections

for magic in go_magic:
for section in pe.sections:
section_va = section.VirtualAddress
Expand All @@ -114,10 +115,8 @@ def is_go_bin(pe: pefile.PE) -> bool:
if magic in section_data:
pclntab_va = section_data.index(magic) + section_va
if verify_pclntab(section, pclntab_va):
# just for testing
logger.info("Go binary found with version %s", get_go_version(magic))
return True
return False
return True, get_go_version(magic)
return False, VERSION_UNKNOWN_OR_NA


def get_go_version(magic):
Expand All @@ -137,7 +136,7 @@ def get_go_version(magic):
elif magic == MAGIC_120:
return "1.20"
else:
return "unknown"
return VERSION_UNKNOWN_OR_NA


def verify_pclntab(section, pclntab_va: int) -> bool:
Expand Down
108 changes: 55 additions & 53 deletions floss/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
from floss.stackstrings import extract_stackstrings
from floss.tightstrings import extract_tightstrings
from floss.string_decoder import decode_strings
from floss.language.identify import Language, identify_language
from floss.language.identify import Language, identify_language_and_version

SIGNATURES_PATH_DEFAULT_STRING = "(embedded signatures)"
EXTENSIONS_SHELLCODE_32 = ("sc32", "raw32")
Expand Down Expand Up @@ -198,9 +198,11 @@ def make_parser(argv):
advanced_group.add_argument(
"--language",
type=str,
choices=[l.value for l in Language if l != Language.UNKNOWN] + ["none"],
default="",
help="use language-specific string extraction, disable using 'none'" if show_all_options else argparse.SUPPRESS,
choices=[l.value for l in Language if l != Language.UNKNOWN],
default=Language.UNKNOWN.value,
help="use language-specific string extraction, auto-detect language by default, disable using 'none'"
if show_all_options
else argparse.SUPPRESS,
)
advanced_group.add_argument(
"-l",
Expand Down Expand Up @@ -547,39 +549,44 @@ def main(argv=None) -> int:
static_runtime = get_runtime_diff(interim)

# set language configurations
lang_id: Language
if args.language == Language.GO.value:
lang_id = Language.GO
elif args.language == Language.RUST.value:
lang_id = Language.RUST
elif args.language == Language.DOTNET.value:
lang_id = Language.DOTNET
elif args.language == "none":
lang_id = Language.UNKNOWN
selected_lang = Language(args.language)
if selected_lang == Language.DISABLED:
results.metadata.language = ""
results.metadata.language_version = ""
results.metadata.language_selected = ""
else:
lang_id = identify_language(sample, static_strings)
lang_id, lang_version = identify_language_and_version(sample, static_strings)

if selected_lang == Language.UNKNOWN:
pass
elif selected_lang != lang_id:
logger.warning(
"the selected language '%s' differs to the automatically identified language '%s (%s)' - extracted "
"strings may be incomplete or inaccurate",
selected_lang.value,
lang_id.value,
lang_version,
)
results.metadata.language_selected = selected_lang.value

# TODO(mr-tz): verify user-selected language makes sense and at least warn user
# include language version in results, if available
# https://github.com/mandiant/flare-floss/issues/900
results.metadata.language = lang_id.value
results.metadata.language_version = lang_version

if lang_id == Language.GO:
if results.metadata.language == Language.GO.value:
if analysis.enable_tight_strings or analysis.enable_stack_strings or analysis.enable_decoded_strings:
logger.warning(
"FLOSS handles Go static strings, but string deobfuscation may be inaccurate and take a long time"
)
results.metadata.language = Language.GO.value

elif lang_id == Language.RUST:
elif results.metadata.language == Language.RUST.value:
if analysis.enable_tight_strings or analysis.enable_stack_strings or analysis.enable_decoded_strings:
logger.warning(
"FLOSS handles Rust static strings, but string deobfuscation may be inaccurate and take a long time"
)
results.metadata.language = Language.RUST.value

elif lang_id == Language.DOTNET:
elif results.metadata.language == Language.DOTNET.value:
logger.warning(".NET language-specific string extraction is not supported yet")
logger.warning("Furthermore, FLOSS does NOT attempt to deobfuscate any strings from .NET binaries")
logger.warning("FLOSS does NOT attempt to deobfuscate any strings from .NET binaries")

# enable .NET strings once we can extract them
# results.metadata.language = Language.DOTNET.value
Expand All @@ -589,7 +596,7 @@ def main(argv=None) -> int:
analysis.enable_tight_strings = False
analysis.enable_decoded_strings = False

if results.metadata.language != "":
if results.metadata.language not in ("", "unknown"):
if args.enabled_types == [] and args.disabled_types == []:
prompt = input("Do you want to enable string deobfuscation? (this could take a long time) [y/N] ")

Expand All @@ -607,47 +614,42 @@ def main(argv=None) -> int:

# in order of expected run time, fast to slow
# 1. static strings (done above)
# a) includes language-specific strings, if applicable
# 2. stack strings
# 3. tight strings
# 4. decoded strings

if results.analysis.enable_static_strings:
logger.info("extracting static strings")
results.strings.static_strings = static_strings
results.metadata.runtime.static_strings = static_runtime

if not lang_id:
logger.info("extracting static strings")
else:
if lang_id == Language.GO:
logger.info("extracting language-specific Go strings")

interim = time()
results.strings.language_strings = floss.language.go.extract.extract_go_strings(sample, args.min_length)
results.metadata.runtime.language_strings = get_runtime_diff(interim)
if results.metadata.language == Language.GO.value:
logger.info("extracting language-specific Go strings")

# missed strings only includes non-identified strings in searched range
# here currently only focus on strings in string blob range
string_blob_strings = floss.language.go.extract.get_static_strings_from_blob_range(
sample, static_strings
)
results.strings.language_strings_missed = floss.language.utils.get_missed_strings(
string_blob_strings, results.strings.language_strings, args.min_length
)
interim = time()
results.strings.language_strings = floss.language.go.extract.extract_go_strings(sample, args.min_length)
results.metadata.runtime.language_strings = get_runtime_diff(interim)

# missed strings only includes non-identified strings in searched range
# here currently only focus on strings in string blob range
string_blob_strings = floss.language.go.extract.get_static_strings_from_blob_range(sample, static_strings)
results.strings.language_strings_missed = floss.language.utils.get_missed_strings(
string_blob_strings, results.strings.language_strings, args.min_length
)

elif lang_id == Language.RUST:
logger.info("extracting language-specific Rust strings")
elif results.metadata.language == Language.RUST.value:
logger.info("extracting language-specific Rust strings")

interim = time()
results.strings.language_strings = floss.language.rust.extract.extract_rust_strings(
sample, args.min_length
)
results.metadata.runtime.language_strings = get_runtime_diff(interim)
interim = time()
results.strings.language_strings = floss.language.rust.extract.extract_rust_strings(sample, args.min_length)
results.metadata.runtime.language_strings = get_runtime_diff(interim)

# currently Rust strings are only extracted from the .rdata section
rdata_strings = floss.language.rust.extract.get_static_strings_from_rdata(sample, static_strings)
results.strings.language_strings_missed = floss.language.utils.get_missed_strings(
rdata_strings, results.strings.language_strings, args.min_length
)
# currently Rust strings are only extracted from the .rdata section
rdata_strings = floss.language.rust.extract.get_static_strings_from_rdata(sample, static_strings)
results.strings.language_strings_missed = floss.language.utils.get_missed_strings(
rdata_strings, results.strings.language_strings, args.min_length
)
if (
results.analysis.enable_decoded_strings
or results.analysis.enable_stack_strings
Expand Down
13 changes: 12 additions & 1 deletion floss/render/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,27 @@ def width(s: str, character_count: int) -> str:

def render_meta(results: ResultDocument, console, verbose):
rows: List[Tuple[str, str]] = list()

lang = f"{results.metadata.language}" if results.metadata.language else ""
lang_v = (
f" ({results.metadata.language_version})"
if results.metadata.language != "unknown" and results.metadata.language_version
else ""
)
lang_s = f" - selected: {results.metadata.language_selected}" if results.metadata.language_selected else ""
language_value = f"{lang}{lang_v}{lang_s}"

if verbose == Verbosity.DEFAULT:
rows.append((width("file path", MIN_WIDTH_LEFT_COL), width(results.metadata.file_path, MIN_WIDTH_RIGHT_COL)))
rows.append(("identified language", language_value))
else:
rows.extend(
[
(width("file path", MIN_WIDTH_LEFT_COL), width(results.metadata.file_path, MIN_WIDTH_RIGHT_COL)),
("start date", results.metadata.runtime.start_date.strftime("%Y-%m-%d %H:%M:%S")),
("runtime", strtime(results.metadata.runtime.total)),
("version", results.metadata.version),
("identified language", results.metadata.language),
("identified language", language_value),
("imagebase", f"0x{results.metadata.imagebase:x}"),
("min string length", f"{results.metadata.min_length}"),
]
Expand Down
2 changes: 2 additions & 0 deletions floss/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ class Metadata:
min_length: int = 0
runtime: Runtime = field(default_factory=Runtime)
language: str = ""
language_version: str = ""
language_selected: str = "" # configured by user


@dataclass
Expand Down
24 changes: 12 additions & 12 deletions tests/test_language_id.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
import os
from pathlib import Path

import pytest

from floss.utils import get_static_strings
from floss.language.identify import Language, identify_language
from floss.language.identify import VERSION_UNKNOWN_OR_NA, Language, identify_language_and_version


@pytest.mark.parametrize(
    "binary_file, expected_result, expected_version",
    [
        ("data/language/go/go-hello/bin/go-hello.exe", Language.GO, "1.20"),
        ("data/language/rust/rust-hello/bin/rust-hello.exe", Language.RUST, "1.69.0"),
        ("data/test-decode-to-stack.exe", Language.UNKNOWN, VERSION_UNKNOWN_OR_NA),
        ("data/language/dotnet/dotnet-hello/bin/dotnet-hello.exe", Language.DOTNET, VERSION_UNKNOWN_OR_NA),
        ("data/src/shellcode-stackstrings/bin/shellcode-stackstrings.bin", Language.UNKNOWN, VERSION_UNKNOWN_OR_NA),
    ],
)
def test_language_detection(binary_file, expected_result, expected_version):
    """
    Check that each sample is identified with the expected language and version.

    `binary_file` is given relative to the directory containing this test file.
    """
    CD = Path(__file__).resolve().parent
    abs_path = (CD / binary_file).resolve()

    # fail early with a clear message if the test sample is missing
    assert abs_path.exists(), f"File {binary_file} does not exist"

    static_strings = get_static_strings(abs_path, 4)

    language, version = identify_language_and_version(abs_path, static_strings)

    assert language == expected_result, f"Expected: {expected_result.value}, Actual: {language.value}"
    assert version == expected_version, f"Expected: {expected_version}, Actual: {version}"

0 comments on commit 801559a

Please sign in to comment.