5
5
import time
6
6
import numpy as np
7
7
import bufr
8
- from pyioda .ioda .Engines .Bufr import Encoder as iodaEncoder
9
- from bufr .encoders .netcdf import Encoder as netcdfEncoder
8
+ from pyioda .ioda .Engines .Bufr import Encoder as iodaEncoder
9
+ from bufr .encoders .netcdf import Encoder as netcdfEncoder
10
10
from wxflow import Logger
11
11
12
12
# Initialize Logger
13
13
# Get log level from the environment variable, default to 'INFO it not set
14
14
log_level = os .getenv ('LOG_LEVEL' , 'INFO' )
15
15
logger = Logger ('BUFR2IODA_satwnd_amv_goes.py' , level = log_level , colored_log = False )
16
16
17
+
17
18
def logging (comm , level , message ):
18
19
"""
19
20
Logs a message to the console or log file, based on the specified logging level.
20
21
21
- This function ensures that logging is only performed by the root process (`rank 0`)
22
- in a distributed computing environment. The function maps the logging level to
22
+ This function ensures that logging is only performed by the root process (`rank 0`)
23
+ in a distributed computing environment. The function maps the logging level to
23
24
appropriate logger methods and defaults to the 'INFO' level if an invalid level is provided.
24
25
25
26
Parameters:
26
27
comm: object
27
- The communicator object, typically from a distributed computing framework
28
+ The communicator object, typically from a distributed computing framework
28
29
(e.g., MPI). It must have a `rank()` method to determine the process rank.
29
30
level: str
30
31
The logging level as a string. Supported levels are:
@@ -33,7 +34,7 @@ def logging(comm, level, message):
33
34
- 'WARNING'
34
35
- 'ERROR'
35
36
- 'CRITICAL'
36
- If an invalid level is provided, a warning will be logged, and the level
37
+ If an invalid level is provided, a warning will be logged, and the level
37
38
will default to 'INFO'.
38
39
message: str
39
40
The message to be logged.
@@ -73,6 +74,7 @@ def logging(comm, level, message):
73
74
# Call the logging method
74
75
log_method (message )
75
76
77
+
76
78
def _make_description (mapping_path , update = False ):
77
79
description = bufr .encoders .Description (mapping_path )
78
80
@@ -116,6 +118,7 @@ def _make_description(mapping_path, update=False):
116
118
117
119
return description
118
120
121
+
119
122
def compute_wind_components (wdir , wspd ):
120
123
"""
121
124
Compute the U and V wind components from wind direction and wind speed.
@@ -130,9 +133,10 @@ def compute_wind_components(wdir, wspd):
130
133
wdir_rad = np .radians (wdir ) # Convert degrees to radians
131
134
u = - wspd * np .sin (wdir_rad )
132
135
v = - wspd * np .cos (wdir_rad )
133
-
136
+
134
137
return u .astype (np .float32 ), v .astype (np .float32 )
135
138
139
+
136
140
def _get_obs_type (swcm , chanfreq ):
137
141
"""
138
142
Determine the observation type based on `swcm` and `chanfreq`.
@@ -164,6 +168,7 @@ def _get_obs_type(swcm, chanfreq):
164
168
165
169
return obstype
166
170
171
+
167
172
def _make_obs (comm , input_path , mapping_path ):
168
173
169
174
# Get container from mapping file first
@@ -175,7 +180,7 @@ def _make_obs(comm, input_path, mapping_path):
175
180
logging (comm , 'DEBUG' , f'category map = { container .get_category_map ()} ' )
176
181
177
182
# Add new/derived data into container
178
- for cat in container .all_sub_categories ():
183
+ for cat in container .all_sub_categories ():
179
184
180
185
logging (comm , 'DEBUG' , f'category = { cat } ' )
181
186
@@ -193,7 +198,7 @@ def _make_obs(comm, input_path, mapping_path):
193
198
container .add ('variables/windNorthward' , wob , paths , cat )
194
199
195
200
else :
196
- # Add new variables: ObsType/windEastward & ObsType/windNorthward
201
+ # Add new variables: ObsType/windEastward & ObsType/windNorthward
197
202
swcm = container .get ('variables/windComputationMethod' , cat )
198
203
chanfreq = container .get ('variables/sensorCentralFrequency' , cat )
199
204
@@ -209,7 +214,7 @@ def _make_obs(comm, input_path, mapping_path):
209
214
container .add ('variables/obstype_uwind' , obstype , paths , cat )
210
215
container .add ('variables/obstype_vwind' , obstype , paths , cat )
211
216
212
- # Add new variables: ObsValue/windEastward & ObsValue/windNorthward
217
+ # Add new variables: ObsValue/windEastward & ObsValue/windNorthward
213
218
wdir = container .get ('variables/windDirection' , cat )
214
219
wspd = container .get ('variables/windSpeed' , cat )
215
220
@@ -231,6 +236,7 @@ def _make_obs(comm, input_path, mapping_path):
231
236
232
237
return container
233
238
239
+
234
240
def create_obs_group (input_path , mapping_path , category , env ):
235
241
236
242
comm = bufr .mpi .Comm (env ["comm_name" ])
@@ -250,7 +256,7 @@ def create_obs_group(input_path, mapping_path, category, env):
250
256
251
257
container = _make_obs (comm , input_path , mapping_path )
252
258
253
- # Gather data from all tasks into all tasks. Each task will have the complete record
259
+ # Gather data from all tasks into all tasks. Each task will have the complete record
254
260
logging (comm , 'INFO' , f'Gather data from all tasks into all tasks' )
255
261
container .all_gather (comm )
256
262
@@ -269,6 +275,7 @@ def create_obs_group(input_path, mapping_path, category, env):
269
275
logging (comm , 'INFO' , f'Return the encoded data for { category } ' )
270
276
return data
271
277
278
+
272
279
def create_obs_file (input_path , mapping_path , output_path ):
273
280
274
281
comm = bufr .mpi .Comm ("world" )
@@ -279,7 +286,7 @@ def create_obs_file(input_path, mapping_path, output_path):
279
286
280
287
# Encode the data
281
288
if comm .rank () == 0 :
282
- netcdfEncoder (description ).encode (container , output_path )
289
+ netcdfEncoder (description ).encode (container , output_path )
283
290
284
291
logging (comm , 'INFO' , f'Return the encoded data' )
285
292
0 commit comments