6
6
"""
7
7
8
8
import datetime
9
- import struct
10
9
import platform
10
+ import struct
11
11
from typing import Optional
12
12
13
13
from radiacode .bytes_buffer import BytesBuffer
14
14
from radiacode .decoders .databuf import decode_VS_DATA_BUF
15
15
from radiacode .decoders .spectrum import decode_RC_VS_SPECTRUM
16
16
from radiacode .transports .bluetooth import Bluetooth
17
17
from radiacode .transports .usb import Usb
18
- from radiacode .types import CTRL , VS , VSFR , DisplayDirection , DoseRateDB , Event , RareData , RawData , RealTimeData , Spectrum
18
+ from radiacode .types import (
19
+ COMMAND ,
20
+ CTRL ,
21
+ VS ,
22
+ VSFR ,
23
+ DisplayDirection ,
24
+ DoseRateDB ,
25
+ Event ,
26
+ RareData ,
27
+ RawData ,
28
+ RealTimeData ,
29
+ Spectrum ,
30
+ )
19
31
20
32
21
33
def spectrum_channel_to_energy (channel_number : int , a0 : float , a1 : float , a2 : float ) -> float :
@@ -76,7 +88,7 @@ def __init__(
76
88
self ._connection = Usb (serial_number = serial_number )
77
89
78
90
# init
79
- self .execute (b' \x07 \x00 ' , b'\x01 \xff \x12 \xff ' )
91
+ self .execute (COMMAND . SET_EXCHANGE , b'\x01 \xff \x12 \xff ' )
80
92
self .set_local_time (datetime .datetime .now ())
81
93
self .device_time (0 )
82
94
self ._base_time = datetime .datetime .now () + datetime .timedelta (seconds = 128 )
@@ -96,12 +108,11 @@ def __init__(
96
108
def base_time (self ) -> datetime .datetime :
97
109
return self ._base_time
98
110
99
- def execute (self , reqtype : bytes , args : Optional [bytes ] = None ) -> BytesBuffer :
100
- assert len (reqtype ) == 2
111
+ def execute (self , reqtype : COMMAND , args : Optional [bytes ] = None ) -> BytesBuffer :
101
112
req_seq_no = 0x80 + self ._seq
102
113
self ._seq = (self ._seq + 1 ) % 32
103
114
104
- req_header = reqtype + b' \x00 ' + struct .pack ('<B' , req_seq_no )
115
+ req_header = struct .pack ('<HBB' , int ( reqtype ), 0 , req_seq_no )
105
116
request = req_header + (args or b'' )
106
117
full_request = struct .pack ('<I' , len (request )) + request
107
118
@@ -111,7 +122,7 @@ def execute(self, reqtype: bytes, args: Optional[bytes] = None) -> BytesBuffer:
111
122
return response
112
123
113
124
def read_request (self , command_id : int | VS | VSFR ) -> BytesBuffer :
114
- r = self .execute (b' \x26 \x08 ' , struct .pack ('<I' , int (command_id )))
125
+ r = self .execute (COMMAND . RD_VIRT_STRING , struct .pack ('<I' , int (command_id )))
115
126
retcode , flen = r .unpack ('<II' )
116
127
assert retcode == 1 , f'{ command_id } : got retcode { retcode } '
117
128
# HACK: workaround for new firmware bug(?)
@@ -122,20 +133,20 @@ def read_request(self, command_id: int | VS | VSFR) -> BytesBuffer:
122
133
return r
123
134
124
135
def write_request (self , command_id : int | VSFR , data : Optional [bytes ] = None ) -> None :
125
- r = self .execute (b' \x25 \x08 ' , struct .pack ('<I' , int (command_id )) + (data or b'' ))
136
+ r = self .execute (COMMAND . WR_VIRT_SFR , struct .pack ('<I' , int (command_id )) + (data or b'' ))
126
137
retcode = r .unpack ('<I' )[0 ]
127
138
assert retcode == 1
128
139
assert r .size () == 0
129
140
130
141
def batch_read_vsfrs (self , vsfr_ids : list [VSFR ]) -> list [int ]:
131
142
assert len (vsfr_ids )
132
- r = self .execute (b' \x2a \x08 ' , b'' .join (struct .pack ('<I' , int (c )) for c in vsfr_ids ))
143
+ r = self .execute (COMMAND . RD_VIRT_SFR_BATCH , b'' .join (struct .pack ('<I' , int (c )) for c in vsfr_ids ))
133
144
ret = [r .unpack ('<I' )[0 ] for _ in range (len (vsfr_ids ))]
134
145
assert r .size () == 0
135
146
return ret
136
147
137
148
def status (self ) -> str :
138
- r = self .execute (b' \x05 \x00 ' )
149
+ r = self .execute (COMMAND . GET_STATUS )
139
150
flags = r .unpack ('<I' )
140
151
assert r .size () == 0
141
152
return f'status flags: { flags } '
@@ -149,10 +160,10 @@ def set_local_time(self, dt: datetime.datetime) -> None:
149
160
Microseconds are ignored.
150
161
"""
151
162
d = struct .pack ('<BBBBBBBB' , dt .day , dt .month , dt .year - 2000 , 0 , dt .second , dt .minute , dt .hour , 0 )
152
- self .execute (b' \x04 \x0a ' , d )
163
+ self .execute (COMMAND . SET_TIME , d )
153
164
154
165
def fw_signature (self ) -> str :
155
- r = self .execute (b' \x01 \x01 ' )
166
+ r = self .execute (COMMAND . FW_SIGNATURE )
156
167
signature = r .unpack ('<I' )[0 ]
157
168
filename = r .unpack_string ()
158
169
idstring = r .unpack_string ()
@@ -166,7 +177,7 @@ def fw_version(self) -> tuple[tuple[int, int, str], tuple[int, int, str]]:
166
177
- Boot version: (major, minor, date string)
167
178
- Target version: (major, minor, date string)
168
179
"""
169
- r = self .execute (b' \x0a \x00 ' )
180
+ r = self .execute (COMMAND . GET_VERSION )
170
181
boot_minor , boot_major = r .unpack ('<HH' )
171
182
boot_date = r .unpack_string ()
172
183
target_minor , target_major = r .unpack ('<HH' )
@@ -181,7 +192,7 @@ def hw_serial_number(self) -> str:
181
192
str: Hardware serial number formatted as hyphen-separated hexadecimal groups
182
193
(e.g. "12345678-9ABCDEF0")
183
194
"""
184
- r = self .execute (b' \x0b \x00 ' )
195
+ r = self .execute (COMMAND . GET_SERIAL )
185
196
serial_len = r .unpack ('<I' )[0 ]
186
197
assert serial_len % 4 == 0
187
198
serial_groups = [r .unpack ('<I' )[0 ] for _ in range (serial_len // 4 )]
@@ -202,11 +213,11 @@ def serial_number(self) -> str:
202
213
Returns:
203
214
str: The device serial number as an ASCII string
204
215
"""
205
- r = self .read_request (8 )
216
+ r = self .read_request (VS . SERIAL_NUMBER )
206
217
return r .data ().decode ('ascii' )
207
218
208
219
def commands (self ) -> str :
209
- br = self .read_request (257 )
220
+ br = self .read_request (VS . SFR_FILE )
210
221
return br .data ().decode ('ascii' )
211
222
212
223
# called with 0 after init!
@@ -254,7 +265,7 @@ def spectrum_reset(self) -> None:
254
265
This clears the current spectrum data buffer, effectively resetting the spectrum
255
266
measurement to start fresh.
256
267
"""
257
- r = self .execute (b' \x27 \x08 ' , struct .pack ('<II' , int (VS .SPECTRUM ), 0 ))
268
+ r = self .execute (COMMAND . WR_VIRT_STRING , struct .pack ('<II' , int (VS .SPECTRUM ), 0 ))
258
269
retcode = r .unpack ('<I' )[0 ]
259
270
assert retcode == 1
260
271
assert r .size () == 0
@@ -282,7 +293,7 @@ def set_energy_calib(self, coef: list[float]) -> None:
282
293
"""
283
294
assert len (coef ) == 3
284
295
pc = struct .pack ('<fff' , * coef )
285
- r = self .execute (b' \x27 \x08 ' , struct .pack ('<II' , int (VS .ENERGY_CALIB ), len (pc )) + pc )
296
+ r = self .execute (COMMAND . WR_VIRT_STRING , struct .pack ('<II' , int (VS .ENERGY_CALIB ), len (pc )) + pc )
286
297
retcode = r .unpack ('<I' )[0 ]
287
298
assert retcode == 1
288
299
0 commit comments