24
24
25
25
26
26
def read (address : int , length : int , * , align : int = 1 ):
27
- assert address % align == 0 , f' address: 0x{ address :07x} , align: { align } '
28
- return (address , length , ' System Bus' )
27
+ assert address % align == 0 , f" address: 0x{ address :07x} , align: { align } "
28
+ return (address , length , " System Bus" )
29
29
30
30
def read8 (address : int ):
31
31
return read (address , 1 )
@@ -38,32 +38,32 @@ def read32(address: int):
38
38
39
39
40
40
def write (address : int , value : bytes , * , align : int = 1 ):
41
- assert address % align == 0 , f' address: 0x{ address :07x} , align: { align } '
42
- return (address , value , ' System Bus' )
41
+ assert address % align == 0 , f" address: 0x{ address :07x} , align: { align } "
42
+ return (address , value , " System Bus" )
43
43
44
44
def write8 (address : int , value : int ):
45
- return write (address , value .to_bytes (1 , ' little' ))
45
+ return write (address , value .to_bytes (1 , " little" ))
46
46
47
47
def write16 (address : int , value : int ):
48
- return write (address , value .to_bytes (2 , ' little' ), align = 2 )
48
+ return write (address , value .to_bytes (2 , " little" ), align = 2 )
49
49
50
50
def write32 (address : int , value : int ):
51
- return write (address , value .to_bytes (4 , ' little' ), align = 4 )
51
+ return write (address , value .to_bytes (4 , " little" ), align = 4 )
52
52
53
53
54
54
guard8 = write8
55
55
guard16 = write16
56
56
57
57
58
58
def next_int (iterator : Iterator [bytes ]) -> int :
59
- return int .from_bytes (next (iterator ), ' little' )
59
+ return int .from_bytes (next (iterator ), " little" )
60
60
61
61
62
62
# itertools.batched from Python 3.12
63
63
# https://docs.python.org/3.11/library/itertools.html#itertools-recipes
64
64
def batched (iterable , n ):
65
65
if n < 1 :
66
- raise ValueError (' n must be at least 1' )
66
+ raise ValueError (" n must be at least 1" )
67
67
it = iter (iterable )
68
68
while batch := tuple (itertools .islice (it , n )):
69
69
yield batch
@@ -105,13 +105,13 @@ def batched(iterable, n):
105
105
106
106
107
107
def cmd_deathlink (self ):
108
- ''' Toggle death link from client. Overrides default setting.'''
108
+ """ Toggle death link from client. Overrides default setting."""
109
109
110
110
client_handler = self .ctx .client_handler
111
111
client_handler .death_link .enabled = not client_handler .death_link .enabled
112
112
Utils .async_start (
113
113
self .ctx .update_death_link (client_handler .death_link .enabled ),
114
- name = ' Update Death Link'
114
+ name = " Update Death Link"
115
115
)
116
116
117
117
@@ -122,10 +122,10 @@ class DeathLinkCtx:
122
122
sent_this_death : bool = False
123
123
124
124
def __repr__ (self ):
125
- return (f' { type (self )} {{ enabled: { self .enabled } , '
126
- f' update_pending: { self .update_pending } , '
127
- f' pending: { self .pending } , '
128
- f' sent_this_death: { self .sent_this_death } }}' )
125
+ return (f" { type (self )} {{ enabled: { self .enabled } , "
126
+ f" update_pending: { self .update_pending } , "
127
+ f" pending: { self .pending } , "
128
+ f" sent_this_death: { self .sent_this_death } }}" )
129
129
130
130
def __str__ (self ):
131
131
return repr (self )
@@ -149,7 +149,7 @@ class ZMConstants:
149
149
gGameModeSub1 = get_symbol ("gGameModeSub1" )
150
150
gPreventMovementTimer = get_symbol ("gPreventMovementTimer" )
151
151
gEquipment = get_symbol ("gEquipment" )
152
- gEventsTriggered = get_symbol ("gEventsTriggered" );
152
+ gEventsTriggered = get_symbol ("gEventsTriggered" )
153
153
gRandoLocationBitfields = get_symbol ("gRandoLocationBitfields" )
154
154
gIncomingItemId = get_symbol ("gIncomingItemId" )
155
155
gMultiworldItemCount = get_symbol ("gMultiworldItemCount" )
@@ -187,16 +187,16 @@ async def validate_rom(self, client_ctx: BizHawkClientContext) -> bool:
187
187
return False # Should verify on the next pass
188
188
189
189
game_name = next (read_result ).decode ("ascii" )
190
- slot_name_bytes = next (read_result ).rstrip (b' \0 ' )
191
- seed_name_bytes = next (read_result ).rstrip (b' \0 ' )
190
+ slot_name_bytes = next (read_result ).rstrip (b" \0 " )
191
+ seed_name_bytes = next (read_result ).rstrip (b" \0 " )
192
192
193
193
if game_name != "ZEROMISSIONE" :
194
194
return False
195
195
196
196
# Check if we can read the slot name. Doing this here instead of set_auth as a protection against
197
197
# validating a ROM where there's no slot name to read.
198
198
try :
199
- self .rom_slot_name = slot_name_bytes .decode (' utf-8' )
199
+ self .rom_slot_name = slot_name_bytes .decode (" utf-8" )
200
200
except UnicodeDecodeError :
201
201
logger .info ("Could not read slot name from ROM. Are you sure this ROM matches this client version?" )
202
202
return False
@@ -205,12 +205,12 @@ async def validate_rom(self, client_ctx: BizHawkClientContext) -> bool:
205
205
client_ctx .items_handling = 0b001
206
206
client_ctx .want_slot_data = True
207
207
try :
208
- client_ctx .seed_name = seed_name_bytes .decode (' utf-8' )
208
+ client_ctx .seed_name = seed_name_bytes .decode (" utf-8" )
209
209
except UnicodeDecodeError :
210
- logger .info (' Could not determine seed name from ROM. Are you sure this ROM matches this client version?' )
210
+ logger .info (" Could not determine seed name from ROM. Are you sure this ROM matches this client version?" )
211
211
return False
212
212
213
- client_ctx .command_processor .commands [' deathlink' ] = cmd_deathlink
213
+ client_ctx .command_processor .commands [" deathlink" ] = cmd_deathlink
214
214
self .death_link = DeathLinkCtx ()
215
215
216
216
self .dc_pending = False
@@ -305,8 +305,8 @@ async def game_watcher(self, client_ctx: BizHawkClientContext) -> None:
305
305
306
306
if gMainGameMode in (ZMConstants .GM_CHOZODIA_ESCAPE , ZMConstants .GM_CREDITS ) and not client_ctx .finished_game :
307
307
await client_ctx .send_msgs ([{
308
- ' cmd' : ' StatusUpdate' ,
309
- ' status' : ClientStatus .CLIENT_GOAL
308
+ " cmd" : " StatusUpdate" ,
309
+ " status" : ClientStatus .CLIENT_GOAL
310
310
}])
311
311
312
312
if self .local_set_events != set_events and client_ctx .slot is not None :
@@ -315,11 +315,11 @@ async def game_watcher(self, client_ctx: BizHawkClientContext) -> None:
315
315
if set_events [flag ]:
316
316
event_bitfield |= 1 << i
317
317
await client_ctx .send_msgs ([{
318
- ' cmd' : ' Set' ,
319
- ' key' : f' mzm_events_{ client_ctx .team } _{ client_ctx .slot } ' ,
320
- ' default' : 0 ,
321
- ' want_reply' : False ,
322
- ' operations' : [{' operation' : 'or' , ' value' : event_bitfield }]
318
+ " cmd" : " Set" ,
319
+ " key" : f" mzm_events_{ client_ctx .team } _{ client_ctx .slot } " ,
320
+ " default" : 0 ,
321
+ " want_reply" : False ,
322
+ " operations" : [{" operation" : "or" , " value" : event_bitfield }]
323
323
}])
324
324
self .local_set_events = set_events
325
325
@@ -370,15 +370,15 @@ async def game_watcher(self, client_ctx: BizHawkClientContext) -> None:
370
370
return
371
371
372
372
def on_package (self , ctx : BizHawkClientContext , cmd : str , args : dict ) -> None :
373
- if cmd == ' Connected' :
374
- if args [' slot_data' ].get (' death_link' ):
373
+ if cmd == " Connected" :
374
+ if args [" slot_data" ].get (" death_link" ):
375
375
self .death_link .enabled = True
376
376
self .death_link .update_pending = True
377
- if cmd == ' RoomInfo' :
378
- if ctx .seed_name and ctx .seed_name != args [' seed_name' ]:
377
+ if cmd == " RoomInfo" :
378
+ if ctx .seed_name and ctx .seed_name != args [" seed_name" ]:
379
379
# CommonClient's on_package displays an error to the user in this case, but connection is not cancelled.
380
380
self .dc_pending = True
381
- if cmd == ' Bounced' :
382
- tags = args .get (' tags' , [])
383
- if ' DeathLink' in tags and args [' data' ][ ' source' ] != ctx .auth :
381
+ if cmd == " Bounced" :
382
+ tags = args .get (" tags" , [])
383
+ if " DeathLink" in tags and args [" data" ][ " source" ] != ctx .auth :
384
384
self .death_link .pending = True
0 commit comments