diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7b9bce771..38b9f4980 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,16 @@ All notable changes to this project will be documented in this file. We follow the [Semantic Versioning 2.0.0](http://semver.org/) format.
 
+## v3.0.8.2 - 2021-03-11 - [PR #296](https://github.com/NOAA-OWP/cahaba/pull/296)
+
+Enhancements to post-processing for Viz-related use-cases.
+
+### Changes
+ - Aggregate grids are projected to Web Mercator during `-v` runs in `fim_run.sh`.
+ - HUC6 aggregation is parallelized.
+ - Aggregate grid blocksize is changed from 256 to 1024 for faster postprocessing.
+
+
 ## v3.0.8.1 - 2021-03-10 - [PR #302](https://github.com/NOAA-OWP/cahaba/pull/302)
 
 Patched import issue in `tools_shared_functions.py`.
diff --git a/fim_run.sh b/fim_run.sh
index 5acdeff71..42a5d022e 100755
--- a/fim_run.sh
+++ b/fim_run.sh
@@ -152,5 +152,5 @@ fi
 echo "$viz"
 if [[ "$viz" -eq 1 ]]; then
     # aggregate outputs
-    python3 /foss_fim/src/aggregate_fim_outputs.py -d $outputRunDataDir
+    python3 /foss_fim/src/aggregate_fim_outputs.py -d $outputRunDataDir -j 4
 fi
diff --git a/src/aggregate_fim_outputs.py b/src/aggregate_fim_outputs.py
index edafd93a3..9d8676364 100644
--- a/src/aggregate_fim_outputs.py
+++ b/src/aggregate_fim_outputs.py
@@ -2,36 +2,37 @@
 import os
 import argparse
+from multiprocessing import Pool
 import pandas as pd
 import json
 import rasterio
 from rasterio.merge import merge
+from rasterio.warp import calculate_default_transform, reproject, Resampling
 import shutil
 import csv
-from utils.shared_variables import PREP_PROJECTION
+from utils.shared_variables import PREP_PROJECTION,VIZ_PROJECTION
 
+def aggregate_fim_outputs(args):
 
-def aggregate_fim_outputs(fim_out_dir):
+    fim_out_dir = args[0]
+    huc6 = args[1]
+    huc_list = args[2]
 
-    print ("aggregating outputs to HUC6 scale")
+    print(f"aggregating {huc6}")
 
-    drop_folders = ['logs']
-    huc_list = [huc for huc in os.listdir(fim_out_dir) if huc not in drop_folders]
-    huc6_list = [str(huc[0:6]) for huc in os.listdir(fim_out_dir) if huc not in drop_folders]
-    huc6_list = list(set(huc6_list))
+    huc6_dir = os.path.join(fim_out_dir,'aggregate_fim_outputs',str(huc6))
+    os.makedirs(huc6_dir, exist_ok=True)
 
-    for huc in huc_list:
+    # aggregate file name paths
+    aggregate_hydrotable = os.path.join(fim_out_dir,'aggregate_fim_outputs',str(huc6),'hydroTable.csv')
+    aggregate_src = os.path.join(fim_out_dir,'aggregate_fim_outputs',str(huc6),f'rating_curves_{huc6}.json')
 
-        os.makedirs(os.path.join(fim_out_dir,'aggregate_fim_outputs',str(huc[0:6])), exist_ok=True)
+    for huc in huc_list:
 
         # original file paths
         hydrotable_filename = os.path.join(fim_out_dir,huc,'hydroTable.csv')
         src_filename = os.path.join(fim_out_dir,huc,'src.json')
 
-        # aggregate file name paths
-        aggregate_hydrotable = os.path.join(fim_out_dir,'aggregate_fim_outputs',str(huc[0:6]),'hydroTable.csv')
-        aggregate_src = os.path.join(fim_out_dir,'aggregate_fim_outputs',str(huc[0:6]),f'rating_curves_{huc[0:6]}.json')
-
         if len(huc)> 6:
 
             # open hydrotable
@@ -68,107 +69,154 @@ def aggregate_fim_outputs(fim_out_dir):
             shutil.copy(hydrotable_filename, aggregate_hydrotable)
             shutil.copy(src_filename, aggregate_src)
 
-    for huc6 in huc6_list:
-
-        ## add feature_id to aggregate src
-        aggregate_hydrotable = os.path.join(fim_out_dir,'aggregate_fim_outputs',str(huc6),'hydroTable.csv')
-        aggregate_src = os.path.join(fim_out_dir,'aggregate_fim_outputs',str(huc6),f'rating_curves_{huc6}.json')
-
-        # Open aggregate src for writing feature_ids to
-        src_data = {}
-        with open(aggregate_src) as jsonf:
-            src_data = json.load(jsonf)
-
-        with open(aggregate_hydrotable) as csvf:
-            csvReader = csv.DictReader(csvf)
-
-            for row in csvReader:
-                if row['HydroID'].lstrip('0') in src_data and 'nwm_feature_id' not in src_data[row['HydroID'].lstrip('0')]:
-                    src_data[row['HydroID'].lstrip('0')]['nwm_feature_id'] = row['feature_id']
-
-        # Write src_data to JSON file
-        with open(aggregate_src, 'w') as jsonf:
-            json.dump(src_data, jsonf)
+    ## add feature_id to aggregate src
+    # Open aggregate src for writing feature_ids to
+    src_data = {}
+    with open(aggregate_src) as jsonf:
+        src_data = json.load(jsonf)
 
-        ## aggregate rasters
-        huc6_dir = os.path.join(fim_out_dir,'aggregate_fim_outputs',huc6)
+    with open(aggregate_hydrotable) as csvf:
+        csvReader = csv.DictReader(csvf)
 
-        # aggregate file paths
-        rem_mosaic = os.path.join(huc6_dir,f'hand_grid_{huc6}.tif')
-        catchment_mosaic = os.path.join(huc6_dir,f'catchments_{huc6}.tif')
+        for row in csvReader:
+            if row['HydroID'].lstrip('0') in src_data and 'nwm_feature_id' not in src_data[row['HydroID'].lstrip('0')]:
+                src_data[row['HydroID'].lstrip('0')]['nwm_feature_id'] = row['feature_id']
 
-        if huc6 not in huc_list:
+    # Write src_data to JSON file
+    with open(aggregate_src, 'w') as jsonf:
+        json.dump(src_data, jsonf)
 
-            huc6_filter = [path.startswith(huc6) for path in huc_list]
-            subset_huc6_list = [i for (i, v) in zip(huc_list, huc6_filter) if v]
+    ## aggregate rasters
+    # aggregate file paths
+    rem_mosaic = os.path.join(huc6_dir,f'hand_grid_{huc6}_unprj.tif')
+    catchment_mosaic = os.path.join(huc6_dir,f'catchments_{huc6}_unprj.tif')
 
-            # aggregate and mosaic rem
-            rem_list = [os.path.join(fim_out_dir,huc,'rem_zeroed_masked.tif') for huc in subset_huc6_list]
+    if huc6 not in huc_list:
 
-            if len(rem_list) > 1:
+        huc6_filter = [path.startswith(huc6) for path in huc_list]
+        subset_huc6_list = [i for (i, v) in zip(huc_list, huc6_filter) if v]
 
-                rem_files_to_mosaic = []
+        # aggregate and mosaic rem
+        rem_list = [os.path.join(fim_out_dir,huc,'rem_zeroed_masked.tif') for huc in subset_huc6_list]
 
-                for rem in rem_list:
+        if len(rem_list) > 1:
 
-                    rem_src = rasterio.open(rem)
-                    rem_files_to_mosaic.append(rem_src)
+            rem_files_to_mosaic = []
 
-                mosaic, out_trans = merge(rem_files_to_mosaic)
-                out_meta = rem_src.meta.copy()
-                out_meta.update({"driver": "GTiff", "height": mosaic.shape[1], "width": mosaic.shape[2], "dtype": str(mosaic.dtype), "transform": out_trans,"crs": PREP_PROJECTION,'compress': 'lzw'})
+            for rem in rem_list:
 
-                with rasterio.open(rem_mosaic, "w", **out_meta, tiled=True, blockxsize=256, blockysize=256, BIGTIFF='YES') as dest:
-                    dest.write(mosaic)
+                rem_src = rasterio.open(rem)
+                rem_files_to_mosaic.append(rem_src)
 
-                del rem_files_to_mosaic,rem_src,out_meta,mosaic
+            mosaic, out_trans = merge(rem_files_to_mosaic)
+            out_meta = rem_src.meta.copy()
+            out_meta.update({"driver": "GTiff", "height": mosaic.shape[1], "width": mosaic.shape[2], "dtype": str(mosaic.dtype), "transform": out_trans,"crs": PREP_PROJECTION,'compress': 'lzw'})
 
-            elif len(rem_list)==1:
+            with rasterio.open(rem_mosaic, "w", **out_meta, tiled=True, blockxsize=1024, blockysize=1024, BIGTIFF='YES') as dest:
+                dest.write(mosaic)
 
-                shutil.copy(rem_list[0], rem_mosaic)
+            del rem_files_to_mosaic,rem_src,out_meta,mosaic
 
-            # aggregate and mosaic catchments
-            catchment_list = [os.path.join(fim_out_dir,huc,'gw_catchments_reaches_filtered_addedAttributes.tif') for huc in subset_huc6_list]
+        elif len(rem_list)==1:
 
-            if len(catchment_list) > 1:
+            shutil.copy(rem_list[0], rem_mosaic)
 
-                cat_files_to_mosaic = []
+        # aggregate and mosaic catchments
+        catchment_list = [os.path.join(fim_out_dir,huc,'gw_catchments_reaches_filtered_addedAttributes.tif') for huc in subset_huc6_list]
 
-                for cat in catchment_list:
-                    cat_src = rasterio.open(cat)
-                    cat_files_to_mosaic.append(cat_src)
+        if len(catchment_list) > 1:
 
-                mosaic, out_trans = merge(cat_files_to_mosaic)
-                out_meta = cat_src.meta.copy()
+            cat_files_to_mosaic = []
 
-                out_meta.update({"driver": "GTiff", "height": mosaic.shape[1], "width": mosaic.shape[2], "dtype": str(mosaic.dtype), "transform": out_trans,"crs": PREP_PROJECTION,'compress': 'lzw'})
+            for cat in catchment_list:
+                cat_src = rasterio.open(cat)
+                cat_files_to_mosaic.append(cat_src)
 
-                with rasterio.open(catchment_mosaic, "w", **out_meta, tiled=True, blockxsize=256, blockysize=256, BIGTIFF='YES') as dest:
-                    dest.write(mosaic)
+            mosaic, out_trans = merge(cat_files_to_mosaic)
+            out_meta = cat_src.meta.copy()
 
-                del cat_files_to_mosaic,cat_src,out_meta,mosaic
+            out_meta.update({"driver": "GTiff", "height": mosaic.shape[1], "width": mosaic.shape[2], "dtype": str(mosaic.dtype), "transform": out_trans,"crs": PREP_PROJECTION,'compress': 'lzw'})
 
-            elif len(catchment_list)==1:
+            with rasterio.open(catchment_mosaic, "w", **out_meta, tiled=True, blockxsize=1024, blockysize=1024, BIGTIFF='YES') as dest:
+                dest.write(mosaic)
 
-                shutil.copy(catchment_list[0], catchment_mosaic)
+            del cat_files_to_mosaic,cat_src,out_meta,mosaic
 
-        else:
-            # original file paths
-            rem_filename = os.path.join(fim_out_dir,huc6,'rem_zeroed_masked.tif')
-            catchment_filename = os.path.join(fim_out_dir,huc6,'gw_catchments_reaches_filtered_addedAttributes.tif')
+        elif len(catchment_list)==1:
 
-            shutil.copy(rem_filename, rem_mosaic)
-            shutil.copy(catchment_filename, catchment_mosaic)
+            shutil.copy(catchment_list[0], catchment_mosaic)
 
+    else:
+        # original file paths
+        rem_filename = os.path.join(fim_out_dir,huc6,'rem_zeroed_masked.tif')
+        catchment_filename = os.path.join(fim_out_dir,huc6,'gw_catchments_reaches_filtered_addedAttributes.tif')
+
+        shutil.copy(rem_filename, rem_mosaic)
+        shutil.copy(catchment_filename, catchment_mosaic)
+
+    ## reproject rasters
+    reproject_raster(rem_mosaic)
+    os.remove(rem_mosaic)
+
+    reproject_raster(catchment_mosaic)
+    os.remove(catchment_mosaic)
+
+
+def reproject_raster(raster_name):
+
+    with rasterio.open(raster_name) as src:
+        transform, width, height = calculate_default_transform(
+            src.crs, VIZ_PROJECTION, src.width, src.height, *src.bounds)
+        kwargs = src.meta.copy()
+        kwargs.update({
+            'crs': VIZ_PROJECTION,
+            'transform': transform,
+            'width': width,
+            'height': height,
+            'compress': 'lzw'
+        })
+
+        raster_proj_rename = os.path.split(raster_name)[1].replace('_unprj.tif', '.tif')
+        raster_proj_dir = os.path.join(os.path.dirname(raster_name), raster_proj_rename)
+
+        with rasterio.open(raster_proj_dir, 'w', **kwargs, tiled=True, blockxsize=1024, blockysize=1024, BIGTIFF='YES') as dst:
+            # for i in range(1, src.count + 1):
+            reproject(
+                source=rasterio.band(src, 1),
+                destination=rasterio.band(dst, 1),
+                src_transform=src.transform,
+                src_crs=src.crs,
+                dst_transform=transform,
+                dst_crs=VIZ_PROJECTION,
+                resampling=Resampling.nearest)
+    del src, dst
 
 if __name__ == '__main__':
 
     parser = argparse.ArgumentParser(description='Aggregate layers buy HUC6')
     parser.add_argument('-d','--fim-outputs-directory', help='FIM outputs directory', required=True)
+    parser.add_argument('-j','--number-of-jobs',help='Number of processes to use. Default is 1.',required=False, default="1",type=int)
 
     args = vars(parser.parse_args())
 
     fim_outputs_directory = args['fim_outputs_directory']
+    number_of_jobs = int(args['number_of_jobs'])
+
+    drop_folders = ['logs']
+    huc_list = [huc for huc in os.listdir(fim_outputs_directory) if huc not in drop_folders]
+    huc6_list = [str(huc[0:6]) for huc in os.listdir(fim_outputs_directory) if huc not in drop_folders]
+    huc6_list = list(set(huc6_list))
+
+
+    procs_list = []
+
+    for huc6 in huc6_list:
+
+        limited_huc_list = [huc for huc in huc_list if huc.startswith(huc6)]
+
+        procs_list.append([fim_outputs_directory,huc6,limited_huc_list])
 
-    aggregate_fim_outputs(fim_outputs_directory)
+    print(f"aggregating {len(huc_list)} hucs to HUC6 scale using {number_of_jobs} jobs")
+    pool = Pool(number_of_jobs)
+    pool.map(aggregate_fim_outputs, procs_list)
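
For reference, the updated post-processing step can also be run on its own, e.g. `python3 /foss_fim/src/aggregate_fim_outputs.py -d /data/outputs/my_fim_run -j 4` (the run directory here is hypothetical; `fim_run.sh` passes its own `$outputRunDataDir`). When `-j` is omitted, the script falls back to a single process.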
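
A note on the parallelization pattern: `Pool.map` accepts a single iterable, so each job is packed into a `[fim_out_dir, huc6, huc_list]` list and unpacked positionally inside `aggregate_fim_outputs`. An equivalent sketch using `Pool.starmap`, which unpacks argument tuples into named parameters, is shown below for comparison; the job list and paths are hypothetical, and this is not part of the PR:

```python
from multiprocessing import Pool

def aggregate_fim_outputs(fim_out_dir, huc6, huc_list):
    # placeholder body; the real function mosaics, then reprojects, the HUC6 rasters
    print(f"aggregating {huc6} ({len(huc_list)} hucs) under {fim_out_dir}")

if __name__ == '__main__':
    # hypothetical jobs mirroring procs_list in the script above
    jobs = [('/data/outputs/my_fim_run', '120903', ['12090301', '12090302'])]
    with Pool(4) as pool:
        pool.starmap(aggregate_fim_outputs, jobs)
```

`starmap` avoids the positional `args[0]`/`args[1]` indexing but is otherwise equivalent; the PR's list-based form works with any `map`-style executor.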
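
To sanity-check the changelog claims on an aggregated output (Web Mercator CRS, 1024-pixel tiles, LZW compression), one could inspect a HUC6 grid with rasterio. A minimal sketch, assuming `VIZ_PROJECTION` is Web Mercator (EPSG:3857) per the changelog note; the file path is hypothetical:

```python
import rasterio

# hypothetical HUC6 output written by aggregate_fim_outputs.py
with rasterio.open('/data/outputs/my_fim_run/aggregate_fim_outputs/120903/hand_grid_120903.tif') as src:
    print(src.crs)           # expected EPSG:3857 (Web Mercator) via VIZ_PROJECTION
    print(src.block_shapes)  # expected [(1024, 1024)] after the blocksize change
    print(src.compression)   # expected lzw
```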