@@ -154,6 +154,7 @@ def __str__(self) -> str:
154
154
# File-extension sets, matched against the result of os.path.splitext().
_pytorch_file_extensions = {".bin", ".pt", ".pth", ".ckpt"}
_pickle_file_extensions = {".pkl", ".pickle", ".joblib", ".dat", ".data"}
_zip_file_extensions = {".zip", ".npz", ".7z"}

# Two-byte prefixes that open a pickle stream: the PROTO opcode (0x80)
# followed by the protocol number. Each entry must be exactly these two
# bytes with nothing in between, otherwise a startswith() check against real
# pickle data can never succeed.
# NOTE: streams written with protocol 0 or 1 carry no PROTO header, so they
# are not recognisable through this set.
_pickle_magic_bytes = {
    b"\x80\x00",
    b"\x80\x01",
    b"\x80\x02",
    b"\x80\x03",
    b"\x80\x04",
    b"\x80\x05",
}
157
158
158
159
159
160
def _is_7z_file (f : IO [bytes ]) -> bool :
@@ -349,20 +350,30 @@ def scan_7z_bytes(data: IO[bytes], file_id) -> ScanResult:
349
350
350
351
return result
351
352
353
def get_magic_bytes_from_zipfile(zip: zipfile.ZipFile, num_bytes=8):
    """Read the leading bytes of every member of an open zip archive.

    Args:
        zip: Archive object to read members from.
        num_bytes: Number of bytes to read from the start of each member.

    Returns:
        A dict mapping each member's file name to the bytes read from its
        start. A member holding fewer than ``num_bytes`` bytes yields a
        shorter value.
    """
    headers = {}
    # namelist() yields the file names of the same entries, in the same
    # order, as infolist().
    for member_name in zip.namelist():
        with zip.open(member_name) as member:
            headers[member_name] = member.read(num_bytes)
    return headers
360
+
352
361
353
362
def scan_zip_bytes (data : IO [bytes ], file_id ) -> ScanResult :
354
363
result = ScanResult ([])
355
364
356
365
with zipfile .ZipFile (data , "r" ) as zip :
366
+ magic_bytes = get_magic_bytes_from_zipfile (zip )
357
367
file_names = zip .namelist ()
358
368
_log .debug ("Files in zip archive %s: %s" , file_id , file_names )
359
369
for file_name in file_names :
370
+ magic_number = magic_bytes .get (file_name , b"" )
360
371
file_ext = os .path .splitext (file_name )[1 ]
361
- if file_ext in _pickle_file_extensions :
372
+ if file_ext in _pickle_file_extensions or any ( magic_number . startswith ( mn ) for mn in _pickle_magic_bytes ) :
362
373
_log .debug ("Scanning file %s in zip archive %s" , file_name , file_id )
363
374
with zip .open (file_name , "r" ) as file :
364
375
result .merge (scan_pickle_bytes (file , f"{ file_id } :{ file_name } " ))
365
- elif file_ext in _numpy_file_extensions :
376
+ elif file_ext in _numpy_file_extensions or magic_number . startswith ( b" \x93 NUMPY" ) :
366
377
_log .debug ("Scanning file %s in zip archive %s" , file_name , file_id )
367
378
with zip .open (file_name , "r" ) as file :
368
379
result .merge (scan_numpy (file , f"{ file_id } :{ file_name } " ))
0 commit comments