-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathlog4shell_validator.py
118 lines (99 loc) · 5.39 KB
/
log4shell_validator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import argparse
import logging
import re
import os
import sys
from subprocess import Popen, PIPE
from colored_formatted import ColoredFormatter
from constants import LOG_PATTERNS, DEFAULT_PAYLOAD
# Put the root logger at NOTSET so it does not filter out any record by level.
logging.root.setLevel(logging.NOTSET)
# Module-wide logger; it is named after this file's path (__file__).
logger = logging.getLogger(__file__)
# Stream handler (logging.StreamHandler defaults to stderr) that lets DEBUG and
# above through and renders records with the project-local ColoredFormatter.
# NOTE(review): presumably ColoredFormatter colours output per level - confirm
# in the colored_formatted module.
color_handler = logging.StreamHandler()
color_handler.setLevel(logging.DEBUG)
color_handler.setFormatter(ColoredFormatter())
logger.addHandler(color_handler)
def delete_containers():
    """Tear down the docker-compose environment and report the outcome.

    Runs ``docker-compose down`` in the current working directory, waits for
    it to finish, and logs the result.  A non-zero exit status is logged as an
    error together with whatever the command wrote to stderr, instead of
    being reported as a success.

    Raises:
        OSError: if the ``docker-compose`` executable cannot be started.
    """
    logger.info("Shutting down the docker containers")
    docker_compose_down_process = Popen(["docker-compose", "down"], stdout=PIPE, stderr=PIPE)
    # communicate() drains both pipes and waits for the process to exit, so
    # returncode is guaranteed to be populated afterwards.
    _, stderr_output = docker_compose_down_process.communicate()
    exit_status = docker_compose_down_process.returncode
    if exit_status != 0:
        details = stderr_output.decode("utf-8", errors="replace").strip()
        logger.error(f"docker-compose down exited with status {exit_status}: {details}")
        return
    logger.info("Containers shut down successfully")
def analyze_log_line(log_line):
    """Emit the message of every known pattern that matches ``log_line``.

    Each entry of ``LOG_PATTERNS`` is unpacked as ``(regex, message, level)``.
    The regex is applied with ``re.match`` (i.e. anchored at the start of the
    line) and, on a hit, ``message`` is logged at ``level``.  Every entry is
    tried, so a single line may produce several log records.
    """
    # Lazy generator: each pattern is matched and logged before the next one
    # is evaluated, in LOG_PATTERNS order.
    hits = (
        (level, message)
        for regex, message, level in LOG_PATTERNS
        if re.match(regex, log_line)
    )
    for level, message in hits:
        logger.log(level, message)
def create_args_parser():
    """Build the command-line parser for the log4shell validator.

    Returns:
        argparse.ArgumentParser: parser exposing ``--java-version`` (int,
        default 8), the mandatory ``--log4j-version`` and the boolean
        switches that control cleanup, mitigations, debug output and the
        ThreadContext attack path.
    """
    cli = argparse.ArgumentParser(description="log4j-vulnerability-tester")

    cli.add_argument(
        "--java-version",
        dest="java_version",
        type=int,
        default=8,
        help="version of java to be used",
    )
    cli.add_argument(
        "--log4j-version",
        dest="log4j_version",
        type=str,
        required=True,
        help="version of log4j to be used",
    )

    # Plain on/off switches; argparse derives each dest from the flag name.
    simple_switches = (
        ("--no-cleanup", "Don't delete used containers."),
        ("--disable-trust-url", "Disable the LDAP trustURLCodebase setting on the victim app"),
        ("--remove-jndi-lookup-class", "Remove the JNDI Lookup class from the log4j jar file"),
        ("--disable-message-lookup", "Disable the JNDI message lookup in the victim app's JVM"),
        ("--debug", "Display all container's logs"),
    )
    for flag, description in simple_switches:
        cli.add_argument(flag, action="store_true", default=False, help=description)

    # This switch needs an explicit dest because the attribute name is shorter
    # than the flag.
    cli.add_argument(
        "--exploit-via-thread-context",
        dest="exploit_via_ctx",
        action="store_true",
        default=False,
        help=(
            "By sending the payload to an HTTP handler "
            "which makes use of Custom logging with ThreadContext, attempt to bypass the no-message-lookup mitigation"
        ),
    )
    return cli
# Ordered (marker, log level, message, exit status) table.  The first marker
# found in a docker-compose output line decides the outcome of the run.
_VERDICT_MARKERS = (
    (
        "Could not find artifact org.apache.logging.log4j",
        logging.CRITICAL,
        "The version of log4j you specified does not seem to exist.",
        2,
    ),
    (
        "Max attempts reached",
        logging.CRITICAL,
        "The attacker failed to send the payload to the vulnearble app.",
        1,
    ),
    (
        "Remote Code Execution gained via CVE-2021-44228",
        logging.ERROR,
        "[[[ This version of the application is vulnerable to CVE-2021-44228 ]]]",
        0,
    ),
    (
        "The payload should have been triggered by now",
        logging.INFO,
        "[[[ This version of the application is not vulnerable to CVE-2021-44228 ]]]",
        0,
    ),
)


