|
50 | 50 | dist_dir = current_dir + "/dist/"
|
51 | 51 |
|
52 | 52 |
|
| 53 | +def is_safe_archive_path(path): |
| 54 | + # Check for absolute paths (both Unix and Windows style) |
| 55 | + if path.startswith('/') or (len(path) > 1 and path[1] == ':' and path[2] in '\\/'): |
| 56 | + raise ValueError(f"Absolute path not allowed: {path}") |
| 57 | + |
| 58 | + # Normalize the path to handle any path separators |
| 59 | + normalized_path = os.path.normpath(path) |
| 60 | + |
| 61 | + # Check for directory traversal attempts using normalized path |
| 62 | + if ".." in normalized_path.split(os.sep): |
| 63 | + raise ValueError(f"Directory traversal not allowed: {path}") |
| 64 | + |
| 65 | + # Additional check for paths that would escape the target directory |
| 66 | + if normalized_path.startswith(".."): |
| 67 | + raise ValueError(f"Path would escape target directory: {path}") |
| 68 | + |
| 69 | + # Check for any remaining directory traversal patterns in the original path |
| 70 | + # This catches cases that might not be normalized properly |
| 71 | + path_parts = path.replace('\\', '/').split('/') |
| 72 | + if '..' in path_parts: |
| 73 | + raise ValueError(f"Directory traversal not allowed: {path}") |
| 74 | + |
| 75 | + return True |
| 76 | + |
| 77 | + |
| 78 | +def safe_tar_extract(tar_file, destination): |
| 79 | + # Validate all paths before extraction |
| 80 | + for member in tar_file.getmembers(): |
| 81 | + is_safe_archive_path(member.name) |
| 82 | + |
| 83 | + # If all paths are safe, proceed with extraction |
| 84 | + tar_file.extractall(destination, filter="tar") |
| 85 | + |
| 86 | + |
| 87 | +def safe_zip_extract(zip_file, destination): |
| 88 | + # Validate all paths before extraction |
| 89 | + for name in zip_file.namelist(): |
| 90 | + is_safe_archive_path(name) |
| 91 | + |
| 92 | + # If all paths are safe, proceed with extraction |
| 93 | + zip_file.extractall(destination) |
| 94 | + |
| 95 | + |
53 | 96 | def sha256sum(filename, blocksize=65536):
|
54 | 97 | hash = hashlib.sha256()
|
55 | 98 | with open(filename, "rb") as f:
|
@@ -212,6 +255,10 @@ def unpack(filename, destination, force_extract, checksum): # noqa: C901
|
212 | 255 | print("File corrupted or incomplete!")
|
213 | 256 | cfile = None
|
214 | 257 | file_is_corrupted = True
|
| 258 | + except ValueError as e: |
| 259 | + print(f"Security validation failed: {e}") |
| 260 | + cfile = None |
| 261 | + file_is_corrupted = True |
215 | 262 |
|
216 | 263 | if file_is_corrupted:
|
217 | 264 | corrupted_filename = filename + ".corrupted"
|
@@ -243,15 +290,15 @@ def unpack(filename, destination, force_extract, checksum): # noqa: C901
|
243 | 290 | if filename.endswith("tar.gz"):
|
244 | 291 | if not cfile:
|
245 | 292 | cfile = tarfile.open(filename, "r:gz")
|
246 |
| - cfile.extractall(destination, filter="tar") |
| 293 | + safe_tar_extract(cfile, destination) |
247 | 294 | elif filename.endswith("tar.xz"):
|
248 | 295 | if not cfile:
|
249 | 296 | cfile = tarfile.open(filename, "r:xz")
|
250 |
| - cfile.extractall(destination, filter="tar") |
| 297 | + safe_tar_extract(cfile, destination) |
251 | 298 | elif filename.endswith("zip"):
|
252 | 299 | if not cfile:
|
253 | 300 | cfile = zipfile.ZipFile(filename)
|
254 |
| - cfile.extractall(destination) |
| 301 | + safe_zip_extract(cfile, destination) |
255 | 302 | else:
|
256 | 303 | raise NotImplementedError("Unsupported archive type")
|
257 | 304 |
|
|
0 commit comments