Skip to content

Commit

Permalink
Added code to ingest model sources for new stations
Browse files Browse the repository at this point in the history
  • Loading branch information
jmpmcmanus committed Jul 18, 2024
1 parent 32600be commit 9b08750
Show file tree
Hide file tree
Showing 3 changed files with 298 additions and 24 deletions.
236 changes: 236 additions & 0 deletions run/createIngestNewModelSourceMeta.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
#!/usr/bin/env python
# coding: utf-8

# Import python modules
import argparse
import glob
import os
import subprocess
import sys

import pandas as pd
import psycopg
from loguru import logger

def getSourceModelMetaLocationType(inputLocationType):
    ''' Query the drf_source_model_meta table for model source meta-data of a given location type.
        Parameters
            inputLocationType: string
                gauge location type (COASTAL, TIDAL, or RIVERS)
        Returns
            DataFrame with columns: data_source, source_name, source_archive, source_variable,
            source_instance, forcing_metclass, filename_prefix, location_type, units.
            On a database error the exception is logged and None is returned.
    '''

    # Column names matching the SELECT list below
    metaColumns = ['data_source', 'source_name', 'source_archive', 'source_variable', 'source_instance', 'forcing_metclass',
                   'filename_prefix', 'location_type', 'units']

    try:
        # Database credentials are taken from environment variables
        dbParams = {'dbname': os.environ['APSVIZ_GAUGES_DB_DATABASE'],
                    'user': os.environ['APSVIZ_GAUGES_DB_USERNAME'],
                    'host': os.environ['APSVIZ_GAUGES_DB_HOST'],
                    'port': os.environ['APSVIZ_GAUGES_DB_PORT'],
                    'password': os.environ['APSVIZ_GAUGES_DB_PASSWORD']}

        # Open connection and cursor
        conn = psycopg.connect(**dbParams)
        cur = conn.cursor()

        # Fetch every model source row for the requested location type
        cur.execute("""SELECT data_source, source_name, source_archive, source_variable, source_instance, forcing_metclass, filename_prefix, location_type, units
                       FROM drf_source_model_meta WHERE location_type = %(locationtype)s
                       ORDER BY filename_prefix""",
                    {'locationtype': inputLocationType})

        # Convert the query output into a DataFrame
        sourceMeta = pd.DataFrame(cur.fetchall(), columns=metaColumns)

        # Release database resources before returning
        cur.close()
        conn.close()

        return sourceMeta

    # If exception log error; caller receives None in that case
    except (Exception, psycopg.DatabaseError) as error:
        logger.exception(error)

def getStationID(stationNameList):
    ''' Look up station ids for a list of station names in the drf_gauge_station table.
        Parameters
            stationNameList: list
                a list of station_names
        Returns
            DataFrame with columns station_id and station_name, ordered by station_name.
            On a database error the exception is logged and None is returned.
    '''

    try:
        # Database credentials are taken from environment variables
        dbParams = {'dbname': os.environ['APSVIZ_GAUGES_DB_DATABASE'],
                    'user': os.environ['APSVIZ_GAUGES_DB_USERNAME'],
                    'host': os.environ['APSVIZ_GAUGES_DB_HOST'],
                    'port': os.environ['APSVIZ_GAUGES_DB_PORT'],
                    'password': os.environ['APSVIZ_GAUGES_DB_PASSWORD']}

        # Open connection and cursor
        conn = psycopg.connect(**dbParams)
        cur = conn.cursor()

        # Fetch station ids for the requested station names
        cur.execute("""SELECT station_id, station_name FROM drf_gauge_station
                       WHERE station_name = ANY(%(stationnamelist)s)
                       ORDER BY station_name""",
                    {'stationnamelist': stationNameList})

        # Convert the query output into a DataFrame
        stationIDs = pd.DataFrame(cur.fetchall(), columns=['station_id', 'station_name'])

        # Release database resources before returning
        cur.close()
        conn.close()

        return stationIDs

    # If exception log error; caller receives None in that case
    except (Exception, psycopg.DatabaseError) as error:
        logger.exception(error)