def _cleanup_and_exit(exit_code, no_cleanup):
    """Delete the containers unless ``no_cleanup`` is set, then exit with ``exit_code``."""
    if not no_cleanup:
        delete_containers()
    sys.exit(exit_code)


def run_log4shell_attack_simulation(debug, no_cleanup):
    """Run ``docker-compose up`` and derive a verdict from its output.

    Every stdout line of the compose process is passed to
    ``analyze_log_line`` and checked against the known outcome markers.  The
    first marker found ends the run: the matching message is logged, the
    containers are removed (unless ``no_cleanup``) and the interpreter exits.

    Exit statuses:
        0 - a verdict was reached (vulnerable or not vulnerable), or the user
            interrupted the run with Ctrl-C;
        1 - the attacker container could not deliver the payload;
        2 - the requested log4j version could not be resolved;
        3 - docker-compose terminated before any verdict marker was seen.

    Args:
        debug: when true, echo every docker-compose output line at DEBUG level.
        no_cleanup: when true, leave the containers in place on exit.

    Raises:
        SystemExit: always, with one of the statuses listed above.
        OSError: if the ``docker-compose`` executable cannot be started.
    """
    # Local import: the file-level import only brings in Popen and PIPE.
    from subprocess import DEVNULL

    logger.info("Starting up the docker environment.")
    # stderr is never consumed by this function; sending it to DEVNULL rather
    # than an unread PIPE keeps the child from blocking on a full pipe buffer.
    docker_compose_up_process = Popen(
        ["docker-compose", "up"], stdout=PIPE, stderr=DEVNULL, env=os.environ
    )
    try:
        # The pipe is binary, so the EOF sentinel must be b'' (not '');
        # otherwise the loop would never notice that the process has exited.
        for line in iter(docker_compose_up_process.stdout.readline, b''):
            # Container output is arbitrary; never let a stray byte abort the run.
            decoded_line = line.rstrip().decode('utf-8', errors='replace')
            if debug:
                logger.debug(decoded_line)
            analyze_log_line(decoded_line)
            for marker, level, message, exit_code in _VERDICT_MARKERS:
                if marker in decoded_line:
                    logger.log(level, message)
                    _cleanup_and_exit(exit_code, no_cleanup)

        # stdout reached EOF without a verdict: docker-compose has terminated.
        exit_status = docker_compose_up_process.wait()
        logger.critical(
            f"docker-compose exited with status {exit_status} before the simulation reached a verdict."
        )
        _cleanup_and_exit(3, no_cleanup)
    except KeyboardInterrupt:
        # Keep asking until the user gives an explicit y/n answer.
        while True:
            answer = input("Would you like to shutdown and delete the containers before sys.exiting? [n/y]")
            if answer == "y":
                delete_containers()
                sys.exit(0)
            elif answer == "n":
                sys.exit(0)
def main():
    """Parse the command line, export the scenario settings and run the simulation.

    The chosen Java/log4j versions, the payload, the target URI and the
    mitigation switches are written to ``os.environ`` before
    ``run_log4shell_attack_simulation`` is started.

    Raises:
        NotImplementedError: if ``--java-version`` is neither 7 nor 8.
    """
    args = create_args_parser().parse_args()

    java_version = args.java_version
    if java_version not in [7, 8]:
        raise NotImplementedError("Only 8 or 7 are valid values for java versions")
    log4j_version = args.log4j_version

    # The attack path depends on whether the ThreadContext handler is targeted.
    target_uri = "2021-44228-via-context" if args.exploit_via_ctx else "2021-44228-via-message"

    # Boolean switches are exported as the strings "True" / "False".
    os.environ.update(
        {
            "JAVA_VERSION": f"java{java_version}",
            "LOG4J_VERSION": log4j_version,
            "PAYLOAD_44228": DEFAULT_PAYLOAD,
            "TARGET_URI": target_uri,
            "DISABLE_TRUST_URL": str(args.disable_trust_url),
            "REMOVE_JNDI_LOOKUP_CLASS": str(args.remove_jndi_lookup_class),
            "DISABLE_MESSAGE_LOOKUP": str(args.disable_message_lookup),
        }
    )

    logger.info(f"Populating the docker-compose.yaml file for Java version: '{java_version}' and Log4J version: '{log4j_version}' with the payload '{DEFAULT_PAYLOAD}'")
    run_log4shell_attack_simulation(debug=args.debug, no_cleanup=args.no_cleanup)


# Run the simulation only when this file is executed as a script.
if __name__ == "__main__":
    main()