diff --git a/src/subdiv_chan_obank_src.py b/src/subdiv_chan_obank_src.py index e7482d1e..228ed24a 100755 --- a/src/subdiv_chan_obank_src.py +++ b/src/subdiv_chan_obank_src.py @@ -373,7 +373,9 @@ def multi_process(variable_mannings_calc, procs_list, log_file, number_of_jobs, log_file.writelines(["%s\n" % item for item in map_output]) -def run_prep(fim_dir, mann_n_table, output_suffix, number_of_jobs, verbose, src_plot_option): +def run_prep( + fim_dir, mann_n_table, output_suffix, number_of_jobs, verbose, src_plot_option, process_huc=None +): procs_list = [] print(f"Writing progress to log file here: {fim_dir}/logs/subdiv_src_{output_suffix}.log") @@ -407,53 +409,59 @@ def run_prep(fim_dir, mann_n_table, output_suffix, number_of_jobs, verbose, src_ else: print('Running the variable_mannings_calc function...') - ## Loop through hucs in the fim_dir and create list of variables to feed to multiprocessing - huc_list = [d for d in os.listdir(fim_dir) if re.match(r'^\d{8}$', d)] - huc_list.sort() # sort huc_list for helping track progress in future print statments + if process_huc is None: + ## Loop through hucs in the fim_dir and create list of variables to feed to multiprocessing + huc_list = [d for d in os.listdir(fim_dir) if re.match(r'^\d{8}$', d)] + huc_list.sort() # sort huc_list for helping track progress in future print statments + else: + huc_list = [process_huc] for huc in huc_list: # if huc != 'logs' and huc[-3:] != 'log' and huc[-4:] != '.csv': - if re.match(r'\d{8}', huc): - huc_branches_dir = os.path.join(fim_dir, huc, 'branches') - for branch_id in os.listdir(huc_branches_dir): - branch_dir = os.path.join(huc_branches_dir, branch_id) - in_src_bankfull_filename = join(branch_dir, 'src_full_crosswalked_' + branch_id + '.csv') - htable_filename = join(branch_dir, 'hydroTable_' + branch_id + '.csv') - huc_plot_output_dir = join(branch_dir, 'src_plots') - - if isfile(in_src_bankfull_filename) and isfile(htable_filename): - procs_list.append( - [ - in_src_bankfull_filename, - df_mann, - huc, - branch_id, - htable_filename, - output_suffix, - src_plot_option, - huc_plot_output_dir, - ] - ) - else: - print( - 'HUC: ' - + str(huc) - + ' branch id: ' - + str(branch_id) - + '\nWARNING --> can not find required file (src_full_crosswalked_bankfull_*.csv ' - + 'or hydroTable_*.csv) in the fim output dir: ' - + str(branch_dir) - + ' - skipping this branch!!!\n' - ) - log_file.write( - 'HUC: ' - + str(huc) - + ' branch id: ' - + str(branch_id) - + '\nWARNING --> can not find required file (src_full_crosswalked_bankfull_*.csv ' - + 'or hydroTable_*.csv) in the fim output dir: ' - + str(branch_dir) - + ' - skipping this branch!!!\n' + if process_huc is None or huc in process_huc: + if re.match(r'\d{8}', huc): + huc_branches_dir = os.path.join(fim_dir, huc, 'branches') + for branch_id in os.listdir(huc_branches_dir): + branch_dir = os.path.join(huc_branches_dir, branch_id) + in_src_bankfull_filename = join( + branch_dir, 'src_full_crosswalked_' + branch_id + '.csv' ) + htable_filename = join(branch_dir, 'hydroTable_' + branch_id + '.csv') + huc_plot_output_dir = join(branch_dir, 'src_plots') + + if isfile(in_src_bankfull_filename) and isfile(htable_filename): + procs_list.append( + [ + in_src_bankfull_filename, + df_mann, + huc, + branch_id, + htable_filename, + output_suffix, + src_plot_option, + huc_plot_output_dir, + ] + ) + else: + print( + 'HUC: ' + + str(huc) + + ' branch id: ' + + str(branch_id) + + '\nWARNING --> can not find required file (src_full_crosswalked_bankfull_*.csv ' + + 'or 
hydroTable_*.csv) in the fim output dir: ' + + str(branch_dir) + + ' - skipping this branch!!!\n' + ) + log_file.write( + 'HUC: ' + + str(huc) + + ' branch id: ' + + str(branch_id) + + '\nWARNING --> can not find required file (src_full_crosswalked_bankfull_*.csv ' + + 'or hydroTable_*.csv) in the fim output dir: ' + + str(branch_dir) + + ' - skipping this branch!!!\n' + ) ## Pass huc procs_list to multiprocessing function multi_process(variable_mannings_calc, procs_list, log_file, number_of_jobs, verbose) diff --git a/src/utils/shared_functions.py b/src/utils/shared_functions.py index f9f51a53..ac2790fc 100644 --- a/src/utils/shared_functions.py +++ b/src/utils/shared_functions.py @@ -479,7 +479,7 @@ def print_date_time_duration(start_dt, end_dt): total_hours, rem_seconds = divmod(rem_seconds, 60 * 60) total_mins, seconds = divmod(rem_seconds, 60) - time_fmt = f"{total_hours:02d} hours {total_mins:02d} mins {seconds:02d} secs" + time_fmt = f"{total_days:02d} days {total_hours:02d} hours {total_mins:02d} mins {seconds:02d} secs" duration_msg = "Duration: " + time_fmt print(duration_msg) diff --git a/tools/inundate_gms.py b/tools/inundate_gms.py index 0e72238b..c651e3a4 100755 --- a/tools/inundate_gms.py +++ b/tools/inundate_gms.py @@ -15,6 +15,7 @@ def Inundate_gms( hydrofabric_dir, forecast, num_workers=1, + hydro_table_df=None, hucs=None, inundation_raster=None, inundation_polygon=None, @@ -67,6 +68,7 @@ def Inundate_gms( inundation_polygon, depths_raster, forecast, + hydro_table_df, verbose=False, ) @@ -160,8 +162,16 @@ def __inundate_gms_generator( inundation_polygon, depths_raster, forecast, + hydro_table_df=None, verbose=False, ): + """ + Generator for use in parallelizing inundation + + Parameters + ---------- + + """ # Iterate over branches for idx, row in hucs_branches.iterrows(): huc = str(row[0]) @@ -177,35 +187,43 @@ def __inundate_gms_generator( catchments_branch = os.path.join(branch_dir, catchments_file_name) # FIM versions > 4.3.5 use an aggregated hydrotable file rather than individual branch hydrotables - hydroTable_huc = os.path.join(huc_dir, "hydrotable.csv") - if os.path.isfile(hydroTable_huc): - htable_req_cols = [ - "HUC", - "branch_id", - "feature_id", - "HydroID", - "stage", - "discharge_cms", - "LakeID", - ] - hydroTable_all = pd.read_csv( - hydroTable_huc, - dtype={ - "HUC": str, - "branch_id": int, - "feature_id": str, - "HydroID": str, - "stage": float, - "discharge_cms": float, - "LakeID": int, - }, - usecols=htable_req_cols, - ) - hydroTable_all.set_index(["HUC", "feature_id", "HydroID"], inplace=True) - hydroTable_branch = hydroTable_all.loc[hydroTable_all["branch_id"] == int(branch_id)] + + if hydro_table_df is not None: + hydro_table_all = hydro_table_df.set_index(["HUC", "feature_id", "HydroID"], inplace=False) + hydro_table_branch = hydro_table_all.loc[hydro_table_all["branch_id"] == int(branch_id)] else: - # Earlier FIM4 versions only have branch level hydrotables - hydroTable_branch = os.path.join(branch_dir, f"hydroTable_{branch_id}.csv") + hydro_table_huc = os.path.join(huc_dir, "hydrotable.csv") + if os.path.isfile(hydro_table_huc): + + htable_req_cols = [ + "HUC", + "branch_id", + "feature_id", + "HydroID", + "stage", + "discharge_cms", + "LakeID", + ] + + hydro_table_all = pd.read_csv( + hydro_table_huc, + dtype={ + "HUC": str, + "branch_id": int, + "feature_id": str, + "HydroID": str, + "stage": float, + "discharge_cms": float, + "LakeID": int, + }, + usecols=htable_req_cols, + ) + + hydro_table_all.set_index(["HUC", "feature_id", "HydroID"], 
inplace=True) + hydro_table_branch = hydro_table_all.loc[hydro_table_all["branch_id"] == int(branch_id)] + else: + # Earlier FIM4 versions only have branch level hydrotables + hydro_table_branch = os.path.join(branch_dir, f"hydroTable_{branch_id}.csv") xwalked_file_name = f"gw_catchments_reaches_filtered_addedAttributes_crosswalked_{branch_id}.gpkg" catchment_poly = os.path.join(branch_dir, xwalked_file_name) @@ -237,7 +255,7 @@ def __inundate_gms_generator( "rem": rem_branch, "catchments": catchments_branch, "catchment_poly": catchment_poly, - "hydro_table": hydroTable_branch, + "hydro_table": hydro_table_branch, "forecast": forecast, "mask_type": "filter", "hucs": None, @@ -253,10 +271,11 @@ def __inundate_gms_generator( "quiet": not verbose, } - yield (inundate_input, identifiers) + yield inundate_input, identifiers if __name__ == "__main__": + # parse arguments parser = argparse.ArgumentParser(description="Inundate FIM") parser.add_argument( diff --git a/tools/inundate_gms_optimized.py b/tools/inundate_gms_optimized.py new file mode 100755 index 00000000..681ea50a --- /dev/null +++ b/tools/inundate_gms_optimized.py @@ -0,0 +1,320 @@ +#!/usr/bin/env python3 + +import argparse +import os +from concurrent.futures import ProcessPoolExecutor, as_completed + +import pandas as pd +from inundation_optimized import NoForecastFound, hydroTableHasOnlyLakes, inundate +from tqdm import tqdm + +from utils.shared_functions import FIM_Helpers as fh + + +def Inundate_gms( + hydrofabric_dir, + forecast, + num_workers=1, + hydro_table_df=None, + hucs=None, + inundation_raster=None, + inundation_polygon=None, + depths_raster=None, + verbose=False, + log_file=None, + output_fileNames=None, +): + # input handling + if hucs is not None: + try: + _ = (i for i in hucs) + except TypeError: + raise ValueError("hucs argument must be an iterable") + + if isinstance(hucs, str): + hucs = [hucs] + + num_workers = int(num_workers) + + # log file + if log_file is not None: + if os.path.exists(log_file): + os.remove(log_file) + + if verbose: + with open(log_file, 'a') as f: + f.write("HUC8,BranchID,Exception") + # if log_file: + # logging.basicConfig(filename=log_file, level=logging.INFO) + # logging.info('HUC8,BranchID,Exception') + + # load fim inputs + hucs_branches = pd.read_csv( + os.path.join(hydrofabric_dir, "fim_inputs.csv"), header=None, dtype={0: str, 1: str} + ) + + if hucs is not None: + hucs = set(hucs) + huc_indices = hucs_branches.loc[:, 0].isin(hucs) + hucs_branches = hucs_branches.loc[huc_indices, :] + + # get number of branches + number_of_branches = len(hucs_branches) + + # make inundate generator + inundate_input_generator = __inundate_gms_generator( + hucs_branches, + hydrofabric_dir, + inundation_raster, + inundation_polygon, + depths_raster, + forecast, + hydro_table_df, + verbose=False, + ) + + # start up process pool + # better results with Process pool + executor = ProcessPoolExecutor(max_workers=num_workers) + + # collect output filenames + inundation_raster_fileNames = [None] * number_of_branches + inundation_polygon_fileNames = [None] * number_of_branches + depths_raster_fileNames = [None] * number_of_branches + hucCodes = [None] * number_of_branches + branch_ids = [None] * number_of_branches + + executor_generator = {executor.submit(inundate, **inp): ids for inp, ids in inundate_input_generator} + idx = 0 + for future in tqdm( + as_completed(executor_generator), + total=len(executor_generator), + desc=f"Inundating branches with {num_workers} workers", + disable=(not verbose), + ): + 
hucCode, branch_id = executor_generator[future] + + try: + future.result() + + except NoForecastFound as exc: + if log_file is not None: + with open(log_file, 'a') as f: + f.write(f"{hucCode},{branch_id},{exc.__class__.__name__}, {exc}") + elif verbose: + print(f"{hucCode},{branch_id},{exc.__class__.__name__}, {exc}") + + except hydroTableHasOnlyLakes as exc: + if log_file is not None: + with open(log_file, 'a') as f: + f.write(f"{hucCode},{branch_id},{exc.__class__.__name__}, {exc}") + elif verbose: + print(f"{hucCode},{branch_id},{exc.__class__.__name__}, {exc}") + + except Exception as exc: + if log_file is not None: + with open(log_file, 'a') as f: + f.write(f"{hucCode},{branch_id},{exc.__class__.__name__}, {exc}") + else: + print(f"{hucCode},{branch_id},{exc.__class__.__name__}, {exc}") + else: + hucCodes[idx] = hucCode + branch_ids[idx] = branch_id + + try: + # print(hucCode,branch_id,future.result()[0][0]) + inundation_raster_fileNames[idx] = future.result()[0][0] + except TypeError: + pass + + try: + depths_raster_fileNames[idx] = future.result()[1][0] + except TypeError: + pass + + try: + inundation_polygon_fileNames[idx] = future.result()[2][0] + except TypeError: + pass + + idx += 1 + + # power down pool + executor.shutdown(wait=True) + + # make filename dataframe + output_fileNames_df = pd.DataFrame( + { + "huc8": hucCodes, + "branchID": branch_ids, + "inundation_rasters": inundation_raster_fileNames, + "depths_rasters": depths_raster_fileNames, + "inundation_polygons": inundation_polygon_fileNames, + } + ) + + if output_fileNames is not None: + output_fileNames_df.to_csv(output_fileNames, index=False) + + return output_fileNames_df + + +def __inundate_gms_generator( + hucs_branches, + hydrofabric_dir, + inundation_raster, + inundation_polygon, + depths_raster, + forecast, + hydro_table_df=None, + verbose=False, +): + """ + Generator for use in parallelizing inundation + + Parameters + ---------- + + """ + # Iterate over branches + for idx, row in hucs_branches.iterrows(): + huc = str(row[0]) + branch_id = str(row[1]) + + huc_dir = os.path.join(hydrofabric_dir, huc) + branch_dir = os.path.join(huc_dir, "branches", branch_id) + + rem_file_name = f"rem_zeroed_masked_{branch_id}_int16.tif" + rem_branch = os.path.join(branch_dir, rem_file_name) + + catchments_file_name = f"gw_catchments_reaches_filtered_addedAttributes_{branch_id}_int16.tif" + catchments_branch = os.path.join(branch_dir, catchments_file_name) + + # FIM versions > 4.3.5 use an aggregated hydrotable file rather than individual branch hydrotables + + if hydro_table_df is not None: + hydro_table_all = hydro_table_df.set_index(["HUC", "feature_id", "HydroID"], inplace=False) + hydro_table_branch = hydro_table_all.loc[hydro_table_all["branch_id"] == int(branch_id)] + else: + hydro_table_huc = os.path.join(huc_dir, "hydrotable.csv") + if os.path.isfile(hydro_table_huc): + + htable_req_cols = [ + "HUC", + "branch_id", + "feature_id", + "HydroID", + "stage", + "discharge_cms", + "LakeID", + ] + + hydro_table_all = pd.read_csv( + hydro_table_huc, + dtype={ + "HUC": str, + "branch_id": int, + "feature_id": str, + "HydroID": str, + "stage": float, + "discharge_cms": float, + "LakeID": int, + }, + usecols=htable_req_cols, + ) + + hydro_table_all.set_index(["HUC", "feature_id", "HydroID"], inplace=True) + hydro_table_branch = hydro_table_all.loc[hydro_table_all["branch_id"] == int(branch_id)] + else: + # Earlier FIM4 versions only have branch level hydrotables + hydro_table_branch = os.path.join(branch_dir, 
f"hydroTable_{branch_id}.csv") + + xwalked_file_name = f"gw_catchments_reaches_filtered_addedAttributes_crosswalked_{branch_id}.gpkg" + catchment_poly = os.path.join(branch_dir, xwalked_file_name) + + # branch output + # Some other functions that call in here already added a huc, so only add it if not yet there + if (inundation_raster is not None) and (huc not in inundation_raster): + inundation_branch_raster = fh.append_id_to_file_name(inundation_raster, [huc, branch_id]) + else: + inundation_branch_raster = fh.append_id_to_file_name(inundation_raster, branch_id) + + if (depths_raster is not None) and (huc not in depths_raster): + depths_branch_raster = fh.append_id_to_file_name(depths_raster, [huc, branch_id]) + else: + depths_branch_raster = fh.append_id_to_file_name(depths_raster, branch_id) + + # identifiers + identifiers = (huc, branch_id) + + # print(f"inundation_branch_raster is {inundation_branch_raster}") + + # inundate input + inundate_input = { + "rem": rem_branch, + "catchments": catchments_branch, + "catchment_poly": catchment_poly, + "hydro_table": hydro_table_branch, + "forecast": forecast, + "mask_type": "filter", + "hucs": None, + "hucs_layerName": None, + "subset_hucs": None, + "num_workers": 1, + "aggregate": False, + "inundation_raster": inundation_branch_raster, + "depths": depths_branch_raster, + "quiet": not verbose, + } + + yield inundate_input, identifiers + + +if __name__ == "__main__": + + # parse arguments + parser = argparse.ArgumentParser(description="Inundate FIM") + parser.add_argument( + "-y", "--hydrofabric_dir", help="Directory path to FIM hydrofabric by processing unit", required=True + ) + parser.add_argument( + "-u", "--hucs", help="List of HUCS to run", required=False, default=None, type=str, nargs="+" + ) + parser.add_argument("-f", "--forecast", help="Forecast discharges in CMS as CSV file", required=True) + parser.add_argument( + "-i", + "--inundation-raster", + help="Inundation Raster output. Only writes if designated.", + required=False, + default=None, + ) + parser.add_argument( + "-p", + "--inundation-polygon", + help="Inundation polygon output. Only writes if designated.", + required=False, + default=None, + ) + parser.add_argument( + "-d", + "--depths-raster", + help="Depths raster output. Only writes if designated. 
Appends HUC code in batch mode.", + required=False, + default=None, + ) + parser.add_argument( + "-l", "--log-file", help="Log-file to store level-path exceptions", required=False, default=None + ) + parser.add_argument( + "-o", + "--output-fileNames", + help="Output CSV file with filenames for inundation rasters, inundation polygons, and depth rasters", + required=False, + default=None, + ) + parser.add_argument("-w", "--num-workers", help="Number of Workers", required=False, default=1) + parser.add_argument( + "-v", "--verbose", help="Verbose printing", required=False, default=None, action="store_true" + ) + + Inundate_gms(**vars(parser.parse_args())) diff --git a/tools/inundate_mosaic_wrapper.py b/tools/inundate_mosaic_wrapper.py index 2e68e9a6..84e36522 100644 --- a/tools/inundate_mosaic_wrapper.py +++ b/tools/inundate_mosaic_wrapper.py @@ -3,6 +3,7 @@ import os from timeit import default_timer as timer +import pandas as pd from inundate_gms import Inundate_gms from mosaic_inundation import Mosaic_inundation @@ -14,6 +15,7 @@ def produce_mosaicked_inundation( hydrofabric_dir, hucs, flow_file, + hydro_table_df=None, inundation_raster=None, inundation_polygon=None, depths_raster=None, @@ -81,7 +83,7 @@ def produce_mosaicked_inundation( ) # Check that flow file exists - if not os.path.exists(flow_file): + if not isinstance(flow_file, pd.DataFrame) and not os.path.exists(flow_file): raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), flow_file) # Check job numbers and raise error if necessary @@ -97,6 +99,7 @@ def produce_mosaicked_inundation( map_file = Inundate_gms( hydrofabric_dir=hydrofabric_dir, forecast=flow_file, + hydro_table_df=hydro_table_df, num_workers=num_workers, hucs=hucs, inundation_raster=inundation_raster, @@ -135,6 +138,7 @@ def produce_mosaicked_inundation( verbose=verbose, is_mosaic_for_branches=is_mosaic_for_branches, inundation_polygon=inundation_polygon, + workers=num_workers, ) fh.vprint("Mosaicking complete.", verbose) diff --git a/tools/inundate_mosaic_wrapper_optimized.py b/tools/inundate_mosaic_wrapper_optimized.py new file mode 100644 index 00000000..964b978d --- /dev/null +++ b/tools/inundate_mosaic_wrapper_optimized.py @@ -0,0 +1,230 @@ +import argparse +import errno +import os +from timeit import default_timer as timer + +import pandas as pd +from inundate_gms_optimized import Inundate_gms +from mosaic_inundation import Mosaic_inundation + +from utils.shared_functions import FIM_Helpers as fh +from utils.shared_variables import elev_raster_ndv + + +def produce_mosaicked_inundation( + hydrofabric_dir, + hucs, + flow_file, + hydro_table_df=None, + inundation_raster=None, + inundation_polygon=None, + depths_raster=None, + map_filename=None, + mask=None, + unit_attribute_name="huc8", + num_workers=1, + remove_intermediate=True, + verbose=False, + is_mosaic_for_branches=False, +): + """ + This function calls Inundate_gms and Mosaic_inundation to produce inundation maps. + Possible outputs include inundation rasters encoded by HydroID (negative HydroID for dry and positive + HydroID for wet), polygons depicting extent, and depth rasters. The function requires a flow file + organized by NWM feature_id and discharge in cms. "feature_id" and "discharge" columns MUST be present + in the flow file. + + Args: + hydrofabric_dir (str): Path to hydrofabric directory where FIM outputs were written by + fim_pipeline. + huc (str): The HUC for which to produce mosaicked inundation files. + flow_file (str): Path to flow file to be used for inundation. 
+ feature_ids in flow_file should be present in supplied HUC. + inundation_raster (str): Full path to output inundation raster + (encoded by positive and negative HydroIDs). + inuntation_polygon (str): Full path to output inundation polygon. Optional. + depths_raster (str): Full path to output depths_raster. Pixel values will be in meters. Optional. + num_workers (int): Number of parallel jobs to run. + keep_intermediate (bool): Option to keep intermediate files. + verbose (bool): Print verbose messages to screen. Not tested. + """ + + # Check that inundation_raster or depths_raster is supplied + if inundation_raster is None and depths_raster is None: + raise ValueError("Must supply either inundation_raster or depths_raster.") + + # Check that output directory exists. Notify user that output directory will be created if not. + for output_file in [inundation_raster, inundation_polygon, depths_raster]: + if output_file is None: + continue + parent_dir = os.path.split(output_file)[0] + if not os.path.exists(parent_dir): + fh.vprint( + "Parent directory for " + + os.path.split(output_file)[1] + + " does not exist. The parent directory will be produced.", + verbose, + ) + os.makedirs(parent_dir) + + # Check that hydrofabric_dir exists + if not os.path.exists(hydrofabric_dir): + raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), hydrofabric_dir) + + # If the "hucs" argument is really one huc, convert it to a list + if type(hucs) is str: + hucs = [hucs] + + # Check that huc folder exists in the hydrofabric_dir. + for huc in hucs: + if not os.path.exists(os.path.join(hydrofabric_dir, huc)): + raise FileNotFoundError( + (errno.ENOENT, os.strerror(errno.ENOENT), os.path.join(hydrofabric_dir, huc)) + ) + + # Check that flow file exists + if not isinstance(flow_file, pd.DataFrame) and not os.path.exists(flow_file): + raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), flow_file) + + # Check job numbers and raise error if necessary + total_cpus_available = os.cpu_count() - 1 + if num_workers > total_cpus_available: + raise ValueError( + "The number of workers (-w), {}, " + "exceeds your machine's available CPU count minus one ({}). 
" + "Please lower the num_workers.".format(num_workers, total_cpus_available) + ) + + # Call Inundate_gms + map_file = Inundate_gms( + hydrofabric_dir=hydrofabric_dir, + forecast=flow_file, + hydro_table_df=hydro_table_df, + num_workers=num_workers, + hucs=hucs, + inundation_raster=inundation_raster, + depths_raster=depths_raster, + verbose=verbose, + ) + + # Write map file if designated + if map_filename is not None: + if not os.path.isdir(os.path.dirname(map_filename)): + os.makedirs(os.path.dirname(map_filename)) + + map_file.to_csv(map_filename, index=False) + + fh.vprint("Mosaicking extent...", verbose) + + for mosaic_attribute in ["depths_rasters", "inundation_rasters"]: + mosaic_output = None + if mosaic_attribute == "inundation_rasters": + if inundation_raster is not None: + mosaic_output = inundation_raster + elif mosaic_attribute == "depths_rasters": + if depths_raster is not None: + mosaic_output = depths_raster + + if mosaic_output is not None: + # Call Mosaic_inundation + mosaic_file_path = Mosaic_inundation( + map_file.copy(), + mosaic_attribute=mosaic_attribute, + mosaic_output=mosaic_output, + mask=mask, + unit_attribute_name=unit_attribute_name, + nodata=elev_raster_ndv, + remove_inputs=remove_intermediate, + verbose=verbose, + is_mosaic_for_branches=is_mosaic_for_branches, + inundation_polygon=inundation_polygon, + workers=num_workers, + ) + + fh.vprint("Mosaicking complete.", verbose) + + return mosaic_file_path + + +if __name__ == "__main__": + # Parse arguments + parser = argparse.ArgumentParser( + description="Helpful utility to produce mosaicked inundation extents (raster and poly) and depths." + ) + parser.add_argument( + "-y", + "--hydrofabric_dir", + help="Directory path to FIM hydrofabric by processing unit.", + required=True, + type=str, + ) + parser.add_argument( + "-u", "--hucs", help="List of HUCS to run", required=True, default="", type=str, nargs="+" + ) + parser.add_argument( + "-f", + "--flow_file", + help='Discharges in CMS as CSV file. "feature_id" and "discharge" columns MUST be supplied.', + required=True, + type=str, + ) + parser.add_argument( + "-i", "--inundation-raster", help="Inundation raster output.", required=False, default=None, type=str + ) + parser.add_argument( + "-p", + "--inundation-polygon", + help="Inundation polygon output. Only writes if designated.", + required=False, + default=None, + type=str, + ) + parser.add_argument( + "-d", + "--depths-raster", + help="Depths raster output. Only writes if designated. Appends HUC code in batch mode.", + required=False, + default=None, + type=str, + ) + parser.add_argument( + "-m", + "--map-filename", + help="Path to write output map file CSV (optional). Default is None.", + required=False, + default=None, + type=str, + ) + parser.add_argument("-k", "--mask", help="Name of mask file.", required=False, default=None, type=str) + parser.add_argument( + "-a", + "--unit_attribute_name", + help='Name of attribute column in map_file. Default is "huc8".', + required=False, + default="huc8", + type=str, + ) + parser.add_argument("-w", "--num-workers", help="Number of workers.", required=False, default=1, type=int) + parser.add_argument( + "-r", + "--remove-intermediate", + help="Remove intermediate products, i.e. individual branch inundation.", + required=False, + default=False, + action="store_true", + ) + parser.add_argument( + "-v", + "--verbose", + help="Verbose printing. 
Not tested.", + required=False, + default=False, + action="store_true", + ) + + start = timer() + + # Extract to dictionary and run + produce_mosaicked_inundation(**vars(parser.parse_args())) + + print(f"Completed in {round((timer() - start)/60, 2)} minutes.") diff --git a/tools/inundation.py b/tools/inundation.py index 4dc7232c..754cb995 100755 --- a/tools/inundation.py +++ b/tools/inundation.py @@ -13,6 +13,7 @@ import rasterio import xarray as xr from numba import njit, typed, types +from rasterio.features import shapes from rasterio.io import DatasetReader, DatasetWriter from rasterio.mask import mask from shapely.geometry import shape @@ -221,31 +222,35 @@ def inundate( ) # start up thread pool - executor = ThreadPoolExecutor(max_workers=num_workers) + # executor = ThreadPoolExecutor(max_workers=num_workers) + + # start up thread pool + # executor = ThreadPoolExecutor(max_workers=num_workers) # submit jobs - results = {executor.submit(__inundate_in_huc, *wg): wg[6] for wg in window_gen} + # results = {executor.submit(__inundate_in_huc, *wg): wg[6] for wg in window_gen} inundation_rasters = [] depth_rasters = [] inundation_polys = [] - for future in as_completed(results): - try: - future.result() - except Exception as exc: - __vprint("Exception {} for {}".format(exc, results[future]), not quiet) - else: - if results[future] is not None: - __vprint("... {} complete".format(results[future]), not quiet) - else: - __vprint("... complete", not quiet) - - inundation_rasters += [future.result()[0]] - depth_rasters += [future.result()[1]] - inundation_polys += [future.result()[2]] + # for future in as_completed(results): + # try: + # future.result() + # except Exception as exc: + # __vprint("Exception {} for {}".format(exc, results[future]), not quiet) + # else: + # if results[future] is not None: + # __vprint("... {} complete".format(results[future]), not quiet) + # else: + # __vprint("... complete", not quiet) + for wg in window_gen: + future = __inundate_in_huc(*wg) + inundation_rasters += [future[0]] + depth_rasters += [future[1]] + inundation_polys += [future[2]] # power down pool - executor.shutdown(wait=True) + # executor.shutdown(wait=True) # close datasets rem.close() @@ -371,7 +376,7 @@ def __inundate_in_huc( # polygonize inundation if isinstance(inundation_polygon, fiona.Collection): # make generator for inundation polygons - # TODO shapes() method below is "undefined". 
+ inundation_polygon_generator = shapes( inundation_array, mask=inundation_array > 0, connectivity=8, transform=window_transform ) @@ -428,13 +433,16 @@ def __inundate_in_huc( def __go_fast_mapping(rem, catchments, catchmentStagesDict, inundation, depths): for i, (r, cm) in enumerate(zip(rem, catchments)): if cm in catchmentStagesDict: - if r >= 0: + if r >= 0: # 0.027 meters representing an inch or more of inundation depth depth = catchmentStagesDict[cm] - r - depths[i] = max(depth, 0) # set negative depths to 0 + if np.round(depth * 1000) / 1000 < 0.03048: + depths[i] = 0 + else: + depths[i] = depth else: depths[i] = 0 - if depths[i] > 0: # set positive depths to positive + if depths[i] >= 0.03048: # set positive depths to positive inundation[i] *= -1 # else: # set positive depths to value of positive catchment value # inundation[i] = cm diff --git a/tools/inundation_optimized.py b/tools/inundation_optimized.py new file mode 100755 index 00000000..be45f054 --- /dev/null +++ b/tools/inundation_optimized.py @@ -0,0 +1,818 @@ +#!/usr/bin/env python3 + +import argparse +import gc +import time +from collections import OrderedDict +from concurrent.futures import ThreadPoolExecutor, as_completed +from os.path import splitext +from typing import Union +from warnings import warn + +import fiona +import geopandas as gpd +import numba.typed.typeddict +import numpy as np +import pandas as pd +import rasterio +import rioxarray as rxr +import xarray as xr +from gval.homogenize.spatial_alignment import _matching_spatial_indices +from numba import njit, prange, typed, types +from shapely.geometry import shape + + +gpd.options.io_engine = "pyogrio" + + +class hydroTableHasOnlyLakes(Exception): + """Raised when a Hydro-Table only has lakes""" + + pass + + +class NoForecastFound(Exception): + """Raised when no forecast is available for a given Hydro-Table""" + + pass + + +def inundate( + rem: Union[str, xr.DataArray, xr.Dataset], + catchments: Union[str, xr.DataArray, xr.Dataset], + catchment_poly, + hydro_table, + forecast, + mask_type, + hucs=None, + hucs_layerName=None, + subset_hucs=None, + num_workers=1, + aggregate=False, + inundation_raster=None, + depths=None, + src_table=None, + quiet: bool = False, +): + """ + + Run inundation on FIM >=3.0 outputs at job-level scale or aggregated scale + + Generate depths raster, inundation raster, and inundation polygon from FIM >=3.0 outputs. + Can use the FIM 3.0 outputs at native HUC level or the aggregated products. + Be sure to pass a HUCs file to process in batch mode if passing aggregated products. + + Parameters + ---------- + rem : str or rioxarray DataSet or DataArray + File path to or rasterio dataset reader of Relative Elevation Model raster. + Must have the same CRS as catchments raster. + catchments : str or rioxarray DataSet or DataArray + File path to or rasterio dataset reader of Catchments raster. Must have the same CRS as REM raster + hydro_table : str or pandas.DataFrame + File path to hydro-table csv or Pandas DataFrame object with correct indices and columns. + forecast : str or pandas.DataFrame + File path to forecast csv or Pandas DataFrame with correct column names. + hucs : str or fiona.Collection, optional + Batch mode only. File path or fiona collection of vector polygons in HUC 4,6,or 8's to inundate on. + Must have an attribute named as either "HUC4","HUC6", or "HUC8" with the associated values. + hucs_layerName : str, optional + Batch mode only. Layer name in hucs to use if multi-layer file is passed. 
+ subset_hucs : str or list of str, optional + Batch mode only. File path to line delimited file, HUC string, or list of HUC strings to + further subset hucs file for inundating. + num_workers : int, optional + Batch mode only. Number of workers to use in batch mode. Must be 1 or greater. + aggregate : bool, optional + Batch mode only. Aggregates output rasters to VRT mosaic files and merges polygons to single GPKG file + Currently not functional. Raises warning and sets to false. On to-do list. + inundation_raster : str, optional + Path to optional inundation raster output. Appends HUC number if ran in batch mode. + inundation_polygon : str, optional + Path to optional inundation vector output. Only accepts GPKG right now. + Appends HUC number if ran in batch mode. + depths : str, optional + Path to optional depths raster output. Appends HUC number if ran in batch mode. + quiet : bool, optional + Quiet output. + + Returns + ------- + error_code : int + Zero for successful completion. + + Raises + ------ + TypeError + Wrong input data types + AssertionError + Wrong input data types + + Warns + ----- + warn + if aggregrate set to true, will revert to false. + + Notes + ----- + - Specifying a subset of the domain in rem or catchments to inundate on is achieved by the HUCs file or + the forecast file. + + """ + + rem, catchments, hucs, num_workers, quiet = __validation( + rem=rem, + catchments=catchments, + hydro_table=hydro_table, + hucs=hucs, + hucs_layerName=hucs_layerName, + num_workers=num_workers, + aggregate=aggregate, + quiet=quiet, + ) + + # catchment stages dictionary + if hydro_table is not None: + catchmentStagesDict, hucSet = __subset_hydroTable_to_forecast(hydro_table, forecast, subset_hucs) + else: + raise TypeError("Pass hydro table csv") + + if catchmentStagesDict is not None: + if src_table is not None: + create_src_subset_csv(hydro_table, catchmentStagesDict, src_table) + + # make windows generator + window_gen = __make_windows_generator( + rem, + catchments, + catchment_poly, + mask_type, + catchmentStagesDict, + inundation_raster, + depths, + quiet, + hucs=hucs, + hucSet=hucSet, + ) + + # start up thread pool + # executor = ThreadPoolExecutor(max_workers=num_workers) + # results = {executor.submit(__inundate_in_huc, *wg): wg[6] for wg in window_gen} + + inundation_rasters = [] + depth_rasters = [] + inundation_polys = [] + # for future in as_completed(results): + # try: + # future.result() + # except Exception as exc: + # __vprint("Exception {} for {}".format(exc, results[future]), not quiet) + # else: + # if results[future] is not None: + # __vprint("... {} complete".format(results[future]), not quiet) + # else: + # __vprint("... 
complete", not quiet) + + # Temprorarily incurring serial processing + for wg in window_gen: + future = __inundate_in_huc(*wg) + inundation_rasters += [future[0]] + depth_rasters += [future[1]] + inundation_polys += [future[2]] + + # power down pool + # executor.shutdown(wait=True) + + # close datasets + rem.close() + catchments.close() + del rem, catchments + gc.collect() + + return inundation_rasters, depth_rasters, inundation_polys + + +def __validation( + rem: Union[str, xr.DataArray, xr.Dataset], + catchments: Union[str, xr.DataArray, xr.Dataset], + hydro_table: Union[str, pd.DataFrame], + hucs: list = None, + hucs_layerName: str = None, + num_workers: int = 1, + aggregate: bool = False, + quiet: bool = False, +): + """ + Checking inputs for valid formats and values + + Parameters + ---------- + rem : Union[str, xr.DataArray, xr.Dataset] + File path to or rasterio dataset reader of Relative Elevation Model raster. + catchments : Union[str, xr.DataArray, xr.Dataset] + File path to or rasterio dataset reader of Catchments raster. + hydro_table : Union[str, pd.DataFrame] + File path to hydro-table csv or Pandas DataFrame object with correct indices and columns. + hucs : list, optional + Batch mode only. File path or fiona collection of vector polygons in HUC 4,6,or 8's to inundate on. + hucs_layerName : str, optional + Batch mode only. Layer name in hucs to use if multi-layer file is passed. + num_workers: int + Batch mode only. Number of workers to use in batch mode. Must be 1 or greater. + aggregate: bool, optional + Batch mode only. Aggregates output rasters to VRT mosaic files and merges polygons to single GPKG file + Currently not functional. Raises warning and sets to false. On to-do list. + quiet: bool, optional + Whether to supress printed output + + Returns + ------- + rem : Union[xr.Dataset, xr.DataArray] + Normalized elevation values for the catchment + catchments: Union[xr.Dataset, xr.DataArray] + Rasterized catchments with HydroIDs + hucs : Union[None, fiona.Collection] + Vector collection of hucs or None + num_workers : int + Number of parallel workers + quiet : bool + Whether to supress printed output + """ + # check for num_workers + num_workers = int(num_workers) + assert num_workers >= 1, "Number of workers should be 1 or greater" + if (num_workers > 1) & (hucs is None): + raise AssertionError("Pass a HUCs file to batch process inundation mapping") + + # check that aggregate is only done for hucs mode + aggregate = bool(aggregate) + if aggregate: + warn("Aggregate feature currently not working. 
Setting to false for now.") + aggregate = False + if hucs is None: + assert not aggregate, "Pass HUCs file if aggregation is desired" + + # bool quiet + quiet = bool(quiet) + + # input rem + if isinstance(rem, str): + rem = rxr.open_rasterio(rem) + elif isinstance(rem, xr.Dataset) or isinstance(rem, xr.DataArray): + pass + else: + raise TypeError("Pass rioxarray Dataset/DataArray or filepath for rem") + + # input catchments grid + if isinstance(catchments, str): + catchments = rxr.open_rasterio(catchments) + elif isinstance(catchments, xr.Dataset) or isinstance(catchments, xr.DataArray): + pass + else: + raise TypeError("Pass rioxarray Dataset/DataArray or filepath for catchments") + + # check for matching number of bands and single band only + assert _matching_spatial_indices( + rem, catchments + ), "REM and catchments rasters are required to be single band only" + + # open hucs + if hucs is None: + pass + elif isinstance(hucs, str): + hucs = fiona.open(hucs, 'r', layer=hucs_layerName) + elif isinstance(hucs, fiona.Collection): + pass + else: + raise TypeError("Pass fiona collection or filepath for hucs") + + # catchment stages dictionary + if hydro_table is None: + raise TypeError("Pass hydro table csv") + + return rem, catchments, hucs, num_workers, quiet + + +def __inundate_in_huc( + rem: Union[str, xr.DataArray, xr.Dataset], + catchments: Union[str, xr.DataArray, xr.Dataset], + hucCode: str, + catchmentStagesDict: numba.typed.typeddict, + depths: str, + inundation_raster: str, + quiet: bool, +): + """ + Inundate within the chosen scope + + Parameters + ---------- + rem : Union[str, xr.DataArray, xr.Dataset] + File path to or rasterio dataset reader of Relative Elevation Model raster. + catchments : Union[str, xr.DataArray, xr.Dataset] + File path to or rasterio dataset reader of Catchments raster. 
+ hucCode : str + Catchment processing unit to inundate + catchmentStagesDict : numba dictionary + Numba compatible dictionary with HydroID as a key and flood stage as a value + depths : str + Name of inundation depth dataset + inundation_raster : str + Name of inundation extent dataset + quiet : bool + Whether to supress printed output + + """ + # verbose print + if hucCode is not None: + __vprint("Inundating {} ...".format(hucCode), not quiet) + + # reset output values + nodata_r = np.int16(rem.rio.nodata) + nodata_c = np.int16(catchments.rio.nodata) + + # make output arrays + __go_fast_mapping( + rem.data, + catchments.data, + catchmentStagesDict, + rem.data.shape[2], + rem.data.shape[1], + nodata_r, + nodata_c, + ) + + raster_profile = { + "driver": 'GTiff', + "blockxsize": 256, + "blockysize": 256, + "tiled": True, + "compress": 'lzw', + "transform": rem.rio.transform(), + "height": rem.data.shape[1], + "width": rem.data.shape[2], + "count": 1, + "dtype": np.int16, + } + + if depths is not None: + rem.rio.to_raster(depths, **raster_profile) + + if inundation_raster is not None: + catchments.rio.to_raster(inundation_raster, **raster_profile) + + return inundation_raster, depths, None + + +@njit(nogil=True, fastmath=True, parallel=True, cache=True) +def __go_fast_mapping(rem, catchments, catchmentStagesDict, x, y, nodata_r, nodata_c): + """ + Numba optimization for determining flood depth and flood + + Parameters + ---------- + rem : numpy array + Relative elevation model values which will be replaced by inundation depth values + catchments : numpy array + Rasterized catchments represented by HydoIDs to be replaced with inundation values + catchmentStagesDict : numba dictionary + Numba compatible dictionary with HydroID as a key and flood stage as a value + x : int + Shape of longitude coordinates + y : int + Shape of latitude coordinates + nodata_r : int + Nodata value to use for depth values + nodata_c : int + Nodata value to use for catchment values + + """ + # Iterate through each latitude and longitude + for i in prange(y): + for j in prange(x): + # If catchments are nodata + if catchments[0, i, j] != nodata_c: + # catchments in stage dict + if catchments[0, i, j] in catchmentStagesDict: + # if elevation is zero or greater + if rem[0, i, j] >= 0: + depth = catchmentStagesDict[catchments[0, i, j]] - rem[0, i, j] + + # If the depth is greater than approximately 1/10th of a foot + if depth < 30: + catchments[0, i, j] *= -1 # set HydroIDs to negative + rem[0, i, j] = 0 + else: + rem[0, i, j] = depth + else: + rem[0, i, j] = 0 + catchments[0, i, j] *= -1 # set HydroIDs to negative + else: + rem[0, i, j] = 0 + catchments[0, i, j] = nodata_c + else: + rem[0, i, j] = 0 + catchments[0, i, j] = nodata_c + + +def __make_windows_generator( + rem, + catchments, + catchment_poly, + mask_type, + catchmentStagesDict, + inundation_raster, + depths, + quiet, + hucs=None, + hucSet=None, +): + """ + Generator to split processing in to windows or different masked datasets + + Parameters + ---------- + rem : rioxarray DataArray + Relative elevation model raster dataset + catchments : rioxarray DataArray + Rasterized catchments represented by HydoIDs + catchmentStagesDict : numba dictionary + Numba compatible dictionary with HydroID as a key and flood stage as a value + inundation_raster : str + Name of inundation extent raster to output + depths : str + Name of inundation depth raster to output + quiet : bool + Whether to suppress printed output or run in verbose mode + hucs : list, optional + HUC 
values to process + hucSet : list, optional + Prefixes of HUC to look for and process + + Returns + ------- + Tuple of rioxarray Datasets/DataArrays and other data + rem : rioxarray DataArray + Either full or masked dataset + catchments : rioxarray DataArray + Either full or masked dataset + hucCode : str + Code representing the huc processing unit + catchmentStagesDict : numba dictionary + Numba compatible dictionary with HydroID as a key and flood stage as a value + depths : str + Name of inundation depth raster to output + inundation_raster, : str + Name of inundation extent raster to output + quiet: bool + Whether to suppress printed output or run in verbose mode + + """ + + if hucs is not None: + # get attribute name for HUC column + for huc in hucs: + for hucColName in huc['properties'].keys(): + if 'HUC' in hucColName: + # hucSize = int(hucColName[-1]) + break + break + + # make windows + for huc in hucs: + # returns hucCode if current huc is in hucSet (at least starts with) + def __return_huc_in_hucSet(hucCode, hucSet): + for hs in hucSet: + if hs.startswith(hucCode): + return hucCode + + return None + + if __return_huc_in_hucSet(huc['properties'][hucColName], hucSet) is None: + continue + + try: + if mask_type == "huc": + # window = geometry_window(rem,shape(huc['geometry'])) + rem_masked = rem.rio.clip([shape(huc['geometry'])], from_disk=True) + catchments_masked = catchments.rio.clip([shape(huc['geometry'])], from_disk=True) + elif mask_type == "filter": + + if isinstance(catchment_poly, str): + catchment_poly = gpd.read_file(catchment_poly) + elif isinstance(catchment_poly, gpd.GeoDataFrame): + pass + elif isinstance(catchment_poly, None): + pass + else: + raise TypeError("Pass geopandas dataset or filepath for catchment polygons") + + fossid = huc['properties']['fossid'] + if catchment_poly.HydroID.dtype != 'str': + catchment_poly.HydroID = catchment_poly.HydroID.astype(str) + catchment_poly = catchment_poly[catchment_poly.HydroID.str.startswith(fossid)] + + rem_masked = rem.rio.clip([shape(huc['geometry'])], from_disk=True) + catchments_masked = catchments.rio.clip([shape(huc['geometry'])], from_disk=True) + # del catchment_poly + elif mask_type is None: + pass + else: + print("invalid mask type. 
Options are 'huc' or 'filter'") + except ValueError: # shape doesn't overlap raster + continue # skip to next HUC + + hucCode = huc['properties'][hucColName] + + yield ( + rem_masked, + catchments_masked, + hucCode, + catchmentStagesDict, + depths, + inundation_raster, + quiet, + ) + + else: + hucCode = None + + yield rem, catchments, hucCode, catchmentStagesDict, depths, inundation_raster, quiet + + +def __append_huc_code_to_file_name(fileName, hucCode): + if hucCode is None: + return fileName + + base_file_path, extension = splitext(fileName) + + return "{}_{}{}".format(base_file_path, hucCode, extension) + + +def __subset_hydroTable_to_forecast(hydroTable, forecast, subset_hucs=None): + if isinstance(hydroTable, str): + htable_req_cols = ['HUC', 'feature_id', 'HydroID', 'stage', 'discharge_cms', 'LakeID'] + hydroTable = pd.read_csv( + hydroTable, + dtype={ + 'HUC': str, + 'feature_id': str, + 'HydroID': str, + 'stage': float, + 'discharge_cms': float, + 'LakeID': int, + 'last_updated': object, + 'submitter': object, + 'obs_source': object, + }, + low_memory=False, + usecols=htable_req_cols, + ) + # huc_error = hydroTable.HUC.unique() + hydroTable = hydroTable.set_index(['HUC', 'feature_id', 'HydroID']) + + elif isinstance(hydroTable, pd.DataFrame): + pass # consider checking for correct dtypes, indices, and columns + else: + raise TypeError("Pass path to hydro-table csv or Pandas DataFrame") + + hydroTable = hydroTable[ + hydroTable["LakeID"] == -999 + ] # Subset hydroTable to include only non-lake catchments. + + # raises error if hydroTable is empty due to all segments being lakes + if hydroTable.empty: + raise hydroTableHasOnlyLakes("All stream segments in HUC are within lake boundaries.") + + if isinstance(forecast, str): + try: + forecast = pd.read_csv(forecast, dtype={'feature_id': str, 'discharge': float}) + forecast = forecast.set_index('feature_id') + except UnicodeDecodeError: + forecast = read_nwm_forecast_file(forecast) + + elif isinstance(forecast, pd.DataFrame): + pass # consider checking for dtypes, indices, and columns + else: + raise TypeError("Pass path to forecast file csv or Pandas DataFrame") + + if not hydroTable.empty: + if isinstance(forecast, str): + forecast = pd.read_csv(forecast, dtype={'feature_id': str, 'discharge': float}) + forecast = forecast.set_index('feature_id') + elif isinstance(forecast, pd.DataFrame): + pass # consider checking for dtypes, indices, and columns + else: + raise TypeError("Pass path to forecast file csv or Pandas DataFrame") + + # susbset hucs if passed + if subset_hucs is not None: + if isinstance(subset_hucs, list): + if len(subset_hucs) == 1: + try: + subset_hucs = open(subset_hucs[0]).read().split('\n') + except FileNotFoundError: + pass + elif isinstance(subset_hucs, str): + try: + subset_hucs = open(subset_hucs).read().split('\n') + except FileNotFoundError: + subset_hucs = [subset_hucs] + + # subsets HUCS + subset_hucs_orig = subset_hucs.copy() + subset_hucs = [] + for huc in np.unique(hydroTable.index.get_level_values('HUC')): + for sh in subset_hucs_orig: + if huc.startswith(sh): + subset_hucs += [huc] + + hydroTable = hydroTable[np.in1d(hydroTable.index.get_level_values('HUC'), subset_hucs)] + + # join tables + try: + hydroTable = hydroTable.join(forecast, on=['feature_id'], how='inner') + except AttributeError: + # print("FORECAST ERROR") + raise NoForecastFound("No forecast value found for the passed feature_ids in the Hydro-Table") + + else: + # initialize dictionary + catchmentStagesDict = typed.Dict.empty(types.int16, 
types.uint16) + + # interpolate stages + for hid, sub_table in hydroTable.groupby(level='HydroID'): + interpolated_stage = np.interp( + sub_table.loc[:, 'discharge'].unique(), + sub_table.loc[:, 'discharge_cms'], + sub_table.loc[:, 'stage'], + ) + + # add this interpolated stage to catchment stages dict + h = round(interpolated_stage[0], 4) + + hid = types.int16(np.int16(str(hid)[4:])) + h = types.int16(np.round(h * 1000)) + catchmentStagesDict[hid] = h + + # huc set + hucSet = [str(i) for i in hydroTable.index.get_level_values('HUC').unique().to_list()] + + return catchmentStagesDict, hucSet + + +def read_nwm_forecast_file(forecast_file, rename_headers=True): + """Reads NWM netcdf comp files and converts to forecast data frame""" + + flows_nc = xr.open_dataset(forecast_file, decode_cf='feature_id', engine='netcdf4') + + flows_df = flows_nc.to_dataframe() + flows_df = flows_df.reset_index() + + flows_df = flows_df[['streamflow', 'feature_id']] + + if rename_headers: + flows_df = flows_df.rename(columns={"streamflow": "discharge"}) + + convert_dict = {'feature_id': str, 'discharge': float} + flows_df = flows_df.astype(convert_dict) + + flows_df = flows_df.set_index('feature_id', drop=True) + + flows_df = flows_df.dropna() + + return flows_df + + +def __vprint(message, verbose): + + if verbose: + print(message) + + +def create_src_subset_csv(hydro_table, catchmentStagesDict, src_table): + src_df = pd.DataFrame.from_dict(catchmentStagesDict, orient='index') + src_df = src_df.reset_index() + src_df.columns = ['HydroID', 'stage_inund'] + htable_req_cols = ['HUC', 'feature_id', 'HydroID', 'stage', 'discharge_cms', 'LakeID'] + df_htable = pd.read_csv( + hydro_table, + dtype={ + 'HydroID': int, + 'HUC': object, + 'branch_id': int, + 'last_updated': object, + 'submitter': object, + 'obs_source': object, + }, + usecols=htable_req_cols, + ) + df_htable = df_htable.merge(src_df, how='left', on='HydroID') + df_htable['find_match'] = (df_htable['stage'] - df_htable['stage_inund']).abs() + df_htable = df_htable.loc[df_htable.groupby('HydroID')['find_match'].idxmin()].reset_index(drop=True) + df_htable.to_csv(src_table, index=False) + + +if __name__ == '__main__': + # parse arguments + parser = argparse.ArgumentParser( + description='Rapid inundation mapping for FOSS FIM. Operates in single-HUC and batch modes.' + ) + parser.add_argument( + '-r', '--rem', help='REM raster at job level or mosaic vrt. Must match catchments CRS.', required=True + ) + parser.add_argument( + '-c', + '--catchments', + help='Catchments raster at job level or mosaic VRT. Must match rem CRS.', + required=True, + ) + parser.add_argument('-b', '--catchment-poly', help='catchment_vector', required=True) + parser.add_argument('-t', '--hydro-table', help='Hydro-table in csv file format', required=True) + parser.add_argument('-f', '--forecast', help='Forecast discharges in CMS as CSV file', required=True) + parser.add_argument( + '-u', + '--hucs', + help='Batch mode only: HUCs file to process at. Must match CRS of input rasters', + required=False, + default=None, + ) + parser.add_argument( + '-l', + '--hucs-layerName', + help='Batch mode only. Layer name in HUCs file to use', + required=False, + default=None, + ) + parser.add_argument( + '-j', + '--num-workers', + help='Batch mode only. Number of concurrent processes', + required=False, + default=1, + type=int, + ) + parser.add_argument( + '-s', + '--subset-hucs', + help="""Batch mode only. 
HUC code, + series of HUC codes (no quotes required), or line delimited of HUCs to run within + the hucs file that is passed""", + required=False, + default=None, + nargs='+', + ) + parser.add_argument( + '-m', + '--mask-type', + help='Specify huc (FIM < 3) or filter (FIM >= 3) masking method', + required=False, + default="huc", + ) + parser.add_argument( + '-a', + '--aggregate', + help="""Batch mode only. Aggregate outputs to VRT files. + Currently, raises warning and sets to false if used.""", + required=False, + action='store_true', + ) + parser.add_argument( + '-i', + '--inundation-raster', + help="""Inundation Raster output. Only writes if designated. + Appends HUC code in batch mode.""", + required=False, + default=None, + ) + parser.add_argument( + '-p', + '--inundation-polygon', + help="""Inundation polygon output. Only writes if designated. + Appends HUC code in batch mode.""", + required=False, + default=None, + ) + parser.add_argument( + '-d', + '--depths', + help="""Depths raster output. Only writes if designated. + Appends HUC code in batch mode.""", + required=False, + default=None, + ) + parser.add_argument( + '-n', + '--src-table', + help="""Output table with the SRC lookup/interpolation. + Only writes if designated. Appends HUC code in batch mode.""", + required=False, + default=None, + ) + parser.add_argument( + '-q', '--quiet', help='Quiet terminal output', required=False, default=False, action='store_true' + ) + + # extract to dictionary + args = vars(parser.parse_args()) + # feature_id = 5253867 diff --git a/tools/mosaic_inundation.py b/tools/mosaic_inundation.py index bc20265c..dc64b201 100755 --- a/tools/mosaic_inundation.py +++ b/tools/mosaic_inundation.py @@ -86,7 +86,7 @@ def Mosaic_inundation( inundation_maps_list, ag_mosaic_output, nodata, - workers=1, + workers=workers, remove_inputs=remove_inputs, mask=mask, verbose=verbose, @@ -133,7 +133,8 @@ def mosaic_by_unit( else: threaded = False - overlap.merge_rasters(mosaic_output, threaded=threaded, workers=1, nodata=nodata) + print('mosaic workers', workers, threaded) + overlap.merge_rasters(mosaic_output, threaded=threaded, workers=workers, nodata=nodata) if mask: fh.vprint("Masking ...", verbose) diff --git a/tools/overlapping_inundation.py b/tools/overlapping_inundation.py index fbb1bcc5..ff5a2e5a 100644 --- a/tools/overlapping_inundation.py +++ b/tools/overlapping_inundation.py @@ -3,6 +3,7 @@ import concurrent.futures import warnings +from concurrent.futures import ThreadPoolExecutor, as_completed from functools import partial from threading import Lock @@ -75,6 +76,11 @@ def key_sort_func(x): self.partitions = num_partitions self.window_sizes = window_xy_size + @staticmethod + def __vprint(message, verbose): + if verbose: + print(message) + @staticmethod @njit def get_res_bbox_min(x, v, z, y): @@ -141,10 +147,10 @@ def get_window_coords(self): # Get window sizes (both normal and edge windows) window_bounds1 = np.flip( np.array(np.meshgrid(window_width1, window_height1)).T.reshape(-1, 2), axis=1 - ).astype(np.int64) + ).astype(int) window_bounds2 = np.flip( np.array(np.meshgrid(window_width2, window_height2)).T.reshape(-1, 2), axis=1 - ).astype(np.int64) + ).astype(int) window_idx = np.array(np.unravel_index(np.arange(y_res * x_res), (y_res, x_res), order="F")) @@ -224,7 +230,7 @@ def read_rst_data(self, win_idx, datasets, path_points, bbox, meta): window = path_points[win_idx] window_height, window_width = np.array( [np.abs(bbox[win_idx][2] - bbox[win_idx][0]), np.abs(bbox[win_idx][3] - bbox[win_idx][1])] - 
).astype(np.int64) + ).astype(int) bnds = [] data = [] @@ -254,7 +260,7 @@ def read_rst_data(self, win_idx, datasets, path_points, bbox, meta): return [final_bnds, bnds, data] - def merge_rasters(self, out_fname, nodata=-9999, threaded=False, workers=4): + def merge_rasters(self, out_fname, nodata=-9999, threaded=False, workers=4, quiet=True): """ Merge multiple raster datasets @@ -294,12 +300,10 @@ def merge_rasters(self, out_fname, nodata=-9999, threaded=False, workers=4): compress="lzw", ) - final_windows, data_windows, data = [], [], [] - def __data_generator(data_dict, path_points, bbox, meta): for key, val in data_dict.items(): f_window, window, dat = self.read_rst_data(key, val, path_points, bbox, meta) - yield (dat, window, f_window, val) + yield dat, window, f_window, val # final_windows.append(f_window) # data_windows.append(window) # data.append(dat) @@ -329,8 +333,20 @@ def __data_generator(data_dict, path_points, bbox, meta): for d, dw, fw, ddict in dgen: merge_partial(d, dw, fw, ddict) else: - with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor: - executor.map(merge_partial, data, data_windows, final_windows, data_dict.values()) + # start up thread pool + executor = ThreadPoolExecutor(max_workers=workers) + results = {executor.submit(merge_partial, *wg): 1 for wg in dgen} + + for future in as_completed(results): + try: + future.result() + except Exception as exc: + self.__vprint("Exception {} for {}".format(exc, results[future]), not quiet) + else: + if results[future] is not None: + self.__vprint("... {} complete".format(results[future]), not quiet) + else: + self.__vprint("... complete", not quiet) def mask_mosaic(self, mosaic, polys, polys_layer=None, outfile=None): # rem_array,window_transform = mask(rem,[shape(huc['geometry'])],crop=True,indexes=1) diff --git a/tools/tools_shared_functions.py b/tools/tools_shared_functions.py index 2c9da736..e67ccf1f 100755 --- a/tools/tools_shared_functions.py +++ b/tools/tools_shared_functions.py @@ -1,6 +1,8 @@ #!/usr/bin/env python3 + import datetime as dt +import gc import json import logging import os @@ -19,13 +21,12 @@ import urllib3 import xarray as xr from dotenv import load_dotenv -from geocube.api.core import make_geocube from gval import CatStats from rasterio import features from rasterio.warp import Resampling, calculate_default_transform, reproject from requests.adapters import HTTPAdapter -from requests.packages.urllib3.exceptions import InsecureRequestWarning from shapely.geometry import MultiPolygon, Polygon, shape +from urllib3.exceptions import InsecureRequestWarning from urllib3.util.retry import Retry @@ -436,6 +437,7 @@ def get_stats_table_from_binary_rasters( # Continue to next layer if features are absent. if poly_all.empty: del poly_all + gc.collect() continue # Project layer to reference crs. @@ -450,16 +452,21 @@ def get_stats_table_from_binary_rasters( all_masks_df = poly_all_proj del poly_all, poly_all_proj + gc.collect() stats_table_dictionary = {} # Initialize empty dictionary. 
c_aligned, b_aligned = candidate_raster.gval.homogenize(benchmark_raster, target_map="candidate") + del candidate_raster, benchmark_raster + gc.collect() agreement_map = c_aligned.gval.compute_agreement_map( b_aligned, comparison_function='pairing_dict', pairing_dict=pairing_dictionary ) + del c_aligned, b_aligned + gc.collect() agreement_map_og = agreement_map.copy() agreement_map.rio.write_nodata(4, inplace=True) @@ -482,8 +489,10 @@ def get_stats_table_from_binary_rasters( # Only write the agreement raster if user-specified. if agreement_raster != None: agreement_map_write = agreement_map.rio.write_nodata(10, encoded=True) - agreement_map_write.rio.to_raster(agreement_raster, dtype=np.int32, driver="COG") + agreement_map_write.rio.to_raster(agreement_raster, dtype=np.uint8, driver="COG") + del agreement_map_write + gc.collect() # Write legend text file legend_txt = os.path.join(os.path.split(agreement_raster)[0], 'read_me.txt') @@ -512,6 +521,7 @@ def get_stats_table_from_binary_rasters( ) del crosstab_table, metrics_table + gc.collect() # After agreement_array is masked with default mask layers, check for inclusion masks in mask_dict. if mask_dict != {}: @@ -529,6 +539,7 @@ def get_stats_table_from_binary_rasters( # Continue to next layer if features are absent. if poly_all.empty: del poly_all + gc.collect() continue poly_all_proj = poly_all.to_crs(agreement_map.rio.crs) @@ -561,8 +572,9 @@ def get_stats_table_from_binary_rasters( os.path.split(agreement_raster)[0], poly_handle + '_agreement.tif' ) agreement_map_write = agreement_map_include.rio.write_nodata(10, encoded=True) - agreement_map_write.rio.to_raster(layer_agreement_raster, dtype=np.int32, driver="COG") + agreement_map_write.rio.to_raster(layer_agreement_raster, dtype=np.uint8, driver="COG") del agreement_map_write + gc.collect() # Update stats table dictionary stats_table_dictionary.update( @@ -574,11 +586,11 @@ def get_stats_table_from_binary_rasters( ) } ) - del agreement_map_include - - del poly_all, poly_all_proj, metrics_table, crosstab_table + del agreement_map_include, poly_all, poly_all_proj, metrics_table, crosstab_table + gc.collect() del agreement_map + gc.collect() return stats_table_dictionary @@ -1615,7 +1627,7 @@ def calculate_metrics_from_agreement_raster(agreement_raster): for idx, wind in agreement_raster.block_windows(1): window_data = agreement_raster.read(1, window=wind) values, counts = np.unique(window_data, return_counts=True) - for val, cts in values_counts: + for val, cts in zip(values, counts): totals[val] += cts results = dict()
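run_prep() in subdiv_chan_obank_src.py now takes an optional process_huc argument: when it is supplied, only that HUC is queued for variable_mannings_calc instead of every eight-digit directory found under fim_dir. A minimal call sketch, assuming run_prep is imported from another script; every path and value below is a placeholder:

    from subdiv_chan_obank_src import run_prep

    run_prep(
        fim_dir="/outputs/fim_run",                   # placeholder FIM output directory
        mann_n_table="/inputs/mannings_n_table.csv",  # placeholder variable-roughness table
        output_suffix="subdiv",
        number_of_jobs=4,
        verbose=True,
        src_plot_option=False,
        process_huc="12090301",                       # limit subdivision to this HUC only
    )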
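produce_mosaicked_inundation() (in both wrapper scripts) and Inundate_gms() now accept a hydro_table_df argument, and __inundate_gms_generator() filters that frame per branch instead of re-reading <huc>/hydrotable.csv for every branch. The sketch below preloads the aggregated hydrotable once and passes it through; the paths, HUC, and flow file are placeholders, and the flat import assumes the call is made from the tools directory (the scripts there import each other the same way):

    import os
    import pandas as pd
    from inundate_mosaic_wrapper_optimized import produce_mosaicked_inundation

    hydrofabric_dir = "/outputs/fim_run"   # placeholder
    huc = "12090301"                       # placeholder
    htable_cols = ["HUC", "branch_id", "feature_id", "HydroID", "stage", "discharge_cms", "LakeID"]

    # Read the HUC-level aggregated hydrotable once, with the columns and dtypes the CSV fallback path uses.
    hydro_table_df = pd.read_csv(
        os.path.join(hydrofabric_dir, huc, "hydrotable.csv"),
        usecols=htable_cols,
        dtype={
            "HUC": str,
            "branch_id": int,
            "feature_id": str,
            "HydroID": str,
            "stage": float,
            "discharge_cms": float,
            "LakeID": int,
        },
    )

    produce_mosaicked_inundation(
        hydrofabric_dir=hydrofabric_dir,
        hucs=huc,
        flow_file="/data/flows/example_flows.csv",  # placeholder forecast CSV (feature_id, discharge)
        hydro_table_df=hydro_table_df,              # branches filter this frame instead of re-reading the CSV
        inundation_raster=os.path.join(hydrofabric_dir, huc, "inundation.tif"),
        num_workers=4,
    )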
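In inundation_optimized.py the per-pixel mapping runs on int16 arrays rather than floats: __subset_hydroTable_to_forecast stores each interpolated stage as round(stage, 4) * 1000 (millimeters) keyed by the HydroID with its first four characters dropped, and __go_fast_mapping treats any depth below 30 of those units as dry, the same roughly 0.1 ft threshold that inundation.py applies as 0.03048 m. A small worked example of that scaling; the numbers are illustrative, and the assumption that the rem_zeroed_masked_<branch>_int16.tif REM is likewise in millimeters is an inference from the matching thresholds:

    import numpy as np

    stage_m = 0.8731                                         # interpolated stage from the SRC, in meters
    stage_mm = np.int16(np.round(round(stage_m, 4) * 1000))  # 873, as stored in catchmentStagesDict

    rem_mm = np.int16(610)            # REM cell value from the int16 branch raster (assumed millimeters)
    depth_mm = stage_mm - rem_mm      # 263 mm of water over this cell

    dry = depth_mm < 30               # depths under ~0.1 ft (30 mm here, 0.03048 m in inundation.py) count as dry
    print(int(depth_mm), dry)         # 263 False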