Skip to content

fix(python): Fixes for Python code scanning alerts #11668

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
3 changes: 2 additions & 1 deletion .github/scripts/merge_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@


def load_package(filename):
pkg = json.load(open(filename))["packages"][0]
with open(filename) as f:
pkg = json.load(f)["packages"][0]
print("Loaded package {0} from {1}".format(pkg["name"], filename), file=sys.stderr)
print("{0} platform(s), {1} tools".format(len(pkg["platforms"]), len(pkg["tools"])), file=sys.stderr)
return pkg
Expand Down
Binary file modified tools/espota.exe
Binary file not shown.
8 changes: 6 additions & 2 deletions tools/espota.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ def serve(remote_addr, local_addr, remote_port, local_port, password, filename,
return 1

content_size = os.path.getsize(filename)
file_md5 = hashlib.md5(open(filename, "rb").read()).hexdigest()
with open(filename, "rb") as f:
file_md5 = hashlib.md5(f.read()).hexdigest()
logging.info("Upload size: %d", content_size)
message = "%d %d %d %s\n" % (command, local_port, content_size, file_md5)

Expand Down Expand Up @@ -163,6 +164,7 @@ def serve(remote_addr, local_addr, remote_port, local_port, password, filename,
sock2.close()

logging.info("Waiting for device...")

try:
sock.settimeout(10)
connection, client_address = sock.accept()
Expand All @@ -172,6 +174,7 @@ def serve(remote_addr, local_addr, remote_port, local_port, password, filename,
logging.error("No response from device")
sock.close()
return 1

try:
with open(filename, "rb") as f:
if PROGRESS:
Expand Down Expand Up @@ -225,7 +228,8 @@ def serve(remote_addr, local_addr, remote_port, local_port, password, filename,
logging.error("Error response from device")
connection.close()
return 1

except Exception as e: # noqa: E722
logging.error("Error: %s", str(e))
finally:
connection.close()

Expand Down
Binary file modified tools/get.exe
Binary file not shown.
61 changes: 54 additions & 7 deletions tools/get.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,49 @@
dist_dir = current_dir + "/dist/"


def is_safe_archive_path(path):
# Check for absolute paths (both Unix and Windows style)
if path.startswith('/') or (len(path) > 1 and path[1] == ':' and path[2] in '\\/'):
raise ValueError(f"Absolute path not allowed: {path}")

# Normalize the path to handle any path separators
normalized_path = os.path.normpath(path)

# Check for directory traversal attempts using normalized path
if ".." in normalized_path.split(os.sep):
raise ValueError(f"Directory traversal not allowed: {path}")

# Additional check for paths that would escape the target directory
if normalized_path.startswith(".."):
raise ValueError(f"Path would escape target directory: {path}")

# Check for any remaining directory traversal patterns in the original path
# This catches cases that might not be normalized properly
path_parts = path.replace('\\', '/').split('/')
if '..' in path_parts:
raise ValueError(f"Directory traversal not allowed: {path}")

return True


def safe_tar_extract(tar_file, destination):
# Validate all paths before extraction
for member in tar_file.getmembers():
is_safe_archive_path(member.name)

# If all paths are safe, proceed with extraction
tar_file.extractall(destination, filter="tar")


def safe_zip_extract(zip_file, destination):
# Validate all paths before extraction
for name in zip_file.namelist():
is_safe_archive_path(name)

# If all paths are safe, proceed with extraction
zip_file.extractall(destination)


def sha256sum(filename, blocksize=65536):
hash = hashlib.sha256()
with open(filename, "rb") as f:
Expand Down Expand Up @@ -212,6 +255,10 @@
print("File corrupted or incomplete!")
cfile = None
file_is_corrupted = True
except ValueError as e:
print(f"Security validation failed: {e}")
cfile = None
file_is_corrupted = True

if file_is_corrupted:
corrupted_filename = filename + ".corrupted"
Expand Down Expand Up @@ -243,15 +290,15 @@
if filename.endswith("tar.gz"):
if not cfile:
cfile = tarfile.open(filename, "r:gz")
cfile.extractall(destination, filter="tar")
safe_tar_extract(cfile, destination)
elif filename.endswith("tar.xz"):
if not cfile:
cfile = tarfile.open(filename, "r:xz")
cfile.extractall(destination, filter="tar")
safe_tar_extract(cfile, destination)
elif filename.endswith("zip"):
if not cfile:
cfile = zipfile.ZipFile(filename)
cfile.extractall(destination)
safe_zip_extract(cfile, destination)
else:
raise NotImplementedError("Unsupported archive type")

Expand Down Expand Up @@ -348,9 +395,8 @@
urlretrieve(url, local_path, report_progress, context=ctx)
elif "Windows" in sys_name:
r = requests.get(url)
f = open(local_path, "wb")
f.write(r.content)
f.close()
with open(local_path, "wb") as f:
f.write(r.content)
else:
is_ci = os.environ.get("GITHUB_WORKSPACE")
if is_ci:
Expand All @@ -374,7 +420,8 @@


def load_tools_list(filename, platform):
tools_info = json.load(open(filename))["packages"][0]["tools"]
with open(filename, "r") as f:
tools_info = json.load(f)["packages"][0]["tools"]
tools_to_download = []
for t in tools_info:
if platform == "x86_64-mingw32":
Expand Down
Loading