def addMeta(ingestPath, inputFilePath, inputDataSource, inputSourceName, inputSourceArchive, inputSourceInstance, inputForcingMetclass, inputUnits, inputLocationType):
    ''' Writes a CSV file containing source information for the station IDs found in the drf_gauge_station table.
        The station names are read from the geom CSV file, looked up via getStationID(), and then combined with the
        source information (data source, source name, source archive, source instance, forcing metclass, units).
        The resulting CSV is later ingested into the drf_model_source table by the ingestModelSourceData() function
        in ingestTasks.py.
        Parameters
            ingestPath: string
                Directory path to ingest data files, created from the harvest files, modelRunID subdirectory is included in this path
            inputFilePath: string
                The geom file path and name
            inputDataSource: string
                Unique identifier of data source (e.g., river_gauge, tidal_predictions, air_barameter, wind_anemometer, NAMFORECAST_NCSC_SAB_V1.23...)
            inputSourceName: string
                Organization that owns original source data (e.g., ncem, ndbc, noaa, adcirc...)
            inputSourceArchive: string
                Where the original data source is archived (e.g., contrails, ndbc, noaa, renci...)
            inputSourceInstance: string
                Source instance, such as ncsc123_gfs_sb55.01. Used by ingestSourceMeta, and ingestData.
            inputForcingMetclass: string
                ADCIRC model forcing class, such as synoptic or tropical. Used by ingestSourceMeta, and ingestData.
            inputUnits: string
                Units of data (e.g., m (meters), m^3ps (meter cubed per second), mps (meters per second), and mb (millibars)
            inputLocationType: string
                gauge location type (COASTAL, TIDAL, or RIVERS)
        Returns
            None, but writes a source meta CSV file into ingestPath.
    '''

    # Read the first column (station names) from the geom file, and render each as a decimal string
    geomStations = pd.read_csv(inputFilePath, usecols=[0], names=['station_name'])
    stationNames = [format(name, 'd') for name in geomStations['station_name'].values]

    # Look up the station ids for those station names
    stationIDs = getStationID(stationNames)

    # Attach the source information to every station row
    sourceInfo = {'data_source': inputDataSource,
                  'source_name': inputSourceName,
                  'source_archive': inputSourceArchive,
                  'source_instance': inputSourceInstance,
                  'forcing_metclass': inputForcingMetclass,
                  'units': inputUnits}
    for column, value in sourceInfo.items():
        stationIDs[column] = value

    # station_name is not ingested, so drop it
    stationIDs.drop(columns=['station_name'], inplace=True)

    # Put the columns in the order expected by the drf_model_source COPY
    newColsOrder = ['station_id','data_source','source_name','source_archive','source_instance','forcing_metclass','units']
    stationIDs = stationIDs.reindex(columns=newColsOrder)

    # Write the DataFrame, without index or header, to the output CSV file
    outputFile = f'source_{inputSourceName}_stationdata_{inputSourceArchive}_{inputLocationType}_{inputDataSource}_meta.csv'
    stationIDs.to_csv(ingestPath+outputFile, index=False, header=False)

def runIngestModelSourceData(ingestDir, inputLocationType):
    ''' This function creates model source meta CSV files, one per model source of the given location type, and then
        ingests them into the drf_model_source table by running ingestTasks.py with --inputTask ingestModelSourceData.
        Parameters
            ingestDir: string
                Directory path to the ast-run-ingester directory
            inputLocationType: string
                Gauge location type (COASTAL, TIDAL, or RIVERS)
        Returns
            None, but it runs addMeta(), which creates CSV files containing source meta-data, and then runs
            ingestTasks.py which ingests the CSV files into the drf_model_source table.
        Raises
            FileNotFoundError if no geom_*.csv file exists in the stations subdirectory.
    '''

    # get source meta for this location type
    df = getSourceModelMetaLocationType(inputLocationType)

    # get geom file; fail fast with a clear message instead of an IndexError below
    inputFile = glob.glob(ingestDir+"stations/geom_*.csv")
    if not inputFile:
        logger.error('No geom_*.csv file found in '+ingestDir+'stations/')
        raise FileNotFoundError('No geom_*.csv file found in '+ingestDir+'stations/')

    # run addMeta for each of the sources from getSourceModelMetaLocationType
    for index, row in df.iterrows():
        # ingestPath, inputFilePath, inputDataSource, inputSourceName, inputSourceArchive, inputSourceInstance, inputForcingMetclass, inputUnits, inputLocationType
        addMeta(ingestDir, inputFile[0], row['data_source'], row['source_name'], row['source_archive'], row['source_instance'], row['forcing_metclass'],
                row['units'], row['location_type'])

    # remove geom file now that all source meta CSV files have been written
    os.remove(inputFile[0])

    # Create list of program commands
    program_list = []
    program_list.append(['python','ingestTasks.py','--ingestDir',ingestDir,'--inputTask','ingestModelSourceData'])

    # Run list of program commands using subprocess (list argv, shell=False; check=True raises on failure)
    for program in program_list:
        logger.info('Run '+" ".join(program))
        output = subprocess.run(program, shell=False, check=True)
        logger.info('Ran '+" ".join(program)+" with output returncode "+str(output.returncode))

