From da2421111d06511d178ba3396a3dd6d77a18f428 Mon Sep 17 00:00:00 2001 From: Carson Pruitt <90792257+CarsonPruitt-NOAA@users.noreply.github.com> Date: Mon, 15 Nov 2021 20:21:38 -0600 Subject: [PATCH] Adds full resolution and mainstem inundation composite capability This resolves #476. See Changelog 3.0.24.0 for full details. --- docs/CHANGELOG.md | 13 + tools/composite_ms_fr_inundation.py | 207 +++++++++ tools/gms_tools/inundate_gms.py | 261 +++++++++++ tools/gms_tools/mosaic_inundation.py | 145 +++++++ tools/gms_tools/overlapping_inundation.py | 505 ++++++++++++++++++++++ tools/inundation.py | 10 +- tools/run_test_case.py | 13 +- tools/run_test_case_gms.py | 376 ++++++++++++++++ tools/tools_shared_variables.py | 2 + 9 files changed, 1528 insertions(+), 4 deletions(-) create mode 100644 tools/composite_ms_fr_inundation.py create mode 100644 tools/gms_tools/inundate_gms.py create mode 100644 tools/gms_tools/mosaic_inundation.py create mode 100644 tools/gms_tools/overlapping_inundation.py create mode 100644 tools/run_test_case_gms.py diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index f3a7cb036..8a0de7039 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,6 +1,19 @@ All notable changes to this project will be documented in this file. We follow the [Semantic Versioning 2.0.0](http://semver.org/) format. +## v3.0.24.0 - 2021-11-08 - [PR #482](https://github.com/NOAA-OWP/cahaba/pull/482) + +Adds `composite_ms_fr_inundation.py` to allow for the generation of an inundation map given a "flow file" CSV and full-resolution (FR) and mainstem (MS) relative elevation models, synthetic rating curves, and catchments rasters created by the `fim_run.sh` script. + +## Additions +- `composite_ms_fr_inundation.py`: New module that is used to inundate both MS and FR FIM and composite the two inundation rasters. +- `/tools/gms_tools/`: Three modules (`inundate_gms.py`, `mosaic_inundation.py`, `overlapping_inundation.py`) ported from the GMS branch used to composite inundation rasters. + +## Changes +- `inundation.py`: Added 2 exception classes ported from the GMS branch. + +
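+ For example (a sketch with hypothetical paths), a single HUC can be composited from the Python API:
+
+ ```python
+ from composite_ms_fr_inundation import composite_inundation
+
+ composite_inundation('/outputs/fim_ms', '/outputs/fim_fr', '12090301',
+                      '/data/forecast_file.csv', '/outputs/composite',
+                      bin_rast_flag=True)
+ ```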

+
 ## v3.0.23.3 - 2021-11-04 - [PR #481](https://github.com/NOAA-OWP/cahaba/pull/481)
 Includes additional hydraulic properties to the `hydroTable.csv`: `Number of Cells`, `SurfaceArea (m2)`, `BedArea (m2)`, `Volume (m3)`, `SLOPE`, `LENGTHKM`, `AREASQKM`, `Roughness`, `TopWidth (m)`, `WettedPerimeter (m)`. Also adds `demDerived_reaches_split_points.gpkg`, `flowdir_d8_burned_filled.tif`, and `dem_thalwegCond.tif` to `-v` whitelist.
diff --git a/tools/composite_ms_fr_inundation.py b/tools/composite_ms_fr_inundation.py
new file mode 100644
index 000000000..1b975a8b9
--- /dev/null
+++ b/tools/composite_ms_fr_inundation.py
@@ -0,0 +1,207 @@
+#!/usr/bin/env python3
+import os, argparse, rasterio
+import numpy as np
+import pandas as pd
+from multiprocessing import Pool
+
+from inundation import inundate
+from gms_tools.mosaic_inundation import Mosaic_inundation, __append_id_to_file_name
+
+
+def composite_inundation(fim_dir_ms, fim_dir_fr, huc, flows, composite_output_dir, output_name='',
+                         bin_rast_flag=False, depth_rast_flag=False, clean=True, quiet=True):
+    """
+    Run `inundate()` on FIM 3.X mainstem (MS) and full-resolution (FR) outputs and composite the results. Assumes that all `fim_run`
+    products necessary for `inundate()` are in each huc8 folder.
+
+    Parameters
+    ----------
+    fim_dir_ms : str
+        Path to MS FIM directory. This should be an output directory from `fim_run.sh`.
+    fim_dir_fr : str
+        Path to FR FIM directory. This should be an output directory from `fim_run.sh`.
+    huc : str
+        HUC8 to run `inundate()` on. This should be a folder within both `fim_dir_ms` and `fim_dir_fr`.
+    flows : str or pandas.DataFrame
+        File path to forecast csv (a single file or a comma-separated list of files) or Pandas DataFrame with correct column names.
+    composite_output_dir : str
+        Folder path to write outputs. It will be created if it does not exist.
+    output_name : str, optional
+        Name for output raster. If not specified, the raster will be named 'inundation_composite_{flows_root}.tif'.
+    bin_rast_flag : bool, optional
+        Flag to create binary raster as output. If no raster flags are passed, this is the default behavior.
+    depth_rast_flag : bool, optional
+        Flag to create depth raster as output.
+    clean : bool, optional
+        If True, intermediate files are deleted.
+    quiet : bool, optional
+        Quiet output.
+
+    Returns
+    -------
+    None
+
+    Raises
+    ------
+    AssertionError
+        Raised when an input directory or the flow file does not exist, or when both raster flags are passed.
+
+    Notes
+    -----
+    - A subset of the domain in rem or catchments can be inundated by subsetting the HUCs file or the forecast file.
+
+    Examples
+    --------
+    >>> import composite_ms_fr_inundation
+    >>> composite_ms_fr_inundation.composite_inundation(
+            '/home/user/fim_output_mainstem',
+            '/home/user/fim_output_fullres',
+            '12090301',
+            '/home/user/forecast_file.csv',
+            '/home/user/fim_inundation_composite',
+            bin_rast_flag=True,
+            depth_rast_flag=False)
+    """
+    # Set inundation raster to True if no output type flags are passed
+    if not (bin_rast_flag or depth_rast_flag):
+        bin_rast_flag = True
+    assert not (bin_rast_flag and depth_rast_flag), 'Output can only be binary or depth grid, not both'
+    assert os.path.isdir(fim_dir_ms), f'{fim_dir_ms} is not a directory. Please specify an existing MS FIM directory.'
+    assert os.path.isdir(fim_dir_fr), f'{fim_dir_fr} is not a directory. Please specify an existing FR FIM directory.'
+    assert os.path.exists(flows), f'{flows} does not exist. Please specify a flow file.'
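+
+    # The flow file is expected to pair NWM feature IDs with discharges in CMS, e.g. (illustrative values):
+    #   feature_id,discharge
+    #   5791828,210.0
+    #   5791830,187.5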
+
+    # Instantiate output variables
+    var_keeper = {
+        'ms': {
+            'dir': fim_dir_ms,
+            'outputs': {
+                'inundation_rast': os.path.join(composite_output_dir, f'inundation_ms_{huc}.tif') if bin_rast_flag else None,
+                'depth_rast': os.path.join(composite_output_dir, f'depth_ms_{huc}.tif') if depth_rast_flag else None
+            }
+        },
+        'fr': {
+            'dir': fim_dir_fr,
+            'outputs': {
+                'inundation_rast': os.path.join(composite_output_dir, f'inundation_fr_{huc}.tif') if bin_rast_flag else None,
+                'depth_rast': os.path.join(composite_output_dir, f'depth_fr_{huc}.tif') if depth_rast_flag else None
+            }
+        }
+    }
+    # Build inputs to inundate() based on the input folders and huc
+    if not quiet: print(f"HUC {huc}")
+    for extent in var_keeper:
+        rem = os.path.join(var_keeper[extent]['dir'], huc, 'rem_zeroed_masked.tif')
+        catchments = os.path.join(var_keeper[extent]['dir'], huc, 'gw_catchments_reaches_filtered_addedAttributes.tif')
+        catchment_poly = os.path.join(var_keeper[extent]['dir'], huc, 'gw_catchments_reaches_filtered_addedAttributes_crosswalked.gpkg')
+        hydro_table = os.path.join(var_keeper[extent]['dir'], huc, 'hydroTable.csv')
+
+        # Ensure that all of the required files exist in the huc directory
+        for file in (rem, catchments, catchment_poly, hydro_table):
+            if not os.path.exists(file):
+                raise Exception(f"The following file does not exist within the supplied FIM directory:\n{file}")
+
+        # Run inundate()
+        extent_friendly = "mainstem (MS)" if extent=="ms" else "full-resolution (FR)"
+        if not quiet: print(f"  Creating an inundation map for the {extent_friendly} configuration...")
+        result = inundate(rem,catchments,catchment_poly,hydro_table,flows,mask_type=None,
+                          inundation_raster= var_keeper[extent]['outputs']['inundation_rast'],
+                          depths= var_keeper[extent]['outputs']['depth_rast'],
+                          quiet= quiet)
+        if result != 0:
+            raise Exception(f"Failed to inundate {rem} using the provided flows.")
+
+    # If no output name supplied, create one using the flows file name
+    if not output_name:
+        flows_root = os.path.splitext(os.path.basename(flows))[0]
+        output_name = os.path.join(composite_output_dir, f'inundation_composite_{flows_root}.tif')
+    else:
+        output_name = os.path.join(composite_output_dir, output_name)
+
+    # Composite MS and FR
+    inundation_map_file = {
+        'huc8' : [huc] * 2,
+        'branchID' : [None] * 2,
+        'inundation_rasters': [var_keeper['fr']['outputs']['inundation_rast'],
+                               var_keeper['ms']['outputs']['inundation_rast']],
+        'depths_rasters': [var_keeper['fr']['outputs']['depth_rast'],
+                           var_keeper['ms']['outputs']['depth_rast']]
+    }
+    inundation_map_file = pd.DataFrame(inundation_map_file)
+    Mosaic_inundation(
+        inundation_map_file,
+        mosaic_attribute='depths_rasters' if depth_rast_flag else 'inundation_rasters',
+        mosaic_output=output_name,
+        mask=catchment_poly,
+        unit_attribute_name='huc8',
+        nodata=-9999,
+        workers=1,
+        remove_inputs=clean,
+        subset=None,verbose=not quiet
+    )
+    if bin_rast_flag:
+        hydroid_to_binary(__append_id_to_file_name(output_name, huc))
+
+def hydroid_to_binary(hydroid_raster_filename):
+    '''Converts hydroid positive/negative grid to 1/0'''
+
+    #to_bin = lambda x: np.where(x > 0, 1, np.where(x == 0, -9999, 0))
+    to_bin = lambda x: np.where(x > 0, 1, np.where(x != -9999, 0, -9999))
+    hydroid_raster = rasterio.open(hydroid_raster_filename)
+    profile = hydroid_raster.profile # get profile for new raster creation later on
+    profile['nodata'] = -9999
+    bin_raster = to_bin(hydroid_raster.read(1)) # converts neg/pos to 0/1
+    # Overwrite inundation raster
+    with rasterio.open(hydroid_raster_filename, "w", **profile) as out_raster:
+        out_raster.write(bin_raster.astype(hydroid_raster.profile['dtype']), 1)
+    del hydroid_raster,profile,bin_raster
+
+
+if __name__ == '__main__':
+
+    # parse arguments
+    parser = argparse.ArgumentParser(description='Inundate FIM 3 full resolution and mainstem outputs using a flow file and composite the results.')
+    parser.add_argument('-ms','--fim-dir-ms',help='Directory that contains MS FIM outputs.',required=True)
+    parser.add_argument('-fr','--fim-dir-fr',help='Directory that contains FR FIM outputs.',required=True)
+    parser.add_argument('-u','--huc',help='HUC within FIM directories to inundate. Can be a comma-separated list.',required=True)
+    parser.add_argument('-f','--flows-file',help='File path of flows csv or comma-separated list of paths if running multiple HUCs',required=True)
+    parser.add_argument('-o','--output-dir',help='Folder to write Composite Raster output.',required=True)
+    parser.add_argument('-n','--output-name',help='File name for output(s).',default=None,required=False)
+    parser.add_argument('-b','--bin-raster',help='Output raster is a binary wet/dry grid. This is the default if no raster flags are passed.',required=False,default=False,action='store_true')
+    parser.add_argument('-d','--depth-raster',help='Output raster is a depth grid.',required=False,default=False,action='store_true')
+    parser.add_argument('-j','--num-workers',help='Number of concurrent processes to run.',required=False,default=1,type=int)
+    parser.add_argument('-c','--clean',help='If flag used, intermediate rasters are NOT cleaned up.',required=False,default=True,action='store_false')
+    parser.add_argument('-q','--quiet',help='Quiet terminal output.',required=False,default=False,action='store_true')
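+
+    # Example invocation (a sketch; paths are hypothetical):
+    #   python3 composite_ms_fr_inundation.py -ms /outputs/fim_ms -fr /outputs/fim_fr \
+    #       -u 12090301 -f /data/forecast_file.csv -o /outputs/composite -b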
+    # Extract to dictionary and assign to variables.
+    args = vars(parser.parse_args())
+    fim_dir_ms = args['fim_dir_ms']
+    fim_dir_fr = args['fim_dir_fr']
+    hucs = args['huc'].replace(' ', '').split(',')
+    flows_files = args['flows_file'].replace(' ', '').split(',')
+    num_workers = int(args['num_workers'])
+    output_dir = args['output_dir']
+    output_name = args['output_name']
+    bin_raster = bool(args['bin_raster'])
+    depth_raster = bool(args['depth_raster'])
+    clean = bool(args['clean'])
+    quiet = bool(args['quiet'])
+
+    assert num_workers >= 1, "Number of workers should be 1 or greater"
+    assert len(flows_files) == len(hucs), "Number of hucs must be equal to the number of forecasts provided"
+    assert not (bin_raster and depth_raster), "Cannot use both -b and -d flags"
+
+    # Create output directory if it does not exist
+    if not os.path.isdir(output_dir):
+        os.mkdir(output_dir)
+
+    # Create nested list for input into multiprocessing
+    arg_list = []
+    for huc, flows_file in zip(hucs, flows_files):
+        arg_list.append((fim_dir_ms, fim_dir_fr, huc, flows_file, output_dir, output_name, bin_raster, depth_raster, clean, quiet))
+
+    # Run composite_inundation() in a process pool, one task per huc
+    with Pool(processes=num_workers) as pool:
+        pool.starmap(composite_inundation, arg_list)
diff --git a/tools/gms_tools/inundate_gms.py b/tools/gms_tools/inundate_gms.py
new file mode 100644
index 000000000..0148da336
--- /dev/null
+++ b/tools/gms_tools/inundate_gms.py
@@ -0,0 +1,261 @@
+#!/usr/bin/env python3
+
+import numpy as np
+import pandas as pd
+from inundation import inundate
+import os
+from tqdm import tqdm
+import argparse
+from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor,as_completed
+from inundation import hydroTableHasOnlyLakes, NoForecastFound
+import traceback
+import logging
+
+
+def Inundate_gms( hydrofabric_dir, forecast, num_workers=1,
+                  hucs=None,
+                  inundation_raster=None,
+                  inundation_polygon=None, depths_raster=None,
+                  verbose=False,
+                  log_file=None,
+                  output_fileNames=None ):
+
+    # input handling
+    if hucs is not None:
+        try:
+            _ = (i for i in hucs)
+        except TypeError:
+            raise ValueError("hucs argument must be an iterable")
+
+    if isinstance(hucs,str):
+        hucs = [hucs]
+
+    num_workers = int(num_workers)
+
+    # log file
+    if log_file is not None:
+        if os.path.exists(log_file):
+            os.remove(log_file)
+
+        print('HUC8,BranchID,Exception',file=open(log_file,'w'))
+        #if log_file:
+            #logging.basicConfig(filename=log_file, level=logging.INFO)
+            #logging.info('HUC8,BranchID,Exception')
+
+    # load gms inputs
+    hucs_branches = pd.read_csv( os.path.join(hydrofabric_dir,'gms_inputs.csv'),
+                                 header=None,
+                                 dtype= {0:str,1:str} )
+
+    if hucs is not None:
+        hucs = set(hucs)
+        huc_indices = hucs_branches.loc[:,0].isin(hucs)
+        hucs_branches = hucs_branches.loc[huc_indices,:]
+
+    # get number of branches
+    number_of_branches = len(hucs_branches)
+
+    # make inundate generator
+    inundate_input_generator = __inundate_gms_generator( hucs_branches,number_of_branches,
+                                                         hydrofabric_dir,
+                                                         inundation_raster,
+                                                         inundation_polygon,
+                                                         depths_raster,
+                                                         forecast,
+                                                         verbose=False )
+
+    # start up process pool
+    # better results with Process pool
+    executor = ProcessPoolExecutor(max_workers=num_workers)
+
+    # collect output filenames
+    inundation_raster_fileNames = [None] * number_of_branches
+    inundation_polygon_fileNames = [None] * number_of_branches
+    depths_raster_fileNames = [None] * number_of_branches
+    hucCodes = [None] * number_of_branches
+    branch_ids = [None] * number_of_branches
+
+
+    executor_generator = {
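+        # map each future back to its (huc8, branch_id) identifiers as it completes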
executor.submit(inundate,**inp) : ids for inp,ids in inundate_input_generator + } + + idx = 0 + for future in tqdm(as_completed(executor_generator), + total=len(executor_generator), + disable=(not verbose), + desc="Inundating branches with {} workers".format(num_workers) + ): + + hucCode, branch_id = executor_generator[future] + + try: + future.result() + + except NoForecastFound as exc: + if log_file is not None: + print(f'{hucCode},{branch_id},{exc.__class__.__name__}, {exc}', + file=open(log_file,'a')) + elif verbose: + print(f'{hucCode},{branch_id},{exc.__class__.__name__}, {exc}') + + except hydroTableHasOnlyLakes as exc: + if log_file is not None: + print(f'{hucCode},{branch_id},{exc.__class__.__name__}, {exc}', + file=open(log_file,'a')) + elif verbose: + print(f'{hucCode},{branch_id},{exc.__class__.__name__}, {exc}') + + except Exception as exc: + if log_file is not None: + print(f'{hucCode},{branch_id},{exc.__class__.__name__}, {exc}', + file=open(log_file,'a')) + else: + print(f'{hucCode},{branch_id},{exc.__class__.__name__}, {exc}') + else: + + hucCodes[idx] = hucCode + branch_ids[idx] = branch_id + + try: + #print(hucCode,branch_id,future.result()[0][0]) + inundation_raster_fileNames[idx] = future.result()[0][0] + except TypeError: + pass + + try: + depths_raster_fileNames[idx] = future.result()[1][0] + except TypeError: + pass + + try: + inundation_polygon_fileNames[idx] = future.result()[2][0] + except TypeError: + pass + + idx += 1 + + # power down pool + executor.shutdown(wait=True) + + # make filename dataframe + output_fileNames_df = pd.DataFrame( { + 'huc8' : hucCodes, + 'branchID' : branch_ids, + 'inundation_rasters' : inundation_raster_fileNames, + 'depths_rasters' : depths_raster_fileNames, + 'inundation_polygons' : inundation_polygon_fileNames } + ) + + if output_fileNames is not None: + output_fileNames_df.to_csv(output_fileNames,index=False) + + return(output_fileNames_df) + + + + +def __inundate_gms_generator( + hucs_branches,number_of_branches, + hydrofabric_dir, + inundation_raster, + inundation_polygon, + depths_raster, + forecast,verbose=False + ): + + # iterate over branches + for idx,row in hucs_branches.iterrows(): + + huc = str(row[0]) + branch_id = str(row[1]) + + gms_dir = os.path.join(hydrofabric_dir,huc,'branches') + + rem_branch = os.path.join( gms_dir,branch_id,'rem_zeroed_masked_{}.tif'.format(branch_id) ) + catchments_branch = os.path.join( gms_dir,branch_id, + f'gw_catchments_reaches_filtered_addedAttributes_{branch_id}.tif' ) + hydroTable_branch = os.path.join( gms_dir,branch_id,'hydroTable_{}.csv'.format(branch_id) ) + catchment_poly = os.path.join( gms_dir, branch_id, + f'gw_catchments_reaches_filtered_addedAttributes_crosswalked_{branch_id}.gpkg' ) + + + # branch output + inundation_branch_raster = __append_id_to_file_name(inundation_raster,[huc,branch_id]) + inundation_branch_polygon = __append_id_to_file_name(inundation_polygon,[huc,branch_id]) + depths_branch_raster = __append_id_to_file_name(depths_raster,[huc,branch_id]) + + # identifiers + identifiers = (huc,branch_id) + + # inundate input + inundate_input = { + 'rem' : rem_branch, 'catchments' : catchments_branch, 'catchment_poly' : catchment_poly, + 'hydro_table' : hydroTable_branch,'forecast' : forecast, + 'mask_type' : None, + 'hucs' : None, + 'hucs_layerName' : None, + 'subset_hucs' : None, 'num_workers' : 1, + 'aggregate' : False, + 'inundation_raster' : inundation_branch_raster, + 'inundation_polygon' : inundation_branch_polygon, + 'depths' : depths_branch_raster, + 
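+            # profile arguments are left as None so inundate() falls back to its default raster/vector profiles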
+            'out_raster_profile' : None,
+            'out_vector_profile' : None,
+            'quiet' : not verbose
+        }
+
+        yield (inundate_input,identifiers)
+
+
+
+def __append_id_to_file_name(file_name,identifier):
+
+    if file_name is not None:
+
+        root,extension = os.path.splitext(file_name)
+
+        if isinstance(identifier,list):
+            out_file_name = root
+            for i in identifier:
+                out_file_name += "_{}".format(i)
+            out_file_name += extension
+        else:
+            out_file_name = root + "_{}".format(identifier) + extension
+
+    else:
+        out_file_name = None
+
+    return(out_file_name)
+
+
+def __vprint(message,verbose):
+    if verbose:
+        print(message)
+
+
+if __name__ == '__main__':
+
+    # parse arguments
+    parser = argparse.ArgumentParser(description='Inundate GMS')
+    parser.add_argument('-y','--hydrofabric_dir', help='Directory path to FIM hydrofabric by processing unit', required=True)
+    parser.add_argument('-u','--hucs',help='List of HUCS to run',required=False,default=None,type=str,nargs='+')
+    parser.add_argument('-f','--forecast',help='Forecast discharges in CMS as CSV file',required=True)
+    parser.add_argument('-i','--inundation-raster',help='Inundation Raster output. Only writes if designated.',required=False,default=None)
+    parser.add_argument('-p','--inundation-polygon',help='Inundation polygon output. Only writes if designated.',required=False,default=None)
+    parser.add_argument('-d','--depths-raster',help='Depths raster output. Only writes if designated. Appends HUC code in batch mode.',required=False,default=None)
+    parser.add_argument('-l','--log-file',help='Log-file to store level-path exceptions',required=False,default=None)
+    parser.add_argument('-o','--output-fileNames',help='Output CSV file with filenames for inundation rasters, inundation polygons, and depth rasters',required=False,default=None)
+    parser.add_argument('-w','--num-workers', help='Number of Workers', required=False,default=1)
+    parser.add_argument('-v','--verbose',help='Verbose printing',required=False,default=False,action='store_true')
+
+
+    # extract to dictionary and run
+    Inundate_gms( **vars(parser.parse_args()) )
\ No newline at end of file
diff --git a/tools/gms_tools/mosaic_inundation.py b/tools/gms_tools/mosaic_inundation.py
new file mode 100644
index 000000000..ea24cb75f
--- /dev/null
+++ b/tools/gms_tools/mosaic_inundation.py
@@ -0,0 +1,145 @@
+#!/usr/bin/env python
+# coding: utf-8
+
+from glob import glob
+from gms_tools.overlapping_inundation import OverlapWindowMerge
+import argparse
+import os
+import pandas as pd
+from tqdm import tqdm
+from tools_shared_variables import elev_raster_ndv
+
+def Mosaic_inundation( map_file,mosaic_attribute='inundation_rasters',mosaic_output=None,
+                       mask=None,unit_attribute_name='huc8',
+                       nodata=elev_raster_ndv,workers=4,
+                       remove_inputs=False,
+                       subset=None,verbose=True ):
+
+    # check input
+    if mosaic_attribute not in ('inundation_rasters','depths_rasters'):
+        raise ValueError('Pass inundation or depths for mosaic_attribute argument')
+
+    # load file
+    if isinstance(map_file,pd.DataFrame):
+        inundation_maps_df = map_file
+        del map_file
+    elif isinstance(map_file,str):
+        inundation_maps_df = pd.read_csv(map_file,
+                                         dtype={unit_attribute_name:str,'branchID':str} )
+    else:
+        raise TypeError('Pass Pandas Dataframe or file path string to csv for map_file argument')
+
+    # remove NaNs
+    inundation_maps_df.dropna(axis=0,how='all',inplace=True)
+
+    # subset
+    if subset is not None:
+        subset_mask = inundation_maps_df.loc[:,unit_attribute_name].isin(subset)
+        inundation_maps_df = inundation_maps_df.loc[subset_mask,:]
+
+    # unique aggregation units
+    aggregation_units = inundation_maps_df.loc[:,unit_attribute_name].unique()
+
+    inundation_maps_df.set_index(unit_attribute_name,drop=True,inplace=True)
+
+    # decide whether to display a progress bar
+    tqdm_disable = not verbose
+
+    for ag in tqdm(aggregation_units,disable=tqdm_disable,desc='Compositing MS and FR maps'):
+
+        try:
+            inundation_maps_list = inundation_maps_df.loc[ag,mosaic_attribute].tolist()
+        except AttributeError:
+            inundation_maps_list = [ inundation_maps_df.loc[ag,mosaic_attribute] ]
+
+        ag_mosaic_output = __append_id_to_file_name(mosaic_output,ag)
+        #try:
+        mosaic_by_unit(inundation_maps_list,ag_mosaic_output,nodata,
+                       workers=1,remove_inputs=remove_inputs,mask=mask,verbose=verbose)
+        #except Exception as exc:
+        #    print(ag,exc)
+
+
+    # restore the default index
+    inundation_maps_df.reset_index(drop=True,inplace=True)
+
+
+
+def mosaic_by_unit(inundation_maps_list,mosaic_output,nodata=elev_raster_ndv,
+                   workers=1,remove_inputs=False,mask=None,verbose=False):
+
+    # overlap object instance
+    overlap = OverlapWindowMerge( inundation_maps_list, (30, 30) )
+
+    # mosaic
+    #if verbose:
+    #    print("Mosaicking ...")
+
+    if mosaic_output is not None:
+        threaded = workers > 1
+
+        overlap.merge_rasters(mosaic_output, threaded=threaded, workers=workers,nodata=nodata)
+
+        if mask:
+            #if verbose:
+            #    print("Masking ...")
+            overlap.mask_mosaic(mosaic_output,mask,outfile=mosaic_output)
+
+    if remove_inputs:
+        #if verbose:
+        #    print("Removing inputs ...")
+
+        for inun_map in inundation_maps_list:
+            if inun_map is not None:
+                if os.path.isfile(inun_map):
+                    os.remove(inun_map)
+
+
+def __append_id_to_file_name(file_name,identifier):
+
+    if file_name is not None:
+
+        root,extension = os.path.splitext(file_name)
+
+        if isinstance(identifier,list):
+            out_file_name = root
+            for i in identifier:
+                out_file_name += "_{}".format(i)
+            out_file_name += extension
+        else:
+            out_file_name = root + "_{}".format(identifier) + extension
+
+    else:
+        out_file_name = None
+
+    return(out_file_name)
+
+
+if __name__ == '__main__':
+
+    parser = argparse.ArgumentParser(description='Mosaic GMS Inundation Rasters')
+    parser.add_argument('-i','--map-file', help='File path to CSV listing the inundation/depth maps to mosaic', required=True)
+    parser.add_argument('-a','--mask', help='File path to vector polygon mask to clip mosaic to', required=False,default=None)
+    parser.add_argument('-s','--subset', help='Subset units', required=False,default=None,type=str,nargs='+')
+    parser.add_argument('-n','--nodata', help='Nodata value for the mosaicked raster', required=False,default=elev_raster_ndv)
+    parser.add_argument('-w','--workers', help='Number of Workers', required=False,default=4,type=int)
+    parser.add_argument('-t','--mosaic-attribute', help='Attribute to mosaic: inundation_rasters or depths_rasters', required=False,default='inundation_rasters')
+    parser.add_argument('-m','--mosaic-output', help='File path for mosaicked output', required=False,default=None)
+    parser.add_argument('-r','--remove-inputs', help='Remove original input inundation Maps', required=False,default=False,action='store_true')
+    parser.add_argument('-v','--verbose', help='Verbose printing', required=False,default=False,action='store_true')
+
+    args = vars(parser.parse_args())
+
+    Mosaic_inundation(**args)
\ No newline at end of file
diff --git a/tools/gms_tools/overlapping_inundation.py b/tools/gms_tools/overlapping_inundation.py
new file mode 100644
index 000000000..5dab3da96
--- /dev/null
+++ b/tools/gms_tools/overlapping_inundation.py
@@ -0,0 +1,505 @@
+#!/usr/bin/env python
+# coding: utf-8
+
+import rasterio
+from rasterio.windows import from_bounds
+import numpy as np
+from functools import partial
+from affine import Affine
+from scipy.optimize import newton
+from threading import Lock
+import concurrent.futures
+from numba import njit
+import geopandas as gpd
+from rasterio.mask import mask
+import sys
+import warnings
+
+class OverlapWindowMerge:
+
+    def __init__(self,
+                 inundation_rsts,
+                 num_partitions=None,
+                 window_xy_size=None):
+        """
+        Initialize the object
+
+        :param inundation_rsts: list of inundation paths or datasets
+        :param num_partitions: tuple of integers representing number of windows in x and y space
+        :param window_xy_size: tuple of integers representing number of pixels in windows in x and y space
+        """
+
+        # sort for largest spanning dataset (todo: handle mismatched resolutions)
+        size_func = lambda x: np.abs(x.bounds.left - x.bounds.right) * \
+                              np.abs(x.bounds.top - x.bounds.bottom)
+        key_sort_func = lambda x: x['size']
+        datasets = [rasterio.open(ds) for ds in inundation_rsts]
+        ds_dict = [{'dataset': ds, 'size': size_func(ds)} for ds in datasets]
+        ds_dict.sort(key=key_sort_func, reverse=True)
+
+        # load sample overlapping inundation depth rasters
+        self.depth_rsts = [x['dataset'] for x in ds_dict]
+        del ds_dict
+
+        self.rst_dims = [[x.height, x.width] for x in self.depth_rsts]
+
+        self.res = self.depth_rsts[0].meta['transform'][0]
+        self.depth_bounds = np.array([[[x.bounds.top,
+                                        x.bounds.left],
+                                       [x.bounds.bottom,
+                                        x.bounds.right]] for x in self.depth_rsts]) / self.res
+
+        # get transform, width, height and bounds
+        self.proc_unit_transform, self.proc_unit_width, \
+            self.proc_unit_height, final_bounds = \
+            self.get_final_dims()
+
+        self.proc_unit_bounds = np.array([[final_bounds['top'],
+                                           final_bounds['left']],
+                                          [final_bounds['bottom'],
+                                           final_bounds['right']]])
+
+        self.proc_unit_bounds = self.proc_unit_bounds / self.res
+
+        self.lat_lon_sign = [np.sign(self.proc_unit_bounds[1, 0] - self.proc_unit_bounds[0, 0]),
+                             np.sign(self.proc_unit_bounds[1, 1] - self.proc_unit_bounds[0, 1])]
+
+        self.partitions = num_partitions
+        self.window_sizes = window_xy_size
+
+    @staticmethod
+    @njit
+    def get_res_bbox_min(x, v, z, y):
+        """
+        Optimize for bounds that fit the final resolution
+
+        :param x: float of compare
+        :param v: float representing min bound
+        :param z: float representing max bound
+        :param y: float representing resolution
+        """
+        return np.abs(z - x) - np.round(np.abs(z - v) / y) * y
+
+    def get_final_dims(self):
+        """
+        Get transform, width, height, and bbox of final dataset
+
+        :return: Affine transform, int width, int height, dict bounds
+        """
+
+        left = np.min([d.bounds.left for d in self.depth_rsts])
+        top = np.max([d.bounds.top for d in self.depth_rsts])
+        right = np.max([d.bounds.right for d in self.depth_rsts])
+        bottom = np.min([d.bounds.bottom for d in self.depth_rsts])
+
+        left = newton(self.get_res_bbox_min, left, args=(left, right, self.res))
+        bottom = newton(self.get_res_bbox_min, bottom, args=(bottom, top, self.res))
+
+        transform = self.depth_rsts[0].meta['transform']
+
+        width = int(np.abs(right - left) / self.res)
+        height = int(np.abs(top - bottom) / self.res)
+        new_transform = Affine(transform[0],
+                               transform[1],
+                               left,
+                               transform[3],
+                               transform[4],
+                               top)
+
+        return new_transform, width, height, {'left': left,
+                                              'top': top,
+                                              'right': right,
+                                              'bottom': bottom}
+
+    def get_window_coords(self):
+        """
+        Return ul/br bounds of windows and their respective window indices,
+        partitioned by self.partitions or self.window_sizes
+
+        :return: list of ul/br bounds of windows, ndarray of respective window indices
+        """
+
+        # Set up desired number of partitions (can also be set pixel size)
+        if self.partitions is not None:
+            x_res, y_res = self.partitions
+        elif self.window_sizes is not None:
+            x_res, y_res = self.window_sizes
+        else:
+            raise ValueError('Must specify either num_partitions or window_xy_size')
+
+        # Get window widths (both normal and edge windows)
+        window_width1 = np.repeat(int(self.proc_unit_width / x_res), x_res) * self.lat_lon_sign[1]
+        window_width2 = window_width1.copy()
+        window_width2[-1] += self.proc_unit_width - window_width1[0] * x_res * self.lat_lon_sign[1]
+
+        # Get window heights (both normal and edge windows)
+        window_height1 = np.repeat(int(self.proc_unit_height / y_res), y_res) * self.lat_lon_sign[0]
+        window_height2 = window_height1.copy()
+        window_height2[-1] += self.proc_unit_height - window_height1[0] * y_res * self.lat_lon_sign[0]
+
+        # Get window sizes (both normal and edge windows)
+        window_bounds1 = np.flip(np.array(np.meshgrid(window_width1,
+                                                      window_height1)).T.reshape(-1, 2),
+                                 axis=1).astype(np.int)
+        window_bounds2 = np.flip(np.array(np.meshgrid(window_width2,
+                                                      window_height2)).T.reshape(-1, 2),
+                                 axis=1).astype(np.int)
+
+        window_idx = np.array(np.unravel_index(np.arange(y_res * x_res), (y_res, x_res), order='F'))
+
+        return [window_bounds1, window_bounds2], window_idx
+
+    def create_lat_lons(self,
+                        window_bounds,
+                        window_idx):
+        """
+        Return bbox of window and list of latitudes and longitudes
+
+        :param window_bounds: tuple or list of partition sizes for x and y
+        :param window_idx: int representing index of window
+        :return: list of float latitudes, list of float longitudes, list of window bbox, list of ul/br coords for window
+        """
+
+        upper_left = (window_idx.T * window_bounds[0])
+        lower_right = upper_left + window_bounds[1]
+
+        # Merge point arrays, convert back to original units, and get drawable path for each window
+        bbox = np.hstack([upper_left, lower_right])
+        scaled_path_points = [np.array(np.meshgrid([st[0], st[2]], [st[1], st[3]])).T.reshape(-1, 2) for st in bbox]
+        path_points = (scaled_path_points + self.proc_unit_bounds[0]) * self.res
+
+        # Create arange of latitudes and longitudes and add half of window size
+        latitudes = np.arange(self.proc_unit_bounds[0, 0],
+                              self.proc_unit_bounds[1, 0] + self.lat_lon_sign[0],
+                              window_bounds[1][0][0])[:-1] + (window_bounds[1][0][0] / 2)
+        longitudes = np.arange(self.proc_unit_bounds[0, 1],
+                               self.proc_unit_bounds[1, 1] + self.lat_lon_sign[1],
+                               window_bounds[1][0][1])[:-1] + (window_bounds[1][0][1] / 2)
+
+        return latitudes, longitudes, path_points, bbox
+
+    @staticmethod
+    def get_window_idx(latitudes,
+                       longitudes,
+                       coords,
+                       partitions):
+        """
+        Return raveled window indices
+
+        :param latitudes: list of latitudes within bounds
+        :param longitudes: list of longitudes within bounds
+        :param coords: ul/br coordinates of a dataset
+        :param partitions: tuple of window partitions in y and x
+        :return: ndarray of raveled multi indexes
+        """
+        # Get difference of upper-left and lower-right boundaries and computed lat lons
+        lat_dif = [np.abs(latitudes - coords[0, 0]), np.abs(latitudes - coords[1, 0])]
+        lon_dif = [np.abs(longitudes - coords[0, 1]), np.abs(longitudes - coords[1, 1])]
+
+        # Create range between the closest idx for both lats and lons
+        lon_range = np.arange(np.argmin(lon_dif[0]), np.argmin(lon_dif[1]) + 1)
+        lat_range = np.arange(np.argmin(lat_dif[0]), np.argmin(lat_dif[1]) + 1)
+
+        # Create
mesh grid for each possible set of coords and ravel to get window idx + grid = np.array(np.meshgrid(lat_range, lon_range)).T.reshape(-1, 2) + del lon_range, lat_range, lat_dif, lon_dif + return np.ravel_multi_index([grid[:, 0], grid[:, 1]], partitions, order='F') + + def read_rst_data(self, + win_idx, + datasets, + path_points, + bbox, + meta): + """ + Return data windows and final bounds of window + + :param win_idx: int window index + :param datasets: list of int representing dataset inx + :param path_points: list of bbox for windows + :param bbox: list of ul/br coords of windows + :param meta: metadata for final dataset + + :return: rasterio window object for final window, rasterio window of data window bounds, + data for each raster in window, + """ + # Get window bounding box and get final array output dimensions + window = path_points[win_idx] + window_height, window_width = np.array([np.abs(bbox[win_idx][2] - bbox[win_idx][0]), + np.abs(bbox[win_idx][3] - bbox[win_idx][1])]).astype(np.int) + + bnds = [] + data = [] + for ds in datasets: + # Get rasterio window for each pair of window bounds and depth dataset + + bnd = from_bounds(window[0][1], + window[-1][0], + window[-1][1], + window[0][0], + transform=self.depth_rsts[ds].transform, + height=window_height, + width=window_width) + + bnds.append(bnd) + + # Read raster data with window + read_data = self.depth_rsts[ds].read(1, window=bnd).astype(np.float32) + # Convert all no data to nan values + read_data[read_data == np.float32(self.depth_rsts[ds].meta['nodata'])] = np.nan + data.append(read_data) + del bnd + + final_bnds = from_bounds(window[0][1], + window[-1][0], + window[-1][1], + window[0][0], + transform=meta['transform'], + height=window_height, + width=window_width) + + return [final_bnds, bnds, data] + + + def merge_rasters(self, out_fname, nodata=-9999, threaded=False, workers=4): + """ + Merge multiple raster datasets + + :param out_fname: str path for final merged dataset + :param nodata: int/float representing no data value + """ + + window_bounds, window_idx = self.get_window_coords() + latitudes, longitudes, path_points, bbox = self.create_lat_lons(window_bounds, + window_idx) + + windows = [self.get_window_idx(latitudes, + longitudes, + coords, + self.partitions) + for coords in self.depth_bounds] + + # Create dict with window idx key and dataset idx vals + data_dict = {} + for idx, win in enumerate(windows): + for win_idx in win: + if win_idx in data_dict: + data_dict[win_idx].append(idx) + else: + data_dict[win_idx] = [idx] + + agg_function = partial(np.nanmax, axis=0) + + meta = self.depth_rsts[0].meta + + meta.update(transform=self.proc_unit_transform, + width=self.proc_unit_width, + height=self.proc_unit_height, + nodata=nodata,blockxsize=256, + blockysize=256, tiled=True, + compress='lzw') + + final_windows, data_windows, data = [], [], [] + + def __data_generator(data_dict,path_points,bbox,meta): + + for key, val in data_dict.items(): + + f_window, window, dat = self.read_rst_data(key, + val, + path_points, + bbox, + meta + ) + yield(dat, window, f_window, val) + #final_windows.append(f_window) + #data_windows.append(window) + #data.append(dat) + #del f_window, window, dat + + # create data generator + dgen = __data_generator(data_dict,path_points,bbox,meta) + + lock = Lock() + + with rasterio.open(out_fname, 'w', **meta) as rst: + + merge_partial = partial(merge_data, + rst=rst, + lock=lock, + dtype=meta['dtype'], + agg_function=agg_function, + nodata=meta['nodata'], + rst_dims=self.rst_dims) + + if not 
threaded:
+                #for d, dw, fw, ddict in zip(data,
+                #                            data_windows,
+                #                            final_windows,
+                #                            data_dict.values()):
+                for d, dw, fw, ddict in dgen:
+                    merge_partial(d, dw, fw, ddict)
+            else:
+                # feed the generator to the thread pool; submit() is used because
+                # mapping over the (now unused, empty) collector lists would do no work
+                with concurrent.futures.ThreadPoolExecutor(
+                        max_workers=workers ) as executor:
+                    for d, dw, fw, ddict in dgen:
+                        executor.submit(merge_partial, d, dw, fw, ddict)
+
+    def mask_mosaic(self,mosaic,polys,polys_layer=None,outfile=None):
+
+        #rem_array,window_transform = mask(rem,[shape(huc['geometry'])],crop=True,indexes=1)
+
+        # input rem
+        if isinstance(mosaic,str):
+            mosaic = rasterio.open(mosaic)
+        elif isinstance(mosaic,rasterio.DatasetReader):
+            pass
+        else:
+            raise TypeError("Pass rasterio dataset or filepath for mosaic")
+
+        if isinstance(polys,str):
+            polys=gpd.read_file(polys,layer=polys_layer)
+        elif isinstance(polys,gpd.GeoDataFrame):
+            pass
+        else:
+            raise TypeError("Pass geopandas dataset or filepath for catchment polygons")
+
+        #fossid = huc['properties']['fossid']
+        #if polys.HydroID.dtype != 'str': polys.HydroID = polys.HydroID.astype(str)
+        #polys=polys[polys.HydroID.str.startswith(fossid)]
+        mosaic_array, window_transform = mask(mosaic,polys['geometry'],crop=True,indexes=1)
+
+        # build the output profile up front so it is always defined for the return
+        out_profile = mosaic.profile
+        out_profile.update(height=mosaic_array.shape[0],width=mosaic_array.shape[1],
+                           transform = window_transform, driver= 'GTiff',
+                           blockxsize=256, blockysize=256, tiled=True, compress='lzw')
+
+        if outfile:
+            with rasterio.open(outfile,'w',**out_profile) as otfi:
+                otfi.write(mosaic_array,indexes=1)
+
+        return(mosaic_array,out_profile)
+
+# Quasi multi write
+# Throughput achieved assuming processing time is not identical between windows
+# and queued datasets, preferably approx N/2 threads for 9 windows
+# @njit
+def merge_data(rst_data,
+               window_bnds,
+               final_window,
+               datasets,
+               dtype,
+               rst,
+               lock,
+               agg_function,
+               nodata,
+               rst_dims ):
+    """
+    Merge data in to final dataset (multi threaded)
+
+    :param rst_data: list of rst data from window
+    :param window_bnds: list rasterio windows representing data window bounds
+    :param final_window: rasterio window representing final window bounds
+    :param datasets: list of int representing dataset idx
+    :param dtype: data type of final output
+    :param rst: rasterio writer for final dataset
+    :param lock: thread concurrency lock
+    :param agg_function: function to aggregate datasets
+    :param nodata: nodata of final output
+    :param rst_dims: dimensions of overlapping rasters
+    """
+
+    nan_tile = np.array([np.nan]).astype(dtype)[0]
+    window_data = np.tile(float(nan_tile), [int(final_window.height), int(final_window.width)])
+
+    for data, bnds, idx in zip(rst_data, window_bnds, datasets):
+        # Get indices to apply to base
+
+        col_slice = slice(int(np.max([0,
+                                      np.ceil(bnds.col_off * -1)])),
+                          int(np.min([bnds.width,
+                                      rst_dims[idx][1] - bnds.col_off])))
+
+        row_slice = slice(int(np.max([0,
+                                      np.ceil(bnds.row_off * -1)])),
+                          int(np.min([bnds.height,
+                                      rst_dims[idx][0] - bnds.row_off])))
+
+        win_shape = window_data[row_slice, col_slice].shape
+
+        if not np.all(np.sign(np.array(win_shape) - np.array(data.shape)) > 0):
+            data = data[:win_shape[0], :win_shape[1]]
+        # Assign the data to the base array with aggregate function
+        merge = [window_data[row_slice,
+                             col_slice],
+                 data]
+
+        del data
+        with warnings.catch_warnings():
+            # This `with` block suppresses the RuntimeWarning thrown by numpy when aggregating nan values
+            warnings.simplefilter("ignore", category=RuntimeWarning)
+            window_data[row_slice, col_slice] = agg_function(merge)
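+            # stamp cells that remained NaN (all inputs were nodata) with the output nodata value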
window_data[np.isnan(window_data)] = nodata + del merge + + del rst_data, window_bnds, datasets + + window_data[(window_data == nan_tile) | (np.isnan(window_data))] = nodata + + with lock: + rst.write_band(1, window_data.astype(dtype), window=final_window) + del window_data + + +if __name__ == '__main__': + import time + # import tracemalloc + import glob + + # print('start', time.localtime()) + # project_path = r'../documentation/data' + # overlap = OverlapWindowMerge([project_path + '/overlap1.tif', + # project_path + '/overlap2.tif', + # project_path + '/overlap3.tif', + # ], + # (3, 3)) + # overlap.merge_rasters(project_path + '/merged_overlap.tif', nodata=0) + # print('end', time.localtime()) + + # tracemalloc.start() + print('start', time.localtime()) + # project_path = r'../documentation/data' + # project_path = '*/mosaicing_data/1_fr_ms_composite' + # overlap = OverlapWindowMerge([project_path + '/inundation_extent_12090301_FR.tif', + # project_path + '/inundation_extent_12090301_MS.tif' + # ], + # (30, 30)) + # overlap.merge_rasters(project_path + '/merged_final5.tif', threaded=True, workers=4, nodata=0) + + # tracemalloc.start() + print('start', time.localtime()) + # project_path = r'../documentation/data' + # project_path = '*/mosaicing_data/2_gms' + # a = glob.glob(project_path + '/inundation*.tif') + # overlap = OverlapWindowMerge(a, + # (30, 30)) + # overlap.merge_rasters(project_path + '/merged_final5.tif', threaded=True, workers=4, nodata=-2e9) + # current, peak = tracemalloc.get_traced_memory() + # print(f"Current memory usage is {current / 10 ** 6}MB; Peak was {peak / 10 ** 6}MB") + # tracemalloc.stop() + + project_path = '*' + overlap = OverlapWindowMerge([project_path + '/nwm_resampled.tif', + project_path + '/rnr_inundation_031403_2020092000.tif' + ], + (1, 1)) + overlap.merge_rasters(project_path + '/merged_final5.tif', threaded=False, workers=4) + + print('end', time.localtime()) \ No newline at end of file diff --git a/tools/inundation.py b/tools/inundation.py index fec6e4cfc..2b5a4599c 100755 --- a/tools/inundation.py +++ b/tools/inundation.py @@ -18,6 +18,14 @@ from gdal import BuildVRT import geopandas as gpd +class hydroTableHasOnlyLakes(Exception): + """ Raised when a Hydro-Table only has lakes """ + pass + + +class NoForecastFound(Exception): + """ Raised when no forecast is available for a given Hydro-Table """ + pass def inundate( rem,catchments,catchment_poly,hydro_table,forecast,mask_type,hucs=None,hucs_layerName=None, @@ -333,7 +341,7 @@ def __inundate_in_huc(rem_array,catchments_array,crs,window_transform,rem_profil if isinstance(depths,DatasetWriter): depths.close() if isinstance(inundation_raster,DatasetWriter): inundation_raster.close() if isinstance(inundation_polygon,fiona.Collection): inundation_polygon.close() - if isinstance(hucs,fiona.Collection): inundation_polygon.close() + #if isinstance(hucs,fiona.Collection): inundation_polygon.close() # return file names of outputs for aggregation. 
Handle Nones try: diff --git a/tools/run_test_case.py b/tools/run_test_case.py index f398a7a17..6b7046770 100755 --- a/tools/run_test_case.py +++ b/tools/run_test_case.py @@ -8,9 +8,16 @@ from tools_shared_functions import compute_contingency_stats_from_rasters from tools_shared_variables import (TEST_CASES_DIR, INPUTS_DIR, ENDC, TRED_BOLD, WHITE_BOLD, CYAN_BOLD, AHPS_BENCHMARK_CATEGORIES) from inundation import inundate - - -def run_alpha_test(fim_run_dir, version, test_id, magnitude, compare_to_previous=False, archive_results=False, mask_type='huc', inclusion_area='', inclusion_area_buffer=0, light_run=False, overwrite=True): +from gms_tools.inundate_gms import Inundate_gms +from gms_tools.mosaic_inundation import Mosaic_inundation +from gms_tools.overlapping_inundation import OverlapWindowMerge + +def run_alpha_test(fim_run_dir, version, test_id, magnitude, + # + compare_to_previous=False, archive_results=False, + mask_type='huc', inclusion_area='', + inclusion_area_buffer=0, light_run=False, + overwrite=True): benchmark_category = test_id.split('_')[1] # Parse benchmark_category from test_id. current_huc = test_id.split('_')[0] # Break off HUC ID and assign to variable. diff --git a/tools/run_test_case_gms.py b/tools/run_test_case_gms.py new file mode 100644 index 000000000..0c5d5210f --- /dev/null +++ b/tools/run_test_case_gms.py @@ -0,0 +1,376 @@ +#!/usr/bin/env python3 + +import os +import sys +import shutil +import argparse +import traceback +from pathlib import Path +import json +import ast +import pandas as pd + +from tools_shared_functions import compute_contingency_stats_from_rasters +from tools_shared_variables import (TEST_CASES_DIR, INPUTS_DIR, ENDC, TRED_BOLD, WHITE_BOLD, CYAN_BOLD, AHPS_BENCHMARK_CATEGORIES, IFC_MAGNITUDE_LIST, BLE_MAGNITUDE_LIST ) +from inundation import inundate +from gms_tools.inundate_gms import Inundate_gms +from gms_tools.mosaic_inundation import Mosaic_inundation +from gms_tools.overlapping_inundation import OverlapWindowMerge +from glob import glob +from utils.shared_variables import elev_raster_ndv + +def run_alpha_test( fim_run_dir, version, test_id, magnitude, + calibrated, model, + compare_to_previous=False, archive_results=False, + mask_type='filter', inclusion_area='', + inclusion_area_buffer=0, light_run=False, + overwrite=True, fr_run_dir=None, + gms_workers=1,verbose=False, + gms_verbose=False + ): + + # check eval_meta input + if model not in {None,'FR','MS','GMS'}: + raise ValueError("Model argument needs to be \'FR\', \'MS\', or \'GMS.\'") + + # make bool + calibrated = bool( calibrated ) + + if (model == "MS") & (fr_run_dir is None): + raise ValueError("fr_run_dir argument needs to be specified with MS model") + + benchmark_category = test_id.split('_')[1] # Parse benchmark_category from test_id. + current_huc = test_id.split('_')[0] # Break off HUC ID and assign to variable. + + # Construct paths to development test results if not existent. + if archive_results: + version_test_case_dir_parent = os.path.join(TEST_CASES_DIR, benchmark_category + '_test_cases', test_id, 'official_versions', version) + else: + version_test_case_dir_parent = os.path.join(TEST_CASES_DIR, benchmark_category + '_test_cases', test_id, 'testing_versions', version) + + # Delete the entire directory if it already exists. + if os.path.exists(version_test_case_dir_parent): + if overwrite == True: + shutil.rmtree(version_test_case_dir_parent) + elif model == 'MS': + pass + else: + print("Metrics for ({version}: {test_id}) already exist. 
Use overwrite flag (-o) to overwrite metrics.".format(version=version, test_id=test_id)) + return + + os.makedirs(version_test_case_dir_parent,exist_ok=True) + + __vprint("Running the alpha test for test_id: " + test_id + ", " + version + "...",verbose) + stats_modes_list = ['total_area'] + + fim_run_parent = os.path.join(os.environ['outputDataDir'], fim_run_dir) + assert os.path.exists(fim_run_parent), "Cannot locate " + fim_run_parent + + # get hydrofabric directory + hydrofabric_dir = Path(fim_run_parent).parent.absolute() + + # Create paths to fim_run outputs for use in inundate(). + rem = os.path.join(fim_run_parent, 'rem_zeroed_masked.tif') + if not os.path.exists(rem): + rem = os.path.join(fim_run_parent, 'rem_clipped_zeroed_masked.tif') + catchments = os.path.join(fim_run_parent, 'gw_catchments_reaches_filtered_addedAttributes.tif') + if not os.path.exists(catchments): + catchments = os.path.join(fim_run_parent, 'gw_catchments_reaches_clipped_addedAttributes.tif') + if mask_type == 'huc': + catchment_poly = '' + else: + catchment_poly = os.path.join(fim_run_parent, 'gw_catchments_reaches_filtered_addedAttributes_crosswalked.gpkg') + hydro_table = os.path.join(fim_run_parent, 'hydroTable.csv') + + # Map necessary inputs for inundation(). + hucs, hucs_layerName = os.path.join(INPUTS_DIR, 'wbd', 'WBD_National.gpkg'), 'WBDHU8' + + # Create list of shapefile paths to use as exclusion areas. + zones_dir = os.path.join(TEST_CASES_DIR, 'other', 'zones') + mask_dict = {'levees': + {'path': os.path.join(zones_dir, 'leveed_areas_conus.shp'), + 'buffer': None, + 'operation': 'exclude' + }, + 'waterbodies': + {'path': os.path.join(zones_dir, 'nwm_v2_reservoirs.shp'), + 'buffer': None, + 'operation': 'exclude', + }, + } + + if inclusion_area != '': + inclusion_area_name = os.path.split(inclusion_area)[1].split('.')[0] # Get layer name + mask_dict.update({inclusion_area_name: {'path': inclusion_area, + 'buffer': int(inclusion_area_buffer), + 'operation': 'include'}}) + # Append the concatenated inclusion_area_name and buffer. + if inclusion_area_buffer == None: + inclusion_area_buffer = 0 + stats_modes_list.append(inclusion_area_name + '_b' + str(inclusion_area_buffer) + 'm') + + # Check if magnitude is list of magnitudes or single value. + magnitude_list = magnitude + if type(magnitude_list) != list: + magnitude_list = [magnitude_list] + + + # Get path to validation_data_{benchmark} directory and huc_dir. + validation_data_path = os.path.join(TEST_CASES_DIR, benchmark_category + '_test_cases', 'validation_data_' + benchmark_category) + for magnitude in magnitude_list: + version_test_case_dir = os.path.join(version_test_case_dir_parent, magnitude) + if not os.path.exists(version_test_case_dir): + os.mkdir(version_test_case_dir) + # Construct path to validation raster and forecast file. + if benchmark_category in AHPS_BENCHMARK_CATEGORIES: + benchmark_raster_path_list, forecast_list = [], [] + lid_dir_list = os.listdir(os.path.join(validation_data_path, current_huc)) + lid_list, inundation_raster_list, domain_file_list = [], [], [] + + for lid in lid_dir_list: + lid_dir = os.path.join(validation_data_path, current_huc, lid) + benchmark_lid_raster_path = os.path.join(lid_dir, magnitude, 'ahps_' + lid + '_huc_' + current_huc + '_extent_' + magnitude + '.tif') + + # Only compare if the benchmark data exist. 
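+                # e.g. ahps_bmbp1_huc_12090301_extent_action.tif (hypothetical lid and huc)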
+ if os.path.exists(benchmark_lid_raster_path): + benchmark_raster_path_list.append(benchmark_lid_raster_path) # TEMP + forecast_list.append(os.path.join(lid_dir, magnitude, 'ahps_' + lid + '_huc_' + current_huc + '_flows_' + magnitude + '.csv')) # TEMP + lid_list.append(lid) + inundation_raster_list.append(os.path.join(version_test_case_dir, lid + '_inundation_extent.tif')) + domain_file_list.append(os.path.join(lid_dir, lid + '_domain.shp')) + + else: + benchmark_raster_file = os.path.join(TEST_CASES_DIR, benchmark_category + '_test_cases', 'validation_data_' + benchmark_category, current_huc, magnitude, benchmark_category + '_huc_' + current_huc + '_extent_' + magnitude + '.tif') + benchmark_raster_path_list = [benchmark_raster_file] + forecast_path = os.path.join(TEST_CASES_DIR, benchmark_category + '_test_cases', 'validation_data_' + benchmark_category, current_huc, magnitude, benchmark_category + '_huc_' + current_huc + '_flows_' + magnitude + '.csv') + forecast_list = [forecast_path] + inundation_raster_list = [os.path.join(version_test_case_dir, 'inundation_extent.tif')] + + for index in range(0, len(benchmark_raster_path_list)): + benchmark_raster_path = benchmark_raster_path_list[index] + forecast = forecast_list[index] + inundation_raster = inundation_raster_list[index] + # Only need to define ahps_lid and ahps_extent_file for AHPS_BENCHMARK_CATEGORIES. + if benchmark_category in AHPS_BENCHMARK_CATEGORIES: + ahps_lid = lid_list[index] + ahps_domain_file = domain_file_list[index] + mask_dict.update({ahps_lid: + {'path': ahps_domain_file, + 'buffer': None, + 'operation': 'include'} + }) + + + if not os.path.exists(benchmark_raster_path) or not os.path.exists(ahps_domain_file) or not os.path.exists(forecast): # Skip loop instance if the benchmark raster doesn't exist. + continue + else: # If not in AHPS_BENCHMARK_CATEGORIES. + if not os.path.exists(benchmark_raster_path) or not os.path.exists(forecast): # Skip loop instance if the benchmark raster doesn't exist. + continue + # Run inundate. + __vprint("-----> Running inundate() to produce inundation extent for the " + magnitude + " magnitude...",verbose) + # The inundate adds the huc to the name so I account for that here. 
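+            # e.g. 'inundation_extent.tif' becomes 'inundation_extent_12090301.tif' for HUC 12090301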
+ predicted_raster_path = os.path.join( + os.path.split(inundation_raster)[0], + os.path.split(inundation_raster)[1].replace('.tif', '_'+current_huc+'.tif') + ) + try: + if model == 'GMS': + + map_file = Inundate_gms( + hydrofabric_dir=hydrofabric_dir, + forecast=forecast, + num_workers=gms_workers, + hucs=current_huc, + inundation_raster=inundation_raster, + inundation_polygon=None, depths_raster=None, + verbose=gms_verbose, + log_file=None, + output_fileNames=None + ) + + mask_path_gms = os.path.join(fim_run_parent, 'wbd.gpkg') + + Mosaic_inundation( + map_file,mosaic_attribute='inundation_rasters', + mosaic_output=inundation_raster, + mask=mask_path_gms,unit_attribute_name='huc8', + nodata=elev_raster_ndv,workers=1, + remove_inputs=True, + subset=None,verbose=verbose + ) + + else: + inundate( + rem,catchments,catchment_poly,hydro_table,forecast, + mask_type,hucs=hucs,hucs_layerName=hucs_layerName, + subset_hucs=current_huc,num_workers=1,aggregate=False, + inundation_raster=inundation_raster,inundation_polygon=None, + depths=None,out_raster_profile=None,out_vector_profile=None, + quiet=True + ) + + if model =='MS': + + # Mainstems inundation + #fr_run_parent = os.path.join(os.environ['outputDataDir'], fr_run_dir,current_huc) + #assert os.path.exists(fr_run_parent), "Cannot locate " + fr_run_parent + + inundation_raster_ms = os.path.join( + os.path.split(inundation_raster)[0], + os.path.split(inundation_raster)[1].replace('.tif', '_{}_MS.tif'.format(current_huc)) + ) + inundation_raster_fr = os.path.join( + os.path.split(version_test_case_dir_parent)[0], + fr_run_dir, + magnitude, + os.path.split(inundation_raster)[1].replace('.tif', '_'+current_huc+'.tif') + ) + + os.rename(predicted_raster_path,inundation_raster_ms) + + ms_inundation_map_file = { + 'huc8' : [current_huc] * 2, + 'branchID' : [None] * 2, + 'inundation_rasters' : [inundation_raster_fr,inundation_raster_ms], + 'depths_rasters' : [None] * 2, + 'inundation_polygons' : [None] * 2 + } + ms_inundation_map_file = pd.DataFrame(ms_inundation_map_file) + + Mosaic_inundation( + ms_inundation_map_file,mosaic_attribute='inundation_rasters', + mosaic_output=inundation_raster, + mask=catchment_poly,unit_attribute_name='huc8', + nodata=elev_raster_ndv,workers=1, + remove_inputs=False, + subset=None,verbose=verbose + ) + + __vprint("-----> Inundation mapping complete.",verbose) + + # Define outputs for agreement_raster, stats_json, and stats_csv. + if benchmark_category in AHPS_BENCHMARK_CATEGORIES: + agreement_raster, stats_json, stats_csv = os.path.join(version_test_case_dir, lid + 'total_area_agreement.tif'), os.path.join(version_test_case_dir, 'stats.json'), os.path.join(version_test_case_dir, 'stats.csv') + else: + agreement_raster, stats_json, stats_csv = os.path.join(version_test_case_dir, 'total_area_agreement.tif'), os.path.join(version_test_case_dir, 'stats.json'), os.path.join(version_test_case_dir, 'stats.csv') + + compute_contingency_stats_from_rasters(predicted_raster_path, + benchmark_raster_path, + agreement_raster, + stats_csv=stats_csv, + stats_json=stats_json, + mask_values=[], + stats_modes_list=stats_modes_list, + test_id=test_id, + mask_dict=mask_dict, + ) + + if benchmark_category in AHPS_BENCHMARK_CATEGORIES: + del mask_dict[ahps_lid] + + __vprint(" ",verbose) + # print("Evaluation complete. 
All metrics for " + test_id + ", " + version + ", " + magnitude + " are available at " + CYAN_BOLD + version_test_case_dir + ENDC) # GMS
+            __vprint("Evaluation metrics for " + test_id + ", " + version + ", " + magnitude + " are available at " + CYAN_BOLD + version_test_case_dir + ENDC,verbose) # cahaba/dev
+            __vprint(" ",verbose)
+
+        except Exception as e:
+            traceback.print_exc()
+            #print(e)
+
+        if benchmark_category in AHPS_BENCHMARK_CATEGORIES:
+            # -- Delete temp files -- #
+            # List all files in the output directory.
+            output_file_list = os.listdir(version_test_case_dir)
+            for output_file in output_file_list:
+                if "total_area" in output_file:
+                    full_output_file_path = os.path.join(version_test_case_dir, output_file)
+                    os.remove(full_output_file_path)
+
+    # write out evaluation meta-data
+    with open(os.path.join(version_test_case_dir_parent,'eval_metadata.json'),'w') as meta:
+        eval_meta = { 'calibrated' : calibrated , 'model' : model }
+        meta.write(
+                   json.dumps(eval_meta,indent=2)
+                  )
+
+def __vprint(message,verbose):
+    if verbose:
+        print(message)
+
+
+if __name__ == '__main__':
+
+    # Parse arguments.
+    parser = argparse.ArgumentParser(description='Inundation mapping and regression analysis for FOSS FIM. Regression analysis results are stored in the test directory.')
+    parser.add_argument('-r','--fim-run-dir',help='Name of directory containing outputs of fim_run.sh',required=True)
+    parser.add_argument('-b', '--version',help='The name of the working version in which features are being tested',required=True,default="")
+    parser.add_argument('-t', '--test-id',help='The test_id to use. Format as: HUC_BENCHMARKTYPE, e.g. 12345678_ble.',required=True,default="")
+    parser.add_argument('-m', '--mask-type', help='Specify \'huc\' (FIM < 3) or \'filter\' (FIM >= 3) masking method. MS and GMS currently only support huc.', required=False,default="filter")
+    parser.add_argument('-n','--calibrated',help='Denotes use of calibrated n values',required=False, default=False,action='store_true')
+    parser.add_argument('-e','--model',help='Denotes model used. FR, MS, or GMS allowed',required=True)
+    parser.add_argument('-y', '--magnitude',help='The magnitude to run.',required=False, default="")
+    parser.add_argument('-c', '--compare-to-previous', help='Compare to previous versions of HAND.', required=False,action='store_true')
+    parser.add_argument('-a', '--archive-results', help='Automatically copy results to the "previous_version" archive for test_id. For admin use only.', required=False,action='store_true')
+    parser.add_argument('-i', '--inclusion-area', help='Path to shapefile. Contingency metrics will be produced from pixels inside of shapefile extent.', required=False, default="")
+    parser.add_argument('-ib','--inclusion-area-buffer', help='Buffer to use when masking contingency metrics with inclusion area.', required=False, default="0")
+    parser.add_argument('-l', '--light-run', help='Using the light_run option will result in only stat files being written, and NOT grid files.', required=False, action='store_true')
+    parser.add_argument('-o','--overwrite',help='Overwrite all metrics or only fill in missing metrics.',required=False, default=False, action='store_true')
+    parser.add_argument('-w','--gms-workers', help='Number of workers to use for GMS Branch Inundation', required=False, default=1)
+    parser.add_argument('-d','--fr-run-dir',help='Name of test case directory containing inundation for FR configuration',required=False,default=None)
+    parser.add_argument('-v', '--verbose', help='Verbose operation', required=False, action='store_true', default=False)
+    parser.add_argument('-vg', '--gms-verbose', help='Prints progress bar for GMS', required=False, action='store_true', default=False)
+
+    # Extract to dictionary and assign to variables.
+    args = vars(parser.parse_args())
+
+    valid_test_id_list = os.listdir(TEST_CASES_DIR)
+
+    exit_flag = False # Default to False.
+    __vprint("",args['verbose'])
+
+    # Ensure test_id is valid.
+#    if args['test_id'] not in valid_test_id_list:
+#        print(TRED_BOLD + "Warning: " + WHITE_BOLD + "The provided test_id (-t) " + CYAN_BOLD + args['test_id'] + WHITE_BOLD + " is not available." + ENDC)
+#        print(WHITE_BOLD + "Available test_ids include: " + ENDC)
+#        for test_id in valid_test_id_list:
+#            if 'validation' not in test_id.split('_') and 'ble' in test_id.split('_'):
+#                print(CYAN_BOLD + test_id + ENDC)
+#        print()
+#        exit_flag = True
+
+    # Ensure fim_run_dir exists.
+    if not os.path.exists(os.path.join(os.environ['outputDataDir'], args['fim_run_dir'])):
+        print(TRED_BOLD + "Warning: " + WHITE_BOLD + "The provided fim_run_dir (-r) " + CYAN_BOLD + args['fim_run_dir'] + WHITE_BOLD + " could not be located in the 'outputs' directory." + ENDC)
+        print(WHITE_BOLD + "Please provide the parent directory name for fim_run.sh outputs. These outputs are usually written in a subdirectory, e.g. outputs/123456/123456." + ENDC)
+        print()
+        exit_flag = True
+
+    # Ensure inclusion_area path exists.
+    if args['inclusion_area'] != "" and not os.path.exists(args['inclusion_area']):
+        print(TRED_BOLD + "Error: " + WHITE_BOLD + "The provided inclusion_area (-i) " + CYAN_BOLD + args['inclusion_area'] + WHITE_BOLD + " could not be located." + ENDC)
+        exit_flag = True
+
+    try:
+        inclusion_buffer = int(args['inclusion_area_buffer'])
+    except ValueError:
+        print(TRED_BOLD + "Error: " + WHITE_BOLD + "The provided inclusion_area_buffer (-ib) " + CYAN_BOLD + args['inclusion_area_buffer'] + WHITE_BOLD + " is not a round number." + ENDC)
+
+    benchmark_category = args['test_id'].split('_')[1]
+
+    if args['magnitude'] == '':
+        if 'ble' == benchmark_category:
+            args['magnitude'] = BLE_MAGNITUDE_LIST
+        elif ('nws' == benchmark_category) | ('usgs' == benchmark_category):
+            args['magnitude'] = ['action', 'minor', 'moderate', 'major']
+        elif 'ifc' == benchmark_category:
+            args['magnitude'] = IFC_MAGNITUDE_LIST
+        else:
+            print(TRED_BOLD + "Error: " + WHITE_BOLD + "The provided magnitude (-y) " + CYAN_BOLD + args['magnitude'] + WHITE_BOLD + " is invalid. ble options include: 100yr, 500yr. ahps options include action, minor, moderate, major."
+ ENDC) + exit_flag = True + + if exit_flag: + print() + sys.exit() + + else: + run_alpha_test(**args) \ No newline at end of file diff --git a/tools/tools_shared_variables.py b/tools/tools_shared_variables.py index 04356c3e7..0c31a6fb8 100755 --- a/tools/tools_shared_variables.py +++ b/tools/tools_shared_variables.py @@ -50,6 +50,8 @@ ] DISCARD_AHPS_QUERY = "not flow.isnull() & masked_perc<97 & not nws_lid in @BAD_SITES" +elev_raster_ndv = -9999.0 + # Colors. ENDC = '\033[m' TGREEN_BOLD = '\033[32;1m'