1
1
"""File extraction related functions."""
2
+ # ruff: noqa
3
+
2
4
import errno
3
5
import os
4
6
from pathlib import Path
15
17
FILE_PERMISSION_MASK = 0o644
16
18
DIR_PERMISSION_MASK = 0o775
17
19
20
+ # re-exported, or unused symbols
21
+ __all__ = [
22
+ "is_safe_path" ,
23
+ "MaliciousSymlinkRemoved" ,
24
+ "fix_symlink" ,
25
+ "is_recursive_link" ,
26
+ ]
27
+
18
28
19
29
def carve_chunk_to_file (carve_path : Path , file : File , chunk : Chunk ):
20
30
"""Extract valid chunk to a file, which we then pass to another tool to extract it."""
@@ -57,8 +67,11 @@ def sanitize_symlink_target(base_dir, current_dir, target):
57
67
# Normalize all paths to their absolute forms
58
68
base_dir_abs = os .path .abspath (base_dir )
59
69
current_dir_abs = os .path .abspath (current_dir )
60
- target_abs = os .path .abspath (os .path .join (current_dir , target )) \
61
- if not os .path .isabs (target ) else os .path .abspath (target )
70
+ target_abs = (
71
+ os .path .abspath (os .path .join (current_dir , target ))
72
+ if not os .path .isabs (target )
73
+ else os .path .abspath (target )
74
+ )
62
75
63
76
# Check if the target is absolute and within the base_dir
64
77
if os .path .isabs (target ):
@@ -67,7 +80,13 @@ def sanitize_symlink_target(base_dir, current_dir, target):
67
80
else :
68
81
# Target is absolute but outside base_dir - we'll pretend base_dir is our root
69
82
# and adjust the target to be within base_dir
70
- abs = base_dir + "/" + os .path .relpath (target_abs , os .path .commonpath ([target_abs , base_dir_abs ]))
83
+ abs = (
84
+ base_dir
85
+ + "/"
86
+ + os .path .relpath (
87
+ target_abs , os .path .commonpath ([target_abs , base_dir_abs ])
88
+ )
89
+ )
71
90
# We want to return the relative path from current_dir to the adjusted target
72
91
return os .path .relpath (abs , current_dir_abs )
73
92
else :
@@ -82,18 +101,21 @@ def sanitize_symlink_target(base_dir, current_dir, target):
82
101
# relative path from /host/test_archive/foo to /host/test_archive/etc/passwd
83
102
# without escaping /host/test_archive
84
103
85
- for drop_count in range (0 , len (target .split ('/' ))):
104
+ for drop_count in range (len (target .split ("/" ))):
86
105
# We drop '..'s from the target by prepending placeholder directories until we get something valid
87
106
abs = current_dir + "/" + "/" .join (["foo" ] * drop_count ) + target
88
107
resolved = os .path .abspath (abs )
89
108
if resolved .startswith (base_dir_abs ):
90
109
break
91
110
else :
92
- raise ValueError (f"Could not resolve symlink target { target } within base_dir { base_dir } " )
111
+ raise ValueError (
112
+ f"Could not resolve symlink target { target } within base_dir { base_dir } "
113
+ )
93
114
94
115
# We need to add the /placeholder to the relative path because we need
95
116
# to act like a file within base_dir is our root (as opposed to base_dir itself)
96
- return os .path .relpath (resolved , base_dir_abs + '/placeholder' )
117
+ return os .path .relpath (resolved , base_dir_abs + "/placeholder" )
118
+
97
119
98
120
def fix_extracted_directory (outdir : Path , task_result : TaskResult ):
99
121
def _fix_extracted_directory (directory : Path ):
@@ -103,7 +125,7 @@ def _fix_extracted_directory(directory: Path):
103
125
base_dir = os .path .abspath (outdir )
104
126
for root , dirs , files in os .walk (base_dir , topdown = True ):
105
127
fix_permission (Path (root ))
106
- for name in dirs + files :
128
+ for name in dirs + files :
107
129
try :
108
130
full_path = os .path .join (root , name )
109
131
if os .path .islink (full_path ):
@@ -113,9 +135,15 @@ def _fix_extracted_directory(directory: Path):
113
135
if new_target != target :
114
136
os .remove (full_path )
115
137
os .symlink (new_target , full_path )
116
- logger .info ("Updated symlink" , path = full_path , target = new_target )
138
+ logger .info (
139
+ "Updated symlink" , path = full_path , target = new_target
140
+ )
117
141
else :
118
- logger .debug ("Symlink is already sanitized" , path = full_path , target = new_target )
142
+ logger .debug (
143
+ "Symlink is already sanitized" ,
144
+ path = full_path ,
145
+ target = new_target ,
146
+ )
119
147
except OSError as e :
120
148
if e .errno == errno .ENAMETOOLONG :
121
149
continue
0 commit comments