# Main program function takes args as input, which contains the ingestDir, and inputLocationType values.
@logger.catch
def main(args):
    ''' Main program function: starts the logger, then runs runIngestModelSourceData(), which writes model source
        meta CSV files and ingests them into the drf_model_source table via ingestTasks.py.
        Parameters
            args: dictionary
                contains the parameters listed below
                ingestDir: string
                    Directory path to the ast-run-ingester directory
                inputLocationType: string
                    gauge location type (COASTAL, TIDAL, or RIVERS)
        Returns
            None
    '''

    # Configure the logger: log file under LOG_PATH (or ./logs next to this script), plus stdout/stderr sinks
    logger.remove()
    log_path = os.path.join(os.getenv('LOG_PATH', os.path.join(os.path.dirname(__file__), 'logs')), '')
    logger.add(log_path+'createIngestNewModelSourceMeta.log', level='DEBUG')
    logger.add(sys.stdout, level="DEBUG")
    logger.add(sys.stderr, level="ERROR")

    # Extract args variables, ensuring the ingest directory has a trailing separator
    ingestDirPath = os.path.join(args.ingestDir, '')
    locationType = args.inputLocationType

    logger.info(f'Start processing source data for location type {locationType}.')

    # Create and ingest the model source meta data
    runIngestModelSourceData(ingestDirPath, locationType)

    logger.info(f'Finished processing location type {locationType}.')

# Run main function takes ingestDir, and inputLocationType as input.
if __name__ == "__main__":
    ''' Parses command-line arguments and passes them to the main function.
        Parameters
            ingestDir: string
                Directory path to the ast-run-ingester directory
            inputLocationType: string
                Gauge location type (COASTAL, TIDAL, or RIVERS)
        Returns
            None
    '''

    # Define the command-line interface
    argParser = argparse.ArgumentParser()
    argParser.add_argument("--ingestDIR", "--ingestDir", help="Output directory path", action="store", dest="ingestDir", required=True)
    argParser.add_argument("--inputLocationType", help="Input location type", action="store", dest="inputLocationType", required=True)

    # Parse the input arguments and run the main program
    main(argParser.parse_args())
2 changes: 1 addition & 1 deletion run/createIngestNewObsSourceMeta.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ def runIngestObsSourceData(ingestDir, inputLocationType):

# Create list of program commands
program_list = []
program_list.append(['python','ingestTasks.py','--ingestDir',ingestDir,'--inputTask','ingestSourceData'])
program_list.append(['python','ingestTasks.py','--ingestDir',ingestDir,'--inputTask','ingestObsSourceData'])

# Run list of program commands using subprocess
for program in program_list:
Expand Down
84 changes: 61 additions & 23 deletions run/ingestTasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def ingestStations(ingestDir):
except (Exception, psycopg.DatabaseError) as error:
logger.exception(error)

