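# Download ACARS aircraft sounding profiles from the MADIS server and write each profile out as an
# SPC-format text sounding file. The server URL, working directory, output directory, and valid-time
# granularity are set by the module-level constants below.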
from __future__ import print_function

import numpy as np

from netCDF4 import Dataset

from datetime import datetime, timedelta
import urllib2 as urlreq
import re
import os
import glob
import gzip
from StringIO import StringIO

from stdatmos import std_atmosphere_pres

_epoch = datetime(1970, 1, 1, 0)
_missing = -9999.0
_base_url = "https://madis-data.ncep.noaa.gov/madisPublic1/data/point/acars/netcdf/"
_work_path = "/home/tsupinie/acars"
_output_path = "/data/soundings/http/soundings/acars"
_time_granularity = 600  # seconds


def load_meta(meta_fname=("%s/airport_info.dat" % _work_path)):
    """
    load_meta

    Load in our database of airport codes.
    """
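    # Each line of the metadata file is parsed as whitespace-separated columns in the order given by
    # meta_cols: numeric airport code, station ID, synop number, latitude, longitude, elevation, and
    # station name (split(None, 6) keeps names containing spaces intact).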
    meta_cols = ['code', 'id', 'synop', 'lat', 'lon', 'elev', 'name']
    meta_types = {'code': int, 'id': str, 'synop': int, 'lat': float, 'lon': float, 'elev': int, 'name': str}

    meta_airport = {}
    with open(meta_fname) as fmeta:
        for line in fmeta:
            line_dict = {col: val for col, val in zip(meta_cols, line.strip().split(None, 6))}

            for col in line_dict.keys():
                line_dict[col] = meta_types[col](line_dict[col])

            code = line_dict.pop('code')
            meta_airport[code] = line_dict
    return meta_airport


def get_times(marker_path=("%s/markers" % _work_path)):
    """
    get_times

    Figure out which times need to be downloaded from the server. Profiles are stored in hourly files that are
    updated as new data come in, so downloading a file once won't necessarily get all of its data. To keep track of
    when each server file was last accessed, this function touches a marker file on disk (in marker_path) each time
    it accesses one. If the file on the server is newer than its marker, the file needs to be downloaded again.
    Marker files are deleted once they become older than the oldest file on the server.
    """
    def touch(fname, times=None):
        with open(fname, 'a'):
            os.utime(fname, times)

    # Figure out the files and their update times from the MADIS server
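    # For reference, the listing rows matched below look like ">20190305_1800.gz<" for the file names and
    # "05-Mar-2019 18:47" for the modification times (illustrative values; the exact page layout may differ).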
    txt = urlreq.urlopen(_base_url).read()
    files = re.findall(r">([\d]{8}_[\d]{4}).gz<", txt)
    update_times = re.findall(r"([\d]{2}-[\w]{3}-[\d]{4} [\d]{2}:[\d]{2})", txt)

    # If a MADIS server file is newer than the corresponding marker file, add it to the list of times to download
    times_to_dl = []
    for file_time_str, update_time_str in zip(files, update_times):
        marker_fname = "%s/%s.txt" % (marker_path, file_time_str)

        update_time = datetime.strptime(update_time_str, '%d-%b-%Y %H:%M')
        file_time = datetime.strptime(file_time_str, '%Y%m%d_%H%M')

        if (not os.path.exists(marker_fname)
                or _epoch + timedelta(seconds=os.path.getmtime(marker_fname)) < update_time):
            times_to_dl.append(file_time)
            touch(marker_fname)

    # Check for old marker files and delete them if they're older than the oldest file on the MADIS server
    earliest_time = datetime.strptime(files[0], '%Y%m%d_%H%M')
    for fname in glob.glob("%s/*.txt" % marker_path):
        ftime = datetime.strptime(os.path.basename(fname), "%Y%m%d_%H%M.txt")
        if ftime < earliest_time:
            os.unlink(fname)

    return times_to_dl


def dl_profiles(dt, fname):
    """
    dl_profiles

    Download a netCDF file from the MADIS server. The files are gzipped on the remote server, so unzip the data in
    memory and write plain netCDF to disk.
    """
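    # The remote file name is the valid time, e.g. ".../20190305_1800.gz" (illustrative time)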
    url = "%s/%s.gz" % (_base_url, dt.strftime('%Y%m%d_%H%M'))

    # Download the file and put the contents in a memory buffer to unzip
    sio = StringIO(urlreq.urlopen(url).read())
    gzf = gzip.GzipFile(fileobj=sio)

    # Write the unzipped data
    with open(fname, 'wb') as fnc:
        fnc.write(gzf.read())


def load_profiles(fname):
    """
    load_profiles

    Load a netCDF file and put the data into data structures. The data for each variable are stored in one array
    containing every ob from every profile, so we have to split the arrays into distinct profiles ourselves. We also
    enforce the time granularity on the profile valid times (not the obs within a single profile) here.
    """
| 118 | + stdatm = std_atmosphere_pres() |
| 119 | + |
| 120 | + load_vars = ['temperature', 'dewpoint', 'soundingSecs', 'sounding_airport_id', 'latitude', 'longitude', |
| 121 | + 'windSpeed', 'windDir', 'altitude'] |
| 122 | + |
| 123 | + nc = Dataset(fname) |
| 124 | + profile_data = {var: nc.variables[var][:] for var in load_vars} |
| 125 | + nc.close() |
| 126 | + |
| 127 | + # Split the profile arrays wherever the valid time changes. I guess this will mess up if two adjacent profiles |
| 128 | + # happen to be valid at the same time, but I'm not sure that throws out too many good profiles. |
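    # Illustrative example: if soundingSecs were [10, 10, 10, 25, 25], np.diff() is nonzero only at index 2,
    # so splits == [3] and np.split() produces the two profiles [10, 10, 10] and [25, 25].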
    splits = np.where(np.diff(profile_data['soundingSecs']))[0] + 1

    profile_data_split = {var: np.split(prof_var, splits) for var, prof_var in profile_data.items()}
    profiles = [{} for prof in profile_data_split['soundingSecs']]
    for var, prof_var in profile_data_split.items():
        for prof, split_prof in zip(profiles, prof_var):
            prof[var] = split_prof

    for prof in profiles:
        # The sensed altitude variable is pressure. The heights in the file are computed from pressure using a
        # standard atmosphere, but the pressures themselves aren't included, so recover them from the heights.
        prof['pressure'] = stdatm(prof['altitude'])

    # Enforce the granularity on the profile valid times. The granularity is set by _time_granularity above.
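    # For example, with _time_granularity = 600 a valid time of 1000 s rounds to 1200 s, so profiles are
    # binned to the nearest 10 minutes (illustrative numbers).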
    unique_profiles = {}
    for profile in profiles:
        ap_code = profile['sounding_airport_id'][0]
        snd_time = profile['soundingSecs'][0]

        if ap_code is np.ma.masked or snd_time is np.ma.masked:
            continue

        snd_time = _time_granularity * np.round(snd_time / _time_granularity)
        key = (snd_time, ap_code)
        if key not in unique_profiles or len(unique_profiles[key]['soundingSecs']) < len(profile['soundingSecs']):
            unique_profiles[key] = profile

    profiles = list(unique_profiles.values())
    return profiles


def output_profile(path, profile, meta_airport):
    """
    output_profile

    Write a single profile out to a file on disk in the SPC text format. Several quality-control checks are done in
    this function:
    1) If the airport code is missing, ignore the profile.
    2) If the profile's airport code doesn't exist in our database, ignore the profile.
    3) If the surface height is missing, ignore the profile. (This gives SHARPpy problems.)
    4) If the profile's lat/lon data are too far away from the airport it claims to be from, ignore the profile.
    5) If the profile has fewer than 3 data points, ignore the profile.
    6) If there's already a file with the same name on disk and this profile would produce a smaller file, ignore
       the profile.
    7) Remove obs that create duplicate height values or out-of-order pressure values (which also give SHARPpy
       issues).
    """
    # Check for a missing airport code
    code = profile['sounding_airport_id'][0]
    if code is np.ma.masked:
        return

    # Check for an airport code that's not in the database
    try:
        apid = meta_airport[code]['id']
    except KeyError:
        print("Skipping profile: unknown airport code '%d'" % code)
        return

    # Check for a missing surface height
    if profile['altitude'][0] is np.ma.masked:
        print("Skipping profile: surface height at '%s' is qc'ed" % apid)
        return

    # Check for distance from the claimed source airport
    ap_lat = meta_airport[code]['lat']
    ap_lon = meta_airport[code]['lon']

    dist = np.hypot(ap_lat - profile['latitude'], ap_lon - profile['longitude'])
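    # dist is measured in degrees of latitude/longitude, so the 1-degree threshold below is a coarse
    # sanity check (roughly 100 km)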
    if dist.min() > 1:
        print("Skipping profile: claims to be from '%s', but data are too far away" % apid)
        return

    # Get a datetime object with the profile time
    dt = _epoch + timedelta(seconds=float(profile['soundingSecs'][0]))

    # Fill any qc'ed values with the missing value
    pres_prof = profile['pressure'].filled(_missing)
    hght_prof = profile['altitude'].filled(_missing)
    tmpk_prof = profile['temperature'].filled(_missing)
    dwpk_prof = profile['dewpoint'].filled(_missing)
    wdir_prof = profile['windDir'].filled(_missing)
    wspd_prof = profile['windSpeed'].filled(_missing)

    # Sort by height
    sort_idxs = np.argsort(hght_prof)
    pres_prof = pres_prof[sort_idxs]
    hght_prof = hght_prof[sort_idxs]
    tmpk_prof = tmpk_prof[sort_idxs]
    dwpk_prof = dwpk_prof[sort_idxs]
    wdir_prof = wdir_prof[sort_idxs]
    wspd_prof = wspd_prof[sort_idxs]

    # Remove duplicate heights or out-of-order pressures
    bad_hghts = np.append(False, np.isclose(np.diff(hght_prof), 0))
    bad_press = np.append(False, np.diff(pres_prof) >= 0)

    keep = np.where(~(bad_hghts | bad_press))
    pres_prof = pres_prof[keep]
    hght_prof = hght_prof[keep]
    tmpk_prof = tmpk_prof[keep]
    dwpk_prof = dwpk_prof[keep]
    wdir_prof = wdir_prof[keep]
    wspd_prof = wspd_prof[keep]

    # Check for number of data points
    if len(hght_prof) < 3:
        return

    # Create the text output
    snd_lines = [
        "%TITLE%",
        "%s %s" % (apid.rjust(5), dt.strftime('%y%m%d/%H%M')),
        "",
        " LEVEL HGHT TEMP DWPT WDIR WSPD",
        "-------------------------------------------------------------------",
        "%RAW%"
    ]
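    # Each %RAW% line below is pressure (hPa), height (m), temperature (C), dewpoint (C), wind direction
    # (deg), and wind speed (kt); the source values are in Pa, K, and m/s, hence the conversions.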
    for pres, hght, tmpk, dwpk, wdir, wspd in zip(pres_prof, hght_prof, tmpk_prof, dwpk_prof, wdir_prof, wspd_prof):
        ob_str = "% 8.2f, % 9.2f, % 9.2f, % 9.2f, % 9.2f, % 9.2f" % (pres / 100., hght, tmpk - 273.15,
                                                                     dwpk - 273.15, wdir, wspd * 1.94)
        snd_lines.append(ob_str)
    snd_lines.append('%END%')

    # Construct the file name (using the time granularity)
    dt_sec = round((dt - _epoch).total_seconds() / _time_granularity) * _time_granularity
    dt_round = _epoch + timedelta(seconds=dt_sec)
    tree_path = "%s/%s" % (path, dt_round.strftime("%Y/%m/%d/%H"))
    fname = "%s/%s_%s.txt" % (tree_path, apid, dt_round.strftime("%H%M"))

    try:
        os.makedirs(tree_path)
    except OSError:
        pass

    snd_str = "\n".join(snd_lines)
    exist_size = os.path.getsize(fname) if os.path.exists(fname) else 0

    # Check for a smaller file that already exists. The idea is to avoid writing over a "good" file with a "bad"
    # file, where file size is used as a proxy for "goodness". This may not be the best proxy, though.
    if len(snd_str) < exist_size:
        print("Skipping profile: refusing to overwrite existing '%s' with a smaller file" % fname)
    else:
        with open(fname, 'w') as fsnd:
            fsnd.write(snd_str)


def output_profiles(path, profiles, meta):
    """
    output_profiles

    Loop over the profiles and write each one out to disk.
    """
    for profile in profiles:
        output_profile(path, profile, meta)


def main():
    meta = load_meta()

    print("Retrieving profile times ...")
    dts = get_times()

    for dt in dts:
        print("New profiles for %s" % dt.strftime("%H%M UTC %d %b"))

        fname = "%s/%s.nc" % (_work_path, dt.strftime("%Y%m%d_%H%M"))

        print("Downloading profiles ...")
        dl_profiles(dt, fname)

        print("Parsing profiles ...")
        profiles = load_profiles(fname)

        print("Dumping files ...")
        output_profiles(_output_path, profiles, meta)

        os.unlink(fname)

    print("Done!")


if __name__ == "__main__":
    main()