Enhancements to post-processing for Viz-related use cases.
- Aggregate grids are projected to Web Mercator during `-v` runs in `fim_run.sh`.
- HUC6 aggregation is parallelized.
- Aggregate grid blocksize is changed from 256 to 1024 for faster post-processing.

This resolves #294 and resolves #295.
brian.avant authored Mar 11, 2021
1 parent 18afaef commit 4364dc4
Showing 3 changed files with 138 additions and 80 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,16 @@ All notable changes to this project will be documented in this file.
 We follow the [Semantic Versioning 2.0.0](http://semver.org/) format.
 <br/><br/>
 
+## v3.0.8.2 - 2021-03-11 - [PR #296](https://github.com/NOAA-OWP/cahaba/pull/296)
+
+Enhancements to post-processing for Viz-related use cases.
+
+### Changes
+- Aggregate grids are projected to Web Mercator during `-v` runs in `fim_run.sh`.
+- HUC6 aggregation is parallelized.
+- Aggregate grid blocksize is changed from 256 to 1024 for faster post-processing.
+
+<br/><br/>
 ## v3.0.8.1 - 2021-03-10 - [PR #302](https://github.com/NOAA-OWP/cahaba/pull/302)
 
 Patched import issue in `tools_shared_functions.py`.
2 changes: 1 addition & 1 deletion fim_run.sh
@@ -152,5 +152,5 @@ fi
 echo "$viz"
 if [[ "$viz" -eq 1 ]]; then
     # aggregate outputs
-    python3 /foss_fim/src/aggregate_fim_outputs.py -d $outputRunDataDir
+    python3 /foss_fim/src/aggregate_fim_outputs.py -d $outputRunDataDir -j 4
 fi
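
Note: the `-j 4` added here is forwarded to `aggregate_fim_outputs.py`, which now fans per-HUC6 work across a `multiprocessing.Pool` (see the Python diff below). A minimal sketch of that dispatch pattern, where the worker body, paths, and HUC codes are hypothetical rather than the repo's:

```python
from multiprocessing import Pool

def aggregate_one(args):
    # unpack the single list argument, mirroring aggregate_fim_outputs(args)
    fim_out_dir, huc6, huc_list = args
    print(f"would aggregate {len(huc_list)} HUC8s into {huc6}")

if __name__ == '__main__':
    # hypothetical task list; fim_run.sh's -j 4 corresponds to Pool(4)
    tasks = [['/data/outputs/demo_run', '120903', ['12090301', '12090302']]]
    with Pool(4) as pool:
        pool.map(aggregate_one, tasks)
```

Packing each task as a single list and unpacking inside the worker keeps `pool.map` happy, since it maps over one iterable; `Pool.starmap` would be the alternative.
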
206 changes: 127 additions & 79 deletions src/aggregate_fim_outputs.py
@@ -2,36 +2,37 @@
 
 import os
 import argparse
+from multiprocessing import Pool
 import pandas as pd
 import json
 import rasterio
 from rasterio.merge import merge
+from rasterio.warp import calculate_default_transform, reproject, Resampling
 import shutil
 import csv
-from utils.shared_variables import PREP_PROJECTION
+from utils.shared_variables import PREP_PROJECTION,VIZ_PROJECTION
 
-def aggregate_fim_outputs(fim_out_dir):
+def aggregate_fim_outputs(args):
+
+    fim_out_dir = args[0]
+    huc6 = args[1]
+    huc_list = args[2]
 
-    print ("aggregating outputs to HUC6 scale")
+    print(f"aggregating {huc6}")
 
-    drop_folders = ['logs']
-    huc_list = [huc for huc in os.listdir(fim_out_dir) if huc not in drop_folders]
-    huc6_list = [str(huc[0:6]) for huc in os.listdir(fim_out_dir) if huc not in drop_folders]
-    huc6_list = list(set(huc6_list))
+    huc6_dir = os.path.join(fim_out_dir,'aggregate_fim_outputs',str(huc6))
+    os.makedirs(huc6_dir, exist_ok=True)
 
-    for huc in huc_list:
-
-        os.makedirs(os.path.join(fim_out_dir,'aggregate_fim_outputs',str(huc[0:6])), exist_ok=True)
+    # aggregate file name paths
+    aggregate_hydrotable = os.path.join(fim_out_dir,'aggregate_fim_outputs',str(huc6),'hydroTable.csv')
+    aggregate_src = os.path.join(fim_out_dir,'aggregate_fim_outputs',str(huc6),f'rating_curves_{huc6}.json')
+
+    for huc in huc_list:
 
         # original file paths
         hydrotable_filename = os.path.join(fim_out_dir,huc,'hydroTable.csv')
         src_filename = os.path.join(fim_out_dir,huc,'src.json')
 
-        # aggregate file name paths
-        aggregate_hydrotable = os.path.join(fim_out_dir,'aggregate_fim_outputs',str(huc[0:6]),'hydroTable.csv')
-        aggregate_src = os.path.join(fim_out_dir,'aggregate_fim_outputs',str(huc[0:6]),f'rating_curves_{huc[0:6]}.json')
-
         if len(huc)> 6:
 
             # open hydrotable
@@ -68,107 +69,154 @@ def aggregate_fim_outputs(fim_out_dir):
             shutil.copy(hydrotable_filename, aggregate_hydrotable)
             shutil.copy(src_filename, aggregate_src)
 
-    for huc6 in huc6_list:
-
-        ## add feature_id to aggregate src
-        aggregate_hydrotable = os.path.join(fim_out_dir,'aggregate_fim_outputs',str(huc6),'hydroTable.csv')
-        aggregate_src = os.path.join(fim_out_dir,'aggregate_fim_outputs',str(huc6),f'rating_curves_{huc6}.json')
-
-        # Open aggregate src for writing feature_ids to
-        src_data = {}
-        with open(aggregate_src) as jsonf:
-            src_data = json.load(jsonf)
-
-        with open(aggregate_hydrotable) as csvf:
-            csvReader = csv.DictReader(csvf)
-
-            for row in csvReader:
-                if row['HydroID'].lstrip('0') in src_data and 'nwm_feature_id' not in src_data[row['HydroID'].lstrip('0')]:
-                    src_data[row['HydroID'].lstrip('0')]['nwm_feature_id'] = row['feature_id']
-
-        # Write src_data to JSON file
-        with open(aggregate_src, 'w') as jsonf:
-            json.dump(src_data, jsonf)
+    ## add feature_id to aggregate src
+    # Open aggregate src for writing feature_ids to
+    src_data = {}
+    with open(aggregate_src) as jsonf:
+        src_data = json.load(jsonf)
 
-        ## aggregate rasters
-        huc6_dir = os.path.join(fim_out_dir,'aggregate_fim_outputs',huc6)
+    with open(aggregate_hydrotable) as csvf:
+        csvReader = csv.DictReader(csvf)
 
-        # aggregate file paths
-        rem_mosaic = os.path.join(huc6_dir,f'hand_grid_{huc6}.tif')
-        catchment_mosaic = os.path.join(huc6_dir,f'catchments_{huc6}.tif')
+        for row in csvReader:
+            if row['HydroID'].lstrip('0') in src_data and 'nwm_feature_id' not in src_data[row['HydroID'].lstrip('0')]:
+                src_data[row['HydroID'].lstrip('0')]['nwm_feature_id'] = row['feature_id']
 
-        if huc6 not in huc_list:
+    # Write src_data to JSON file
+    with open(aggregate_src, 'w') as jsonf:
+        json.dump(src_data, jsonf)
 
-            huc6_filter = [path.startswith(huc6) for path in huc_list]
-            subset_huc6_list = [i for (i, v) in zip(huc_list, huc6_filter) if v]
+    ## aggregate rasters
+    # aggregate file paths
+    rem_mosaic = os.path.join(huc6_dir,f'hand_grid_{huc6}_unprj.tif')
+    catchment_mosaic = os.path.join(huc6_dir,f'catchments_{huc6}_unprj.tif')
 
-            # aggregate and mosaic rem
-            rem_list = [os.path.join(fim_out_dir,huc,'rem_zeroed_masked.tif') for huc in subset_huc6_list]
+    if huc6 not in huc_list:
 
-            if len(rem_list) > 1:
+        huc6_filter = [path.startswith(huc6) for path in huc_list]
+        subset_huc6_list = [i for (i, v) in zip(huc_list, huc6_filter) if v]
 
-                rem_files_to_mosaic = []
+        # aggregate and mosaic rem
+        rem_list = [os.path.join(fim_out_dir,huc,'rem_zeroed_masked.tif') for huc in subset_huc6_list]
 
-                for rem in rem_list:
+        if len(rem_list) > 1:
 
-                    rem_src = rasterio.open(rem)
-                    rem_files_to_mosaic.append(rem_src)
+            rem_files_to_mosaic = []
 
-                mosaic, out_trans = merge(rem_files_to_mosaic)
-                out_meta = rem_src.meta.copy()
-                out_meta.update({"driver": "GTiff", "height": mosaic.shape[1], "width": mosaic.shape[2], "dtype": str(mosaic.dtype), "transform": out_trans,"crs": PREP_PROJECTION,'compress': 'lzw'})
+            for rem in rem_list:
 
-                with rasterio.open(rem_mosaic, "w", **out_meta, tiled=True, blockxsize=256, blockysize=256, BIGTIFF='YES') as dest:
-                    dest.write(mosaic)
+                rem_src = rasterio.open(rem)
+                rem_files_to_mosaic.append(rem_src)
 
-                del rem_files_to_mosaic,rem_src,out_meta,mosaic
+            mosaic, out_trans = merge(rem_files_to_mosaic)
+            out_meta = rem_src.meta.copy()
+            out_meta.update({"driver": "GTiff", "height": mosaic.shape[1], "width": mosaic.shape[2], "dtype": str(mosaic.dtype), "transform": out_trans,"crs": PREP_PROJECTION,'compress': 'lzw'})
 
-            elif len(rem_list)==1:
+            with rasterio.open(rem_mosaic, "w", **out_meta, tiled=True, blockxsize=1024, blockysize=1024, BIGTIFF='YES') as dest:
+                dest.write(mosaic)
 
-                shutil.copy(rem_list[0], rem_mosaic)
+            del rem_files_to_mosaic,rem_src,out_meta,mosaic
 
-            # aggregate and mosaic catchments
-            catchment_list = [os.path.join(fim_out_dir,huc,'gw_catchments_reaches_filtered_addedAttributes.tif') for huc in subset_huc6_list]
+        elif len(rem_list)==1:
 
-            if len(catchment_list) > 1:
+            shutil.copy(rem_list[0], rem_mosaic)
 
-                cat_files_to_mosaic = []
+        # aggregate and mosaic catchments
+        catchment_list = [os.path.join(fim_out_dir,huc,'gw_catchments_reaches_filtered_addedAttributes.tif') for huc in subset_huc6_list]
 
-                for cat in catchment_list:
-                    cat_src = rasterio.open(cat)
-                    cat_files_to_mosaic.append(cat_src)
+        if len(catchment_list) > 1:
 
-                mosaic, out_trans = merge(cat_files_to_mosaic)
-                out_meta = cat_src.meta.copy()
+            cat_files_to_mosaic = []
 
-                out_meta.update({"driver": "GTiff", "height": mosaic.shape[1], "width": mosaic.shape[2], "dtype": str(mosaic.dtype), "transform": out_trans,"crs": PREP_PROJECTION,'compress': 'lzw'})
+            for cat in catchment_list:
+                cat_src = rasterio.open(cat)
+                cat_files_to_mosaic.append(cat_src)
 
-                with rasterio.open(catchment_mosaic, "w", **out_meta, tiled=True, blockxsize=256, blockysize=256, BIGTIFF='YES') as dest:
-                    dest.write(mosaic)
+            mosaic, out_trans = merge(cat_files_to_mosaic)
+            out_meta = cat_src.meta.copy()
 
-                del cat_files_to_mosaic,cat_src,out_meta,mosaic
+            out_meta.update({"driver": "GTiff", "height": mosaic.shape[1], "width": mosaic.shape[2], "dtype": str(mosaic.dtype), "transform": out_trans,"crs": PREP_PROJECTION,'compress': 'lzw'})
 
-            elif len(catchment_list)==1:
+            with rasterio.open(catchment_mosaic, "w", **out_meta, tiled=True, blockxsize=1024, blockysize=1024, BIGTIFF='YES') as dest:
+                dest.write(mosaic)
 
-                shutil.copy(catchment_list[0], catchment_mosaic)
+            del cat_files_to_mosaic,cat_src,out_meta,mosaic
 
-            else:
-                # original file paths
-                rem_filename = os.path.join(fim_out_dir,huc6,'rem_zeroed_masked.tif')
-                catchment_filename = os.path.join(fim_out_dir,huc6,'gw_catchments_reaches_filtered_addedAttributes.tif')
+        elif len(catchment_list)==1:
 
-                shutil.copy(rem_filename, rem_mosaic)
-                shutil.copy(catchment_filename, catchment_mosaic)
+            shutil.copy(catchment_list[0], catchment_mosaic)
+
+    else:
+        # original file paths
+        rem_filename = os.path.join(fim_out_dir,huc6,'rem_zeroed_masked.tif')
+        catchment_filename = os.path.join(fim_out_dir,huc6,'gw_catchments_reaches_filtered_addedAttributes.tif')
+
+        shutil.copy(rem_filename, rem_mosaic)
+        shutil.copy(catchment_filename, catchment_mosaic)
+
+    ## reproject rasters
+    reproject_raster(rem_mosaic)
+    os.remove(rem_mosaic)
+
+    reproject_raster(catchment_mosaic)
+    os.remove(catchment_mosaic)
+
+
+def reproject_raster(raster_name):
+
+    with rasterio.open(raster_name) as src:
+        transform, width, height = calculate_default_transform(
+            src.crs, VIZ_PROJECTION, src.width, src.height, *src.bounds)
+        kwargs = src.meta.copy()
+        kwargs.update({
+            'crs': VIZ_PROJECTION,
+            'transform': transform,
+            'width': width,
+            'height': height,
+            'compress': 'lzw'
+        })
+
+        raster_proj_rename = os.path.split(raster_name)[1].replace('_unprj.tif', '.tif')
+        raster_proj_dir = os.path.join(os.path.dirname(raster_name), raster_proj_rename)
+
+        with rasterio.open(raster_proj_dir, 'w', **kwargs, tiled=True, blockxsize=1024, blockysize=1024, BIGTIFF='YES') as dst:
+            # single-band rasters: reproject band 1 only
+            reproject(
+                source=rasterio.band(src, 1),
+                destination=rasterio.band(dst, 1),
+                src_transform=src.transform,
+                src_crs=src.crs,
+                dst_transform=transform,
+                dst_crs=VIZ_PROJECTION,
+                resampling=Resampling.nearest)
+    del src, dst
 
 if __name__ == '__main__':
 
     parser = argparse.ArgumentParser(description='Aggregate layers by HUC6')
     parser.add_argument('-d','--fim-outputs-directory', help='FIM outputs directory', required=True)
+    parser.add_argument('-j','--number-of-jobs',help='Number of processes to use. Default is 1.',required=False, default="1",type=int)
 
     args = vars(parser.parse_args())
 
     fim_outputs_directory = args['fim_outputs_directory']
+    number_of_jobs = int(args['number_of_jobs'])
+
+    drop_folders = ['logs']
+    huc_list = [huc for huc in os.listdir(fim_outputs_directory) if huc not in drop_folders]
+    huc6_list = [str(huc[0:6]) for huc in os.listdir(fim_outputs_directory) if huc not in drop_folders]
+    huc6_list = list(set(huc6_list))
+
+    procs_list = []
+
+    for huc6 in huc6_list:
+
+        limited_huc_list = [huc for huc in huc_list if huc.startswith(huc6)]
+
+        procs_list.append([fim_outputs_directory,huc6,limited_huc_list])
 
-    aggregate_fim_outputs(fim_outputs_directory)
+    print(f"aggregating {len(huc_list)} hucs to HUC6 scale using {number_of_jobs} jobs")
+    pool = Pool(number_of_jobs)
+    pool.map(aggregate_fim_outputs, procs_list)
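
For readers outside the repo, a minimal standalone sketch of the reprojection pattern `reproject_raster` applies, assuming `VIZ_PROJECTION` resolves to Web Mercator (`EPSG:3857`) in `utils/shared_variables.py`, per the commit description; file names are hypothetical:

```python
import rasterio
from rasterio.warp import calculate_default_transform, reproject, Resampling

def to_web_mercator(src_path, dst_path, dst_crs='EPSG:3857'):
    """Reproject a single-band GeoTIFF, writing 1024x1024 internal tiles."""
    with rasterio.open(src_path) as src:
        # compute the output grid in the target CRS
        transform, width, height = calculate_default_transform(
            src.crs, dst_crs, src.width, src.height, *src.bounds)
        meta = src.meta.copy()
        meta.update({'crs': dst_crs, 'transform': transform,
                     'width': width, 'height': height, 'compress': 'lzw'})
        with rasterio.open(dst_path, 'w', **meta, tiled=True,
                           blockxsize=1024, blockysize=1024, BIGTIFF='YES') as dst:
            reproject(source=rasterio.band(src, 1),
                      destination=rasterio.band(dst, 1),
                      src_transform=src.transform, src_crs=src.crs,
                      dst_transform=transform, dst_crs=dst_crs,
                      resampling=Resampling.nearest)

# hypothetical file names:
# to_web_mercator('hand_grid_120903_unprj.tif', 'hand_grid_120903.tif')
```

Nearest-neighbor resampling matches the code above and is the safe choice for the categorical catchment IDs, which bilinear or cubic resampling would corrupt.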

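A quick way to confirm the new 1024x1024 internal tiling on an output (hypothetical path; `block_shapes` reports per-band tile dimensions). Larger internal tiles generally mean fewer read operations over big mosaics, consistent with the "faster post-processing" goal in the changelog:

```python
import rasterio

# hypothetical path to one of the reprojected aggregate grids
with rasterio.open('hand_grid_120903.tif') as ds:
    print(ds.profile.get('blockxsize'), ds.profile.get('blockysize'))  # expect 1024 1024
    print(ds.block_shapes)  # e.g. [(1024, 1024)] for a single-band tiled GeoTIFF
```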