def ingestSourceData(ingestDir):
def ingestObsSourceData(ingestDir):
''' This function takes as input an ingest directory. It uses the input directory to search for source CSV files, that where
created by the createIngestObsSourceMeta.py program. It uses the ingest directory to define the path of the file that is to
be ingested. The ingest directory is the directory path in the apsviz-timeseriesdb database container.
Expand Down Expand Up @@ -109,6 +109,50 @@ def ingestSourceData(ingestDir):
except (Exception, psycopg.DatabaseError) as error:
logger.exception(error)

def ingestModelSourceData(ingestDir):
    ''' Ingests model source CSV files, created by the createIngestModelSourceMeta.py program, into the
        drf_model_source table. The input directory is searched for source CSV files, and the ingest directory
        is the directory path in the apsviz-timeseriesdb database container.
        Parameters
            ingestDir: string
                Directory path to ingest data files, created from the harvest files.
        Returns
            None
    '''

    # Find all source meta CSV files in the ingest directory
    sourceFiles = glob.glob(ingestDir+"source_*.csv")

    try:
        # Open an autocommit database connection using credentials from environment variables
        with psycopg.connect(dbname=os.environ['APSVIZ_GAUGES_DB_DATABASE'],
                             user=os.environ['APSVIZ_GAUGES_DB_USERNAME'],
                             host=os.environ['APSVIZ_GAUGES_DB_HOST'],
                             port=os.environ['APSVIZ_GAUGES_DB_PORT'],
                             password=os.environ['APSVIZ_GAUGES_DB_PASSWORD'],
                             autocommit=True) as conn:
            cur = conn.cursor()

            # Ingest each source file in turn
            for sourceFile in sourceFiles:
                # Stream the CSV file into drf_model_source using COPY, in 100-byte chunks
                with open(sourceFile, "r") as f:
                    with cur.copy("COPY drf_model_source (station_id,data_source,source_name,source_archive,source_instance,forcing_metclass,units) FROM STDIN WITH (FORMAT CSV)") as copy:
                        while chunk := f.read(100):
                            copy.write(chunk)

                # Remove the file once its rows have been copied in
                logger.info('Remove source data file: '+sourceFile+' after ingesting it')
                os.remove(sourceFile)

            # Explicitly release resources (the with block would also close the connection)
            cur.close()
            conn.close()

    # If exception log error
    except (Exception, psycopg.DatabaseError) as error:
        logger.exception(error)

# Main program function takes args as input, which contains the inputDir, inputTask, inputDataSource, inputSourceName, and inputSourceArchive values.
@logger.catch
def main(args):
Expand All @@ -117,11 +161,10 @@ def main(args):
args: dictionary
contains the parameters listed below.
inputTask: string
The type of task (ingestSourceMeta, ingestStations, ingestSourceData, ingestHarvestDataFileMeta, ingestData, createObsView. createModelView )
The type of task (ingestStations, ingestObsSourceData, ingestModelSourceData )
to be perfomed. The type of inputTask can change what other types of inputs ingestTask.py requires. Below is a list of all inputs, with associated tasks.
ingestDir: string
Directory path to ingest data files, created from the harvest files. Used by ingestStations, ingestSourceData,
ingestHarvestDataFileMeta, and ingestData.
Directory path to ingest data files, created from the harvest files. Used by ingestStations, ingestObsSourceData, and ingestModelSourceData.
Returns
None
'''
Expand All @@ -141,23 +184,25 @@ def main(args):
logger.info('Ingesting station data.')
ingestStations(ingestDir)
logger.info('Ingested station data.')
elif inputTask.lower() == 'ingestsourcedata':
elif inputTask.lower() == 'ingestobssourcedata':
ingestDir = os.path.join(args.ingestDir, '')
logger.info('Ingesting obs source data.')
ingestObsSourceData(ingestDir)
logger.info('ingested obs source data.')
elif inputTask.lower() == 'ingestmodelsourcedata':
ingestDir = os.path.join(args.ingestDir, '')
logger.info('Ingesting source data.')
ingestSourceData(ingestDir)
logger.info('ingested source data.')
logger.info('Ingesting model source data.')
ingestModelSourceData(ingestDir)
logger.info('ingested model source data.')

# Run main function takes inputDir, inputTask, inputDataSource, inputSourceName, and inputSourceArchive as input.
if __name__ == "__main__":
''' Takes argparse inputs and passes theme to the main function
Parameters
inputTask: string
The type of task (ingestSourceMeta, ingestStations, ingestSourceData, ingestHarvestDataFileMeta, ingestData,
createObsView, createModelView ) to be perfomed. The type of inputTask can change what other types of inputs
ingestTask.py requires. Below is a list of all inputs, with associated tasks.
The type of task (ingestStations, ingestObsSourceData, ingestModelSourceData
ingestDir: string
Directory path to ingest data files, created from the harvest files. Used by ingestStations, ingestSourceData,
ingestHarvestDataFileMeta, and ingestData.
Directory path to ingest data files, created from the harvest files. Used by ingestStations, ingestObsSourceData, and ingestModelSourceData.
Returns
None
'''
Expand All @@ -166,18 +211,11 @@ def main(args):
parser = argparse.ArgumentParser()

# Optional argument which requires a parameter (eg. -d test)
parser.add_argument("--inputTask", help="Input task to be done", action="store", dest="inputTask", choices=['ingestStations','ingestSourceData'], required=True)

# get runScript argument to use in if statement
args = parser.parse_known_args()[0]
if args.inputTask.lower() == 'ingeststations':
parser.add_argument("--ingestDIR", "--ingestDir", help="Ingest directory path", action="store", dest="ingestDir", required=True)
elif args.inputTask.lower() == 'ingestsourcedata':
parser.add_argument("--ingestDIR", "--ingestDir", help="Ingest directory path", action="store", dest="ingestDir", required=True)
parser.add_argument("--inputTask", help="Input task to be done", action="store", dest="inputTask", choices=['ingestStations','ingestObsSourceData','ingestModelSourceData'], required=True)
parser.add_argument("--ingestDIR", "--ingestDir", help="Ingest directory path", action="store", dest="ingestDir", required=True)

# Parse arguments
args = parser.parse_args()

# Run main
main(args)

main(args)

0 comments on commit 9b08750

Please sign in to comment.