Skip to content

Commit 954c1cd

Browse files
committed
Rewrite symlink sanitization logic
We can rewrite symlinks to ensure they are always relative and remain within the extraction directory.
1 parent 49de1c6 commit 954c1cd

File tree

1 file changed

+61
-56
lines changed

1 file changed

+61
-56
lines changed

unblob/extractor.py

Lines changed: 61 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -47,68 +47,73 @@ def is_recursive_link(path: Path) -> bool:
4747
return False
4848

4949

50-
def fix_symlink(path: Path, outdir: Path, task_result: TaskResult) -> Path:
51-
"""Rewrites absolute symlinks to point within the extraction directory (outdir).
52-
53-
If it's not a relative symlink it is either removed it it attempts
54-
to traverse outside of the extraction directory or rewritten to be
55-
fully portable (no mention of the extraction directory in the link
56-
value).
57-
"""
58-
if is_recursive_link(path):
59-
logger.error("Symlink loop identified, removing", path=path)
60-
error_report = MaliciousSymlinkRemoved(
61-
link=path.as_posix(), target=os.readlink(path)
62-
)
63-
task_result.add_report(error_report)
64-
path.unlink()
65-
return path
66-
67-
raw_target = os.readlink(path)
68-
if not raw_target:
69-
logger.error("Symlink with empty target, removing.")
70-
path.unlink()
71-
return path
72-
73-
target = Path(raw_target)
74-
75-
if target.is_absolute():
76-
target = Path(target.as_posix().lstrip("/"))
50+
def sanitize_symlink_target(base_dir, current_dir, target):
51+
# Normalize all paths to their absolute forms
52+
base_dir_abs = os.path.abspath(base_dir)
53+
current_dir_abs = os.path.abspath(current_dir)
54+
target_abs = os.path.abspath(os.path.join(current_dir, target)) \
55+
if not os.path.isabs(target) else os.path.abspath(target)
56+
57+
# Check if the target is absolute and within the base_dir
58+
if os.path.isabs(target):
59+
if target_abs.startswith(base_dir_abs):
60+
return os.path.relpath(target_abs, current_dir_abs)
61+
else:
62+
# Target is absolute but outside base_dir - we'll pretend base_dir is our root
63+
# and adjust the target to be within base_dir
64+
abs = base_dir + "/" + os.path.relpath(target_abs, os.path.commonpath([target_abs, base_dir_abs]))
65+
# We want to return the relative path from current_dir to the adjusted target
66+
return os.path.relpath(abs, current_dir_abs)
7767
else:
78-
target = path.resolve()
79-
80-
safe = is_safe_path(outdir, target)
81-
82-
if not safe:
83-
logger.error("Path traversal attempt through symlink, removing", target=target)
84-
error_report = MaliciousSymlinkRemoved(
85-
link=path.as_posix(), target=target.as_posix()
86-
)
87-
task_result.add_report(error_report)
88-
path.unlink()
89-
else:
90-
relative_target = os.path.relpath(outdir.joinpath(target), start=path.parent)
91-
path.unlink()
92-
path.symlink_to(relative_target)
93-
return path
94-
68+
# Target is relative
69+
if target_abs.startswith(base_dir_abs):
70+
# Target is relative and does not escape base_dir
71+
return os.path.relpath(target_abs, current_dir_abs)
72+
else:
73+
# Target is relative and escapes base_dir
74+
# Say we have foo/passwd -> ../../../etc/passwd with root at /host/test_archive
75+
# from /host/test_archive/foo/passwd, we want to return ../etc/passwd which is the
76+
# relative path from /host/test_archive/foo to /host/test_archive/etc/passwd
77+
# without escaping /host/test_archive
78+
79+
for drop_count in range(0, len(target.split('/'))):
80+
# We drop '..'s from the target by prepending placeholder directories until we get something valid
81+
abs = current_dir + "/" + "/".join(["foo"] * drop_count) + target
82+
resolved = os.path.abspath(abs)
83+
if resolved.startswith(base_dir_abs):
84+
break
85+
else:
86+
raise ValueError(f"Could not resolve symlink target {target} within base_dir {base_dir}")
87+
88+
# We need to add the /placeholder to the relative path because we need
89+
# to act like a file within base_dir is our root (as opposed to base_dir itself)
90+
return os.path.relpath(resolved, base_dir_abs + '/placeholder')
9591

9692
def fix_extracted_directory(outdir: Path, task_result: TaskResult):
9793
def _fix_extracted_directory(directory: Path):
9894
if not directory.exists():
9995
return
100-
for path in (directory / p for p in os.listdir(directory)):
101-
try:
102-
fix_permission(path)
103-
if path.is_symlink():
104-
fix_symlink(path, outdir, task_result)
105-
continue
106-
if path.is_dir():
107-
_fix_extracted_directory(path)
108-
except OSError as e:
109-
if e.errno == errno.ENAMETOOLONG:
110-
continue
111-
raise e from None
96+
97+
base_dir = os.path.abspath(outdir)
98+
for root, dirs, files in os.walk(base_dir, topdown=True):
99+
fix_permission(Path(root))
100+
for name in dirs+files:
101+
try:
102+
full_path = os.path.join(root, name)
103+
if os.path.islink(full_path):
104+
# Make symlinks relative and constrain them to the base_dir
105+
target = os.readlink(full_path)
106+
new_target = sanitize_symlink_target(base_dir, root, target)
107+
if new_target != target:
108+
os.remove(full_path)
109+
os.symlink(new_target, full_path)
110+
logger.info("Updated symlink", path=full_path, target=new_target)
111+
else:
112+
logger.debug("Symlink is already sanitized", path=full_path, target=new_target)
113+
except OSError as e:
114+
if e.errno == errno.ENAMETOOLONG:
115+
continue
116+
raise e from None
112117

113118
fix_permission(outdir)
114119
_fix_extracted_directory(outdir)

0 commit comments

Comments
 (0)