Skip to content

Commit

Permalink
Merge branch 'lightfuzz' into lightfuzz-deserialize-fp-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
liquidsec authored Jan 23, 2025
2 parents d0cef2c + a250d04 commit c7d44da
Show file tree
Hide file tree
Showing 11 changed files with 143 additions and 48 deletions.
1 change: 1 addition & 0 deletions bbot/core/event/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1398,6 +1398,7 @@ def _outgoing_dedup_hash(self, event):

def _url(self):
    # URL accessor for URL-bearing event types; assumes self.data is a dict-like
    # carrying a "url" key (raises KeyError otherwise).
    return self.data["url"]


def __str__(self):
max_event_len = 200
Expand Down
7 changes: 5 additions & 2 deletions bbot/core/helpers/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ def extract_params_xml(xml_data, compare_mode="getparam"):
xml_data (str): XML-formatted string containing elements.
Returns:
set: A set of tuples containing the tags and their corresponding text values present in the XML object.
set: A set of tuples containing the tags and their corresponding sanitized text values present in the XML object.
Raises:
Returns an empty set if ParseError occurs.
Expand All @@ -913,7 +913,10 @@ def extract_params_xml(xml_data, compare_mode="getparam"):
while stack:
current_element = stack.pop()
if validate_parameter(current_element.tag, compare_mode):
tag_value_pairs.add((current_element.tag, current_element.text))
# Sanitize the text value
text_value = current_element.text.strip() if current_element.text else None
sanitized_value = quote(text_value, safe='') if text_value else None
tag_value_pairs.add((current_element.tag, sanitized_value))
for child in current_element:
stack.append(child)
return tag_value_pairs
Expand Down
6 changes: 3 additions & 3 deletions bbot/core/helpers/regexes.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@

# For use with excavate parameters extractor
input_tag_regex = re.compile(
r"<input[^>]*?\sname=[\"\']?([\-\._=+\/\w]+)[\"\']?[^>]*?\svalue=[\"\']?([:%\-\._=+\/\w]*)[\"\']?[^>]*?>"
r"<input[^>]*?\sname=[\"\']?([\-\._=+\/\w]+)[\"\']?[^>]*?\svalue=[\"\']?([:%\-\._=+\/\w\s]*)[\"\']?[^>]*?>"
)
input_tag_regex2 = re.compile(
r"<input[^>]*?\svalue=[\"\']?([:\-%\._=+\/\w]*)[\"\']?[^>]*?\sname=[\"\']?([\-\._=+\/\w]+)[\"\']?[^>]*?>"
r"<input[^>]*?\svalue=[\"\']?([:\-%\._=+\/\w\s]*)[\"\']?[^>]*?\sname=[\"\']?([\-\._=+\/\w]+)[\"\']?[^>]*?>"
)
input_tag_novalue_regex = re.compile(r"<input(?![^>]*\b\svalue=)[^>]*?\sname=[\"\']?([\-\._=+\/\w]*)[\"\']?[^>]*?>")
# jquery_get_regex = re.compile(r"url:\s?[\"\'].+?\?(\w+)=")
Expand Down Expand Up @@ -169,7 +169,7 @@
button_tag_regex2 = re.compile(
r"<button[^>]*?value=[\"\']?([\-%\._=+\/\w]*)[\"\']?[^>]*?name=[\"\']?([\-\._=+\/\w]+)[\"\']?[^>]*?>"
)
tag_attribute_regex = re.compile(r"<[^>]*(?:href|action|src)\s*=\s*[\"\']?(?!mailto:)([^\s\'\"\>]+)[\"\']?[^>]*>")
tag_attribute_regex = re.compile(r"<[^>]*(?:href|action|src)\s*=\s*[\"\']?(?!mailto:)([^\'\"\>]+)[\"\']?[^>]*>")

valid_netloc = r"[^\s!@#$%^&()=/?\\'\";~`<>]+"

Expand Down
4 changes: 1 addition & 3 deletions bbot/modules/internal/excavate.py
Original file line number Diff line number Diff line change
Expand Up @@ -1053,8 +1053,6 @@ async def setup(self):
return True

async def search(self, data, event, content_type, discovery_context="HTTP response"):
# TODO: replace this JSON/XML extraction with our lightfuzz envelope stuff

if not data:
return None
decoded_data = await self.helpers.re.recursive_decode(data)
Expand All @@ -1065,7 +1063,7 @@ async def search(self, data, event, content_type, discovery_context="HTTP respon
"json": self.helpers.extract_params_json,
"xml": self.helpers.extract_params_xml,
}

for source_type, extract_func in extraction_map.items():
if source_type in content_type_lower:
results = extract_func(data)
Expand Down
46 changes: 33 additions & 13 deletions bbot/modules/lightfuzz_submodules/base.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import copy
import base64
import binascii
from urllib.parse import quote

class BaseLightfuzz:
def __init__(self, lightfuzz, event):
Expand Down Expand Up @@ -50,22 +51,32 @@ def additional_params_process(self, additional_params, additional_params_populat
new_additional_params[k] = v
return new_additional_params

def compare_baseline(
self, event_type, probe, cookies, additional_params_populate_empty=False, speculative_mode="GETPARAM"
):
def conditional_urlencode(self, probe, event_type, skip_urlencoding=False):
"""Conditionally url-encodes the probe if the event type requires it and encoding is not skipped by the submodule.
We also don't encode if any envelopes are present.
"""
Initializes the http_compare object and executes a probe to establish a baseline for comparison.
if event_type in ["GETPARAM", "COOKIE"] and not skip_urlencoding and getattr(self.event, "envelopes", None):
# Exclude '&' from being encoded since we are operating on full query strings
return quote(probe, safe='&')
return probe

Handles each of the types of WEB_PARAMETERS (GETPARAM, COOKIE, HEADER, POSTPARAM, BODYJSON)
"""
def compare_baseline(
self, event_type, probe, cookies, additional_params_populate_empty=False, speculative_mode="GETPARAM", skip_urlencoding=False
):

# Transparently pack the probe value into the envelopes, if present
probe = self.outgoing_probe_value(probe)

# URL Encode the probe if the event type is GETPARAM or COOKIE, if there are no envelopes, and the submodule did not opt-out with skip_urlencoding
probe = self.conditional_urlencode(probe, event_type, skip_urlencoding)
http_compare = None

if event_type == "SPECULATIVE":
event_type = speculative_mode

if event_type == "GETPARAM":
baseline_url = f"{self.event.data['url']}?{self.event.data['name']}={probe}"

if "additional_params" in self.event.data.keys() and self.event.data["additional_params"] is not None:
baseline_url = self.lightfuzz.helpers.add_get_params(
baseline_url, self.event.data["additional_params"], encode=False
Expand Down Expand Up @@ -134,13 +145,18 @@ async def compare_probe(
additional_params_populate_empty=False,
additional_params_override={},
speculative_mode="GETPARAM",
skip_urlencoding=False,
):
"""
Executes a probe to compare against a baseline.
"""

# Transparently pack the probe value into the envelopes, if present
probe = self.outgoing_probe_value(probe)
additional_params = copy.deepcopy(self.event.data.get("additional_params", {}))

# URL Encode the probe if the event type is GETPARAM or COOKIE, if there are no envelopes, and the submodule did not opt-out with skip_urlencoding
probe = self.conditional_urlencode(probe, event_type, skip_urlencoding)

# Create a complete copy to avoid modifying the original additional_params
additional_params = copy.deepcopy(self.event.data.get("additional_params", {}))

if additional_params_override:
for k, v in additional_params_override.items():
additional_params[k] = v
Expand Down Expand Up @@ -185,19 +201,26 @@ async def standard_probe(
additional_params_populate_empty=False,
speculative_mode="GETPARAM",
allow_redirects=False,
skip_urlencoding=False,
):
"""
Send a probe to the target URL, abstracting away the details associated with each WEB_PARAMETER type.
"""

# Transparently pack the probe value into the envelopes, if present
probe = self.outgoing_probe_value(probe)

# URL Encode the probe if the event type is GETPARAM or COOKIE, if there are no envelopes, and the submodule did not opt-out with skip_urlencoding
probe = self.conditional_urlencode(probe, event_type, skip_urlencoding)

if event_type == "SPECULATIVE":
event_type = speculative_mode

method = "GET"

if event_type == "GETPARAM":
url = f"{self.event.data['url']}?{self.event.data['name']}={probe}"

if "additional_params" in self.event.data.keys() and self.event.data["additional_params"] is not None:
url = self.lightfuzz.helpers.add_get_params(
url, self.event.data["additional_params"], encode=False
Expand All @@ -216,9 +239,6 @@ async def standard_probe(
json_data = None

if event_type == "POSTPARAM":



method = "POST"
data = {self.event.data["name"]: probe}
if self.event.data["additional_params"] is not None:
Expand Down
9 changes: 6 additions & 3 deletions bbot/modules/lightfuzz_submodules/cmdi.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ async def fuzz(self):
try:
# add "echo" to the cmdi probe value to construct the command to be executed
echo_probe = f"{probe_value}{p} echo {canary} {p}"
# we have to handle our own URL-encoding here, because our payloads include the & character
if self.event.data["type"] == "GETPARAM":
echo_probe = urllib.parse.quote(echo_probe.encode(), safe="")

# send cmdi probe and compare with baseline response
cmdi_probe = await self.compare_probe(http_compare, self.event.data["type"], echo_probe, cookies)
cmdi_probe = await self.compare_probe(http_compare, self.event.data["type"], echo_probe, cookies, skip_urlencoding=True)

# ensure we received an HTTP response
if cmdi_probe[3]:
# check if the canary is in the response and the word "echo" is NOT in the response text, ruling out mere reflection of the entire probe value without execution
Expand Down Expand Up @@ -66,10 +69,10 @@ async def fuzz(self):
}
# payload is an nslookup command that includes the interactsh domain prepended the previously generated subdomain tag
interactsh_probe = f"{p} nslookup {subdomain_tag}.{self.lightfuzz.interactsh_domain} {p}"

# we have to handle our own URL-encoding here, because our payloads include the & character
if self.event.data["type"] == "GETPARAM":
interactsh_probe = urllib.parse.quote(interactsh_probe.encode(), safe="")
# we send the probe here, and any positive detections are processed in the interactsh_callback defined in lightfuzz.py
await self.standard_probe(
self.event.data["type"], cookies, f"{probe_value}{interactsh_probe}", timeout=15
self.event.data["type"], cookies, f"{probe_value}{interactsh_probe}", timeout=15, skip_urlencoding=True
)
8 changes: 4 additions & 4 deletions bbot/modules/lightfuzz_submodules/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ async def fuzz(self):
confirmations = 0
while iterations > 0:
try:
http_compare = self.compare_baseline(self.event.data["type"], probe_value, cookies)
http_compare = self.compare_baseline(self.event.data["type"], probe_value, cookies, skip_urlencoding=True)
singledot_probe = await self.compare_probe(
http_compare, self.event.data["type"], payloads["singledot_payload"], cookies
http_compare, self.event.data["type"], payloads["singledot_payload"], cookies, skip_urlencoding=True
)
doubledot_probe = await self.compare_probe(
http_compare, self.event.data["type"], payloads["doubledot_payload"], cookies
http_compare, self.event.data["type"], payloads["doubledot_payload"], cookies, skip_urlencoding=True
)
# if singledot_probe[0] is true, the response is the same as the baseline. This indicates adding a single dot did not break the functionality
# next, if doubledot_probe[0] is false, the response is different from the baseline. This further indicates that a real path is being manipulated
Expand Down Expand Up @@ -116,7 +116,7 @@ async def fuzz(self):
}

for path, trigger in absolute_paths.items():
r = await self.standard_probe(self.event.data["type"], cookies, path)
r = await self.standard_probe(self.event.data["type"], cookies, path, skip_urlencoding=True)
if r and trigger in r.text:
self.results.append(
{
Expand Down
3 changes: 2 additions & 1 deletion bbot/modules/lightfuzz_submodules/ssti.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ async def fuzz(self):
# These are common SSTI payloads, each attempting to trigger an integer multiplication which would produce an expected value
ssti_probes = ["<%25%3d%201337*1337%20%25>","<%= 1337*1337 %>", "${1337*1337}", "%24%7b1337*1337%7d", "1,787{{z}},569"]
for probe_value in ssti_probes:
r = await self.standard_probe(self.event.data["type"], cookies, probe_value, allow_redirects=True)
r = await self.standard_probe(self.event.data["type"], cookies, probe_value, allow_redirects=True, skip_urlencoding=True)

# look for the expected value in the response
if r and ("1787569" in r.text or "1,787,569" in r.text):
self.results.append(
Expand Down
1 change: 0 additions & 1 deletion bbot/test/test_step_1/test_web.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,6 @@ def echo_cookies_handler(request):
bbot_httpserver.expect_request(uri=endpoint).respond_with_handler(echo_cookies_handler)
scan1 = bbot_scanner("127.0.0.1", config={"web": {"debug": True}})
r1 = await scan1.helpers.request(url, cookies={"foo": "bar"})
print(r1.text)

assert r1 is not None, "Request to self-signed SSL server went through even with ssl_verify=True"
assert "bar" in r1.text
Expand Down
37 changes: 36 additions & 1 deletion bbot/test/test_step_2/module_tests/test_module_excavate.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ class TestExcavateParameterExtraction(TestExcavate):
<form action="/search" method="get">
<label for="searchQuery">Search Query:</label>
<input type="text" id="searchQuery" name="q1" value="flowers"><br><br>
<input type="text" id="searchQueryspaces" name="q4" value="trees and forests"><br><br>
<input type="submit" value="Search">
</form>
<h1>Simple POST Form</h1>
Expand Down Expand Up @@ -502,8 +503,10 @@ def check(self, module_test, events):
found_htmltags_a = False
found_htmltags_img = False
found_select_noquotes = False

avoid_truncated_values = True
found_form_input_with_spaces = False
for e in events:

if e.type == "WEB_PARAMETER":
if e.data["description"] == "HTTP Extracted Parameter [jqueryget] (GET jquery Submodule)":
found_jquery_get = True
Expand Down Expand Up @@ -548,11 +551,19 @@ def check(self, module_test, events):
if "csrf" in e.data["additional_params"].keys():
found_select_noquotes = True

if e.data["description"] == "HTTP Extracted Parameter [q4] (GET Form Submodule)":
if e.data["original_value"] == "trees and forests":
found_form_input_with_spaces = True
if e.data["original_value"] == "trees":
avoid_truncated_values = False

assert found_jquery_get, "Did not extract Jquery GET parameters"
assert found_jquery_post, "Did not extract Jquery POST parameters"
assert found_form_get, "Did not extract Form GET parameters"
assert found_form_post, "Did not extract Form POST parameters"
assert found_form_generic, "Did not extract Form (Generic) parameters"
assert found_form_input_with_spaces, "Did not extract Form input with spaces"
assert avoid_truncated_values, "Emitted a parameter with spaces without the entire value"
assert found_jquery_get_original_value, "Did not extract Jquery GET parameter original_value"
assert found_jquery_post_original_value, "Did not extract Jquery POST parameter original_value"
assert found_form_get_original_value, "Did not extract Form GET parameter original_value"
Expand Down Expand Up @@ -779,6 +790,30 @@ def check(self, module_test, events):
assert excavate_xml_extraction, "Excavate failed to extract xml parameter"


class TestExcavateParameterExtraction_xml_invalid(TestExcavateParameterExtraction_xml):
    # XML response body whose <newlines> element text contains literal newline
    # characters (the \n escapes inside the triple-quoted string produce real
    # newlines), exercising excavate's sanitization of extracted XML values.
    getparam_extract_xml = """
    <data>
    <obscureParameter>1</obscureParameter>
    <newlines>invalid\nwith\nnewlines</newlines>
    </data>
    """

    async def setup_after_prep(self, module_test):
        # Serve the XML payload with an XML content type so excavate routes it
        # through its XML parameter-extraction path.
        respond_args = {"response_data": self.getparam_extract_xml, "headers": {"Content-Type": "application/xml"}}
        module_test.set_expect_requests(respond_args=respond_args)

    def check(self, module_test, events):
        # Pass only if the [newlines] parameter was extracted AND its
        # original_value was sanitized (no raw newline characters survive).
        excavate_xml_extraction = False
        for e in events:
            if e.type == "WEB_PARAMETER":
                if (
                    "HTTP Extracted Parameter (speculative from xml content) [newlines]"
                    in e.data["description"]
                    and "\n" not in e.data["original_value"]
                ):
                    excavate_xml_extraction = True
        assert excavate_xml_extraction, "Excavate failed to extract xml parameter"

class TestExcavateParameterExtraction_inputtagnovalue(ModuleTestBase):
targets = ["http://127.0.0.1:8888/"]

Expand Down
Loading

0 comments on commit c7d44da

Please sign in to comment.