Skip to content

Commit 75e60f2

Browse files
Degrade gracefully when scanning password-protected zip files (#39)
Co-authored-by: Matthieu Maitre <mmaitre@microsoft.com>
1 parent 4098df1 commit 75e60f2

File tree

3 files changed

+26
-24
lines changed

3 files changed

+26
-24
lines changed

setup.cfg

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[metadata]
22
name = picklescan
3-
version = 0.0.23
3+
version = 0.0.24
44
author = Matthieu Maitre
55
author_email = mmaitre314@users.noreply.github.com
66
description = Security scanner detecting Python Pickle files performing suspicious actions

src/picklescan/scanner.py

+25-23
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ def __str__(self) -> str:
160160
_pytorch_file_extensions = {".bin", ".pt", ".pth", ".ckpt"}
161161
_pickle_file_extensions = {".pkl", ".pickle", ".joblib", ".dat", ".data"}
162162
_zip_file_extensions = {".zip", ".npz", ".7z"}
163+
# Pickle files do not actually have magic bytes, but v2+ files
164+
# start with a PROTO (\x80) opcode followed by a byte with the protocol version
163165
_pickle_magic_bytes = {
164166
b"\x80\x00",
165167
b"\x80\x01",
@@ -168,6 +170,7 @@ def __str__(self) -> str:
168170
b"\x80\x04",
169171
b"\x80\x05",
170172
}
173+
_numpy_magic_bytes = b"\x93NUMPY"
171174

172175

173176
def _is_7z_file(f: IO[bytes]) -> bool:
@@ -364,37 +367,36 @@ def scan_7z_bytes(data: IO[bytes], file_id) -> ScanResult:
364367
return result
365368

366369

367-
def get_magic_bytes_from_zipfile(zip: zipfile.ZipFile, num_bytes=8):
368-
magic_bytes = {}
369-
for file_info in zip.infolist():
370-
with zip.open(file_info.filename) as f:
371-
magic_bytes[file_info.filename] = f.read(num_bytes)
372-
373-
return magic_bytes
374-
375-
376370
def scan_zip_bytes(data: IO[bytes], file_id) -> ScanResult:
377371
result = ScanResult([])
378372

379373
with RelaxedZipFile(data, "r") as zip:
380-
magic_bytes = get_magic_bytes_from_zipfile(zip)
381374
file_names = zip.namelist()
382375
_log.debug("Files in zip archive %s: %s", file_id, file_names)
383376
for file_name in file_names:
384-
magic_number = magic_bytes.get(file_name, b"")
385-
file_ext = os.path.splitext(file_name)[1]
386-
if file_ext in _pickle_file_extensions or any(
387-
magic_number.startswith(mn) for mn in _pickle_magic_bytes
388-
):
389-
_log.debug("Scanning file %s in zip archive %s", file_name, file_id)
390-
with zip.open(file_name, "r") as file:
391-
result.merge(scan_pickle_bytes(file, f"{file_id}:{file_name}"))
392-
elif file_ext in _numpy_file_extensions or magic_number.startswith(
393-
b"\x93NUMPY"
394-
):
395-
_log.debug("Scanning file %s in zip archive %s", file_name, file_id)
377+
try:
396378
with zip.open(file_name, "r") as file:
397-
result.merge(scan_numpy(file, f"{file_id}:{file_name}"))
379+
magic_bytes = file.read(8)
380+
file_ext = os.path.splitext(file_name)[1]
381+
382+
if file_ext in _pickle_file_extensions or any(
383+
magic_bytes.startswith(mn) for mn in _pickle_magic_bytes
384+
):
385+
_log.debug("Scanning file %s in zip archive %s", file_name, file_id)
386+
with zip.open(file_name, "r") as file:
387+
result.merge(scan_pickle_bytes(file, f"{file_id}:{file_name}"))
388+
389+
elif file_ext in _numpy_file_extensions or magic_bytes.startswith(
390+
_numpy_magic_bytes
391+
):
392+
_log.debug("Scanning file %s in zip archive %s", file_name, file_id)
393+
with zip.open(file_name, "r") as file:
394+
result.merge(scan_numpy(file, f"{file_id}:{file_name}"))
395+
except (zipfile.BadZipFile, RuntimeError) as e:
396+
# Log decompression issues (password protected, corrupted, etc.)
397+
_log.warning(
398+
"Invalid file %s in zip archive %s: %s", file_name, file_id, str(e)
399+
)
398400

399401
return result
400402

177 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)