Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix extractions and improve language_strings_missed contents #901

Merged
merged 3 commits into from
Nov 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions floss/language/go/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ def find_stack_strings_with_regex(
if not binary_string:
continue

if binary_string.endswith(b"\x00"):
binary_string = binary_string[:-1]

addr = m.start()
# need to subtract opcode bytes offset
off_regex = len(m.group(0)) - len(binary_string)
Expand Down Expand Up @@ -98,6 +101,9 @@ def find_i386_stackstrings(section_data, offset, min_length):
def get_stackstrings(pe: pefile.PE, min_length: int) -> Iterable[StaticString]:
"""
Find stackstrings in the given PE file.

TODO(mr-tz): algorithms need improvements / rethinking of approach
https://github.com/mandiant/flare-floss/issues/828
"""

for section in pe.sections:
Expand Down Expand Up @@ -269,7 +275,9 @@ def get_string_blob_strings(pe: pefile.PE, min_length) -> Iterable[StaticString]
with floss.utils.timing("find struct string candidates"):
struct_strings = list(sorted(set(get_struct_string_candidates(pe)), key=lambda s: s.address))
if not struct_strings:
logger.warning("Failed to find struct string candidates: Is this a Go binary?")
logger.warning(
"Failed to find struct string candidates: Is this a Go binary? If so, the Go version may be unsupported."
)
return

with floss.utils.timing("find string blob"):
Expand Down Expand Up @@ -354,12 +362,14 @@ def get_string_blob_strings(pe: pefile.PE, min_length) -> Iterable[StaticString]
last_buf = string_blob_buf[last_pointer_offset:]
for size in range(len(last_buf), 0, -1):
try:
s = last_buf[:size].decode("utf-8")
_ = last_buf[:size].decode("utf-8")
except UnicodeDecodeError:
continue
else:
try:
string = StaticString.from_utf8(last_buf[:size], last_pointer, min_length)
string = StaticString.from_utf8(
last_buf[:size], pe.get_offset_from_rva(last_pointer - image_base), min_length
)
yield string
except ValueError:
pass
Expand All @@ -382,6 +392,25 @@ def extract_go_strings(sample, min_length) -> List[StaticString]:
return go_strings


def get_static_strings_from_blob_range(sample: pathlib.Path, static_strings: List[StaticString]) -> List[StaticString]:
    """
    Select the static strings whose file offset falls inside the string blob.

    The sample is parsed as a PE file, the string blob range is located via the
    struct string candidates, and only the entries of `static_strings` with
    `start <= offset < end` are kept.

    Returns an empty list when no struct string candidates are found or when
    locating the string blob range raises ValueError.
    """
    pe = pefile.PE(data=pathlib.Path(sample).read_bytes(), fast_load=True)

    candidates = sorted(set(get_struct_string_candidates(pe)), key=lambda c: c.address)
    if not candidates:
        return []

    try:
        blob_start_va, blob_end_va = find_string_blob_range(pe, candidates)
    except ValueError:
        return []

    # the range bounds are rebased against the image base and then mapped to
    # file offsets, so they can be compared with StaticString.offset
    image_base = pe.OPTIONAL_HEADER.ImageBase
    blob_start = pe.get_offset_from_rva(blob_start_va - image_base)
    blob_end = pe.get_offset_from_rva(blob_end_va - image_base)

    return [s for s in static_strings if blob_start <= s.offset < blob_end]


def main(argv=None):
parser = argparse.ArgumentParser(description="Get Go strings")
parser.add_argument("path", help="file or path to analyze")
Expand Down
59 changes: 29 additions & 30 deletions floss/language/identify.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import re
from enum import Enum
from typing import Iterable
from typing import Tuple, Iterable
from pathlib import Path

import pefile
Expand All @@ -14,38 +14,43 @@
logger = floss.logging_.getLogger(__name__)


VERSION_UNKNOWN_OR_NA = "version unknown"


class Language(Enum):
    """
    Language labels used by the language identification code.

    Each member's value is the lowercase string form of the label.
    """

    # languages with dedicated detection
    GO = "go"
    RUST = "rust"
    DOTNET = "dotnet"

    # no language could be determined
    UNKNOWN = "unknown"

    # note: the string value is "none", not "disabled"
    DISABLED = "none"


def identify_language(sample: Path, static_strings: Iterable[StaticString]) -> Language:
"""
Identify the language of the binary given
"""
if is_rust_bin(static_strings):
return Language.RUST
def identify_language_and_version(sample: Path, static_strings: Iterable[StaticString]) -> Tuple[Language, str]:
is_rust, version = get_if_rust_and_version(static_strings)
if is_rust:
logger.info("Rust binary found with version: %s", version)
return Language.RUST, version

# Open the file as PE for further checks
try:
pe = pefile.PE(str(sample))
except pefile.PEFormatError as err:
logger.debug(f"NOT a valid PE file: {err}")
return Language.UNKNOWN
return Language.UNKNOWN, VERSION_UNKNOWN_OR_NA

if is_go_bin(pe):
return Language.GO
is_go, version = get_if_go_and_version(pe)
if is_go:
logger.info("Go binary found with version %s", version)
return Language.GO, version
elif is_dotnet_bin(pe):
return Language.DOTNET
return Language.DOTNET, VERSION_UNKNOWN_OR_NA
else:
return Language.UNKNOWN
return Language.UNKNOWN, VERSION_UNKNOWN_OR_NA


def is_rust_bin(static_strings: Iterable[StaticString]) -> bool:
def get_if_rust_and_version(static_strings: Iterable[StaticString]) -> Tuple[bool, str]:
"""
Check if the binary given is compiled with Rust compiler or not
Return if the binary given is compiled with Rust compiler and its version
reference: https://github.com/mandiant/flare-floss/issues/766
"""

Expand All @@ -62,19 +67,17 @@ def is_rust_bin(static_strings: Iterable[StaticString]) -> bool:
matches = regex_hash.search(string)
if matches and matches["hash"] in rust_commit_hash.keys():
version = rust_commit_hash[matches["hash"]]
logger.info("Rust binary found with version: %s", version)
return True
return True, version
if regex_version.search(string):
logger.info("Rust binary found with version: %s", string)
return True
return True, string

return False
return False, VERSION_UNKNOWN_OR_NA


def is_go_bin(pe: pefile.PE) -> bool:
def get_if_go_and_version(pe: pefile.PE) -> Tuple[bool, str]:
"""
Check if the binary given is compiled with Go compiler or not
it checks the magic header of the pclntab structure -pcHeader-
Return if the binary given is compiled with Go compiler and its version
this checks the magic header of the pclntab structure -pcHeader-
the magic value varies across Go versions
reference:
https://github.com/0xjiayu/go_parser/blob/865359c297257e00165beb1683ef6a679edc2c7f/pclntbl.py#L46
Expand All @@ -101,11 +104,9 @@ def is_go_bin(pe: pefile.PE) -> bool:
if magic in section_data:
pclntab_va = section_data.index(magic) + section_va
if verify_pclntab(section, pclntab_va):
logger.info("Go binary found with version %s", get_go_version(magic))
return True
return True, get_go_version(magic)

# if not found, search in all the available sections

for magic in go_magic:
for section in pe.sections:
section_va = section.VirtualAddress
Expand All @@ -114,10 +115,8 @@ def is_go_bin(pe: pefile.PE) -> bool:
if magic in section_data:
pclntab_va = section_data.index(magic) + section_va
if verify_pclntab(section, pclntab_va):
# just for testing
logger.info("Go binary found with version %s", get_go_version(magic))
return True
return False
return True, get_go_version(magic)
return False, VERSION_UNKNOWN_OR_NA


def get_go_version(magic):
Expand All @@ -137,7 +136,7 @@ def get_go_version(magic):
elif magic == MAGIC_120:
return "1.20"
else:
return "unknown"
return VERSION_UNKNOWN_OR_NA


def verify_pclntab(section, pclntab_va: int) -> bool:
Expand Down
24 changes: 24 additions & 0 deletions floss/language/rust/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,20 @@ def extract_rust_strings(sample: pathlib.Path, min_length: int) -> List[StaticSt
return rust_strings


def get_static_strings_from_rdata(sample, static_strings) -> List[StaticString]:
    """
    Select the static strings whose file offset falls inside the .rdata section.

    The sample is parsed as a PE file and the section's raw data range
    (PointerToRawData up to PointerToRawData + SizeOfRawData, end exclusive)
    is used as the filter window.

    Returns an empty list when looking up the .rdata section raises ValueError.
    """
    pe = pefile.PE(data=pathlib.Path(sample).read_bytes(), fast_load=True)

    try:
        rdata = get_rdata_section(pe)
    except ValueError:
        return []

    rdata_start = rdata.PointerToRawData
    rdata_end = rdata_start + rdata.SizeOfRawData

    return [s for s in static_strings if rdata_start <= s.offset < rdata_end]


def get_string_blob_strings(pe: pefile.PE, min_length: int) -> Iterable[StaticString]:
image_base = pe.OPTIONAL_HEADER.ImageBase

Expand All @@ -145,6 +159,11 @@ def get_string_blob_strings(pe: pefile.PE, min_length: int) -> Iterable[StaticSt
# select only UTF-8 strings and adjust offset
static_strings = filter_and_transform_utf8_strings(fixed_strings, start_rdata)

# TODO(mr-tz) - handle miss in rust-hello64.exe
# .rdata:00000001400C1270 0A aPanickedAfterP db 0Ah ; DATA XREF: .rdata:00000001400C12B8↓o
# .rdata:00000001400C1271 70 61 6E 69 63 6B 65 64… db 'panicked after panic::always_abort(), aborting.',0Ah,0
# .rdata:00000001400C12A2 00 00 00 00 00 00 align 8

struct_string_addrs = map(lambda c: c.address, get_struct_string_candidates(pe))

if pe.FILE_HEADER.Machine == pefile.MACHINE_TYPE["IMAGE_FILE_MACHINE_I386"]:
Expand All @@ -157,6 +176,11 @@ def get_string_blob_strings(pe: pefile.PE, min_length: int) -> Iterable[StaticSt
xrefs_lea = find_lea_xrefs(pe)
xrefs = itertools.chain(struct_string_addrs, xrefs_lea)

# TODO(mr-tz) - handle movdqa rust-hello64.exe
# .text:0000000140026046 66 0F 6F 05 02 71 09 00 movdqa xmm0, cs:xmmword_1400BD150
# .text:000000014002604E 66 0F 6F 0D 0A 71 09 00 movdqa xmm1, cs:xmmword_1400BD160
# .text:0000000140026056 66 0F 6F 15 12 71 09 00 movdqa xmm2, cs:xmmword_1400BD170

else:
logger.error("unsupported architecture: %s", pe.FILE_HEADER.Machine)
return []
Expand Down
Loading