3
3
import abc
4
4
import json
5
5
import zipfile
6
+ from enum import IntEnum
6
7
import os
7
8
import threading
8
9
9
- from typing import ClassVar , Dict , List , Literal , Tuple , Any , Optional , Union , BinaryIO
10
+ from typing import ClassVar , Dict , List , Literal , Tuple , Any , Optional , Union , BinaryIO , overload , Sequence
10
11
11
12
import bsdiff4
12
13
@@ -38,6 +39,34 @@ def get_handler(file: str) -> Optional[AutoPatchRegister]:
38
39
return None
39
40
40
41
42
+ class AutoPatchExtensionRegister (abc .ABCMeta ):
43
+ extension_types : ClassVar [Dict [str , AutoPatchExtensionRegister ]] = {}
44
+ required_extensions : Tuple [str , ...] = ()
45
+
46
+ def __new__ (mcs , name : str , bases : Tuple [type , ...], dct : Dict [str , Any ]) -> AutoPatchExtensionRegister :
47
+ # construct class
48
+ new_class = super ().__new__ (mcs , name , bases , dct )
49
+ if "game" in dct :
50
+ AutoPatchExtensionRegister .extension_types [dct ["game" ]] = new_class
51
+ return new_class
52
+
53
+ @staticmethod
54
+ def get_handler (game : Optional [str ]) -> Union [AutoPatchExtensionRegister , List [AutoPatchExtensionRegister ]]:
55
+ if not game :
56
+ return APPatchExtension
57
+ handler = AutoPatchExtensionRegister .extension_types .get (game , APPatchExtension )
58
+ if handler .required_extensions :
59
+ handlers = [handler ]
60
+ for required in handler .required_extensions :
61
+ ext = AutoPatchExtensionRegister .extension_types .get (required )
62
+ if not ext :
63
+ raise NotImplementedError (f"No handler for { required } ." )
64
+ handlers .append (ext )
65
+ return handlers
66
+ else :
67
+ return handler
68
+
69
+
41
70
container_version : int = 6
42
71
43
72
@@ -157,27 +186,14 @@ def patch(self, target: str) -> None:
157
186
""" create the output file with the file name `target` """
158
187
159
188
160
- class APDeltaPatch (APAutoPatchInterface ):
161
- """An implementation of `APAutoPatchInterface` that additionally
162
- has delta.bsdiff4 containing a delta patch to get the desired file."""
163
-
189
+ class APProcedurePatch (APAutoPatchInterface ):
190
+ """
191
+ An APPatch that defines a procedure to produce the desired file.
192
+ """
164
193
hash : Optional [str ] # base checksum of source file
165
- patch_file_ending : str = ""
166
- delta : Optional [bytes ] = None
167
194
source_data : bytes
168
- procedure = None # delete this line when APPP is added
169
-
170
- def __init__ (self , * args : Any , patched_path : str = "" , ** kwargs : Any ) -> None :
171
- self .patched_path = patched_path
172
- super (APDeltaPatch , self ).__init__ (* args , ** kwargs )
173
-
174
- def get_manifest (self ) -> Dict [str , Any ]:
175
- manifest = super (APDeltaPatch , self ).get_manifest ()
176
- manifest ["base_checksum" ] = self .hash
177
- manifest ["result_file_ending" ] = self .result_file_ending
178
- manifest ["patch_file_ending" ] = self .patch_file_ending
179
- manifest ["compatible_version" ] = 5 # delete this line when APPP is added
180
- return manifest
195
+ patch_file_ending : str = ""
196
+ files : Dict [str , bytes ]
181
197
182
198
@classmethod
183
199
def get_source_data (cls ) -> bytes :
@@ -190,21 +206,223 @@ def get_source_data_with_cache(cls) -> bytes:
190
206
cls .source_data = cls .get_source_data ()
191
207
return cls .source_data
192
208
193
- def write_contents (self , opened_zipfile : zipfile .ZipFile ):
194
- super (APDeltaPatch , self ).write_contents (opened_zipfile )
195
- # write Delta
196
- opened_zipfile .writestr ("delta.bsdiff4" ,
197
- bsdiff4 .diff (self .get_source_data_with_cache (), open (self .patched_path , "rb" ).read ()),
198
- compress_type = zipfile .ZIP_STORED ) # bsdiff4 is a format with integrated compression
199
-
200
- def read_contents (self , opened_zipfile : zipfile .ZipFile ):
201
- super (APDeltaPatch , self ).read_contents (opened_zipfile )
202
- self .delta = opened_zipfile .read ("delta.bsdiff4" )
203
-
204
- def patch (self , target : str ):
205
- """Base + Delta -> Patched"""
206
- if not self .delta :
209
+ def __init__ (self , * args : Any , ** kwargs : Any ):
210
+ super (APProcedurePatch , self ).__init__ (* args , ** kwargs )
211
+ self .files = {}
212
+
213
+ def get_manifest (self ) -> Dict [str , Any ]:
214
+ manifest = super (APProcedurePatch , self ).get_manifest ()
215
+ manifest ["base_checksum" ] = self .hash
216
+ manifest ["result_file_ending" ] = self .result_file_ending
217
+ manifest ["patch_file_ending" ] = self .patch_file_ending
218
+ manifest ["procedure" ] = self .procedure
219
+ if self .procedure == APDeltaPatch .procedure :
220
+ manifest ["compatible_version" ] = 5
221
+ return manifest
222
+
223
+ def read_contents (self , opened_zipfile : zipfile .ZipFile ) -> None :
224
+ super (APProcedurePatch , self ).read_contents (opened_zipfile )
225
+ with opened_zipfile .open ("archipelago.json" , "r" ) as f :
226
+ manifest = json .load (f )
227
+ if "procedure" not in manifest :
228
+ # support patching files made before moving to procedures
229
+ self .procedure = [("apply_bsdiff4" , ["delta.bsdiff4" ])]
230
+ else :
231
+ self .procedure = manifest ["procedure" ]
232
+ for file in opened_zipfile .namelist ():
233
+ if file not in ["archipelago.json" ]:
234
+ self .files [file ] = opened_zipfile .read (file )
235
+
236
+ def write_contents (self , opened_zipfile : zipfile .ZipFile ) -> None :
237
+ super (APProcedurePatch , self ).write_contents (opened_zipfile )
238
+ for file in self .files :
239
+ opened_zipfile .writestr (file , self .files [file ],
240
+ compress_type = zipfile .ZIP_STORED if file .endswith (".bsdiff4" ) else None )
241
+
242
+ def get_file (self , file : str ) -> bytes :
243
+ """ Retrieves a file from the patch container."""
244
+ if file not in self .files :
207
245
self .read ()
208
- result = bsdiff4 .patch (self .get_source_data_with_cache (), self .delta )
209
- with open (target , "wb" ) as f :
210
- f .write (result )
246
+ return self .files [file ]
247
+
248
+ def write_file (self , file_name : str , file : bytes ) -> None :
249
+ """ Writes a file to the patch container, to be retrieved upon patching. """
250
+ self .files [file_name ] = file
251
+
252
+ def patch (self , target : str ) -> None :
253
+ self .read ()
254
+ base_data = self .get_source_data_with_cache ()
255
+ patch_extender = AutoPatchExtensionRegister .get_handler (self .game )
256
+ assert not isinstance (self .procedure , str ), f"{ type (self )} must define procedures"
257
+ for step , args in self .procedure :
258
+ if isinstance (patch_extender , list ):
259
+ extension = next ((item for item in [getattr (extender , step , None ) for extender in patch_extender ]
260
+ if item is not None ), None )
261
+ else :
262
+ extension = getattr (patch_extender , step , None )
263
+ if extension is not None :
264
+ base_data = extension (self , base_data , * args )
265
+ else :
266
+ raise NotImplementedError (f"Unknown procedure { step } for { self .game } ." )
267
+ with open (target , 'wb' ) as f :
268
+ f .write (base_data )
269
+
270
+
271
+ class APDeltaPatch (APProcedurePatch ):
272
+ """An APProcedurePatch that additionally has delta.bsdiff4
273
+ containing a delta patch to get the desired file, often a rom."""
274
+
275
+ procedure = [
276
+ ("apply_bsdiff4" , ["delta.bsdiff4" ])
277
+ ]
278
+
279
+ def __init__ (self , * args : Any , patched_path : str = "" , ** kwargs : Any ) -> None :
280
+ super (APDeltaPatch , self ).__init__ (* args , ** kwargs )
281
+ self .patched_path = patched_path
282
+
283
+ def write_contents (self , opened_zipfile : zipfile .ZipFile ) -> None :
284
+ self .write_file ("delta.bsdiff4" ,
285
+ bsdiff4 .diff (self .get_source_data_with_cache (), open (self .patched_path , "rb" ).read ()))
286
+ super (APDeltaPatch , self ).write_contents (opened_zipfile )
287
+
288
+
289
+ class APTokenTypes (IntEnum ):
290
+ WRITE = 0
291
+ COPY = 1
292
+ RLE = 2
293
+ AND_8 = 3
294
+ OR_8 = 4
295
+ XOR_8 = 5
296
+
297
+
298
+ class APTokenMixin :
299
+ """
300
+ A class that defines functions for generating a token binary, for use in patches.
301
+ """
302
+ _tokens : Sequence [
303
+ Tuple [APTokenTypes , int , Union [
304
+ bytes , # WRITE
305
+ Tuple [int , int ], # COPY, RLE
306
+ int # AND_8, OR_8, XOR_8
307
+ ]]] = ()
308
+
309
+ def get_token_binary (self ) -> bytes :
310
+ """
311
+ Returns the token binary created from stored tokens.
312
+ :return: A bytes object representing the token data.
313
+ """
314
+ data = bytearray ()
315
+ data .extend (len (self ._tokens ).to_bytes (4 , "little" ))
316
+ for token_type , offset , args in self ._tokens :
317
+ data .append (token_type )
318
+ data .extend (offset .to_bytes (4 , "little" ))
319
+ if token_type in [APTokenTypes .AND_8 , APTokenTypes .OR_8 , APTokenTypes .XOR_8 ]:
320
+ assert isinstance (args , int ), f"Arguments to AND/OR/XOR must be of type int, not { type (args )} "
321
+ data .extend (int .to_bytes (1 , 4 , "little" ))
322
+ data .append (args )
323
+ elif token_type in [APTokenTypes .COPY , APTokenTypes .RLE ]:
324
+ assert isinstance (args , tuple ), f"Arguments to COPY/RLE must be of type tuple, not { type (args )} "
325
+ data .extend (int .to_bytes (4 , 4 , "little" ))
326
+ data .extend (args [0 ].to_bytes (4 , "little" ))
327
+ data .extend (args [1 ].to_bytes (4 , "little" ))
328
+ elif token_type == APTokenTypes .WRITE :
329
+ assert isinstance (args , bytes ), f"Arguments to WRITE must be of type bytes, not { type (args )} "
330
+ data .extend (len (args ).to_bytes (4 , "little" ))
331
+ data .extend (args )
332
+ else :
333
+ raise ValueError (f"Unknown token type { token_type } " )
334
+ return bytes (data )
335
+
336
+ @overload
337
+ def write_token (self ,
338
+ token_type : Literal [APTokenTypes .AND_8 , APTokenTypes .OR_8 , APTokenTypes .XOR_8 ],
339
+ offset : int ,
340
+ data : int ) -> None :
341
+ ...
342
+
343
+ @overload
344
+ def write_token (self ,
345
+ token_type : Literal [APTokenTypes .COPY , APTokenTypes .RLE ],
346
+ offset : int ,
347
+ data : Tuple [int , int ]) -> None :
348
+ ...
349
+
350
+ @overload
351
+ def write_token (self ,
352
+ token_type : Literal [APTokenTypes .WRITE ],
353
+ offset : int ,
354
+ data : bytes ) -> None :
355
+ ...
356
+
357
+ def write_token (self , token_type : APTokenTypes , offset : int , data : Union [bytes , Tuple [int , int ], int ]) -> None :
358
+ """
359
+ Stores a token to be used by patching.
360
+ """
361
+ if not isinstance (self ._tokens , list ):
362
+ assert len (self ._tokens ) == 0 , f"{ type (self )} ._tokens was tampered with."
363
+ self ._tokens = []
364
+ self ._tokens .append ((token_type , offset , data ))
365
+
366
+
367
+ class APPatchExtension (metaclass = AutoPatchExtensionRegister ):
368
+ """Class that defines patch extension functions for a given game.
369
+ Patch extension functions must have the following two arguments in the following order:
370
+
371
+ caller: APProcedurePatch (used to retrieve files from the patch container)
372
+
373
+ rom: bytes (the data to patch)
374
+
375
+ Further arguments are passed in from the procedure as defined.
376
+
377
+ Patch extension functions must return the changed bytes.
378
+ """
379
+ game : str
380
+ required_extensions : ClassVar [Tuple [str , ...]] = ()
381
+
382
+ @staticmethod
383
+ def apply_bsdiff4 (caller : APProcedurePatch , rom : bytes , patch : str ) -> bytes :
384
+ """Applies the given bsdiff4 from the patch onto the current file."""
385
+ return bsdiff4 .patch (rom , caller .get_file (patch ))
386
+
387
+ @staticmethod
388
+ def apply_tokens (caller : APProcedurePatch , rom : bytes , token_file : str ) -> bytes :
389
+ """Applies the given token file from the patch onto the current file."""
390
+ token_data = caller .get_file (token_file )
391
+ rom_data = bytearray (rom )
392
+ token_count = int .from_bytes (token_data [0 :4 ], "little" )
393
+ bpr = 4
394
+ for _ in range (token_count ):
395
+ token_type = token_data [bpr :bpr + 1 ][0 ]
396
+ offset = int .from_bytes (token_data [bpr + 1 :bpr + 5 ], "little" )
397
+ size = int .from_bytes (token_data [bpr + 5 :bpr + 9 ], "little" )
398
+ data = token_data [bpr + 9 :bpr + 9 + size ]
399
+ if token_type in [APTokenTypes .AND_8 , APTokenTypes .OR_8 , APTokenTypes .XOR_8 ]:
400
+ arg = data [0 ]
401
+ if token_type == APTokenTypes .AND_8 :
402
+ rom_data [offset ] = rom_data [offset ] & arg
403
+ elif token_type == APTokenTypes .OR_8 :
404
+ rom_data [offset ] = rom_data [offset ] | arg
405
+ else :
406
+ rom_data [offset ] = rom_data [offset ] ^ arg
407
+ elif token_type in [APTokenTypes .COPY , APTokenTypes .RLE ]:
408
+ length = int .from_bytes (data [:4 ], "little" )
409
+ value = int .from_bytes (data [4 :], "little" )
410
+ if token_type == APTokenTypes .COPY :
411
+ rom_data [offset : offset + length ] = rom_data [value : value + length ]
412
+ else :
413
+ rom_data [offset : offset + length ] = bytes ([value ] * length )
414
+ else :
415
+ rom_data [offset :offset + len (data )] = data
416
+ bpr += 9 + size
417
+ return bytes (rom_data )
418
+
419
+ @staticmethod
420
+ def calc_snes_crc (caller : APProcedurePatch , rom : bytes ) -> bytes :
421
+ """Calculates and applies a valid CRC for the SNES rom header."""
422
+ rom_data = bytearray (rom )
423
+ if len (rom ) < 0x8000 :
424
+ raise Exception ("Tried to calculate SNES CRC on file too small to be a SNES ROM." )
425
+ crc = (sum (rom_data [:0x7FDC ] + rom_data [0x7FE0 :]) + 0x01FE ) & 0xFFFF
426
+ inv = crc ^ 0xFFFF
427
+ rom_data [0x7FDC :0x7FE0 ] = [inv & 0xFF , (inv >> 8 ) & 0xFF , crc & 0xFF , (crc >> 8 ) & 0xFF ]
428
+ return bytes (rom_data )
0 commit comments