diff --git a/socrata/socrata_upload.py b/socrata/socrata_upload.py index 11e022bce..94da825ba 100644 --- a/socrata/socrata_upload.py +++ b/socrata/socrata_upload.py @@ -1,6 +1,7 @@ import contextlib import io import json +import logging import os import time @@ -10,6 +11,11 @@ from pyathena import connect from pyathena.pandas.cursor import PandasCursor +logger = logging.getLogger(__name__) + +# Allow python to print full length dataframes for logging +pd.set_option("display.max_rows", None) + # Create a session object so HTTP requests can be pooled session = requests.Session() session.auth = ( @@ -93,8 +99,10 @@ def build_query( # Retrieve column names and types from Athena columns = cursor.execute("show columns from " + athena_asset).as_pandas() + athena_columns = columns["column"].tolist() + athena_columns.sort() - # Limit pull to columns present in open data asset - shouldn't change anything, but prevents failure if columns have become misaligned. + # Retrieve column names from Socrata asset_columns = ( session.get( f"https://datacatalog.cookcountyil.gov/resource/{asset_id}" @@ -105,6 +113,30 @@ def build_query( .strip("]") .split(",") ) + asset_columns.sort() + + # If there are columns on Socrata that are not in Athena, abort upload and + # inform user of discrepancies. The script should not run at all in this + # circumstance since it will update some but not all columns in the open + # data asset. + # If there are columns in Athena but not on Socrata, it may be the case that + # they should be added, but there are also cases when not all columns for an + # Athena view that feeds an open data asset need to be part of that asset. + if athena_columns != asset_columns: + columns_not_on_socrata = set(athena_columns) - set(asset_columns) + columns_not_in_athena = set(asset_columns) - set(athena_columns) + exception_message = ( + f"Columns on Socrata and in Athena do not match for {athena_asset}" + ) + + if len(columns_not_on_socrata) > 0: + exception_message += f"\nColumns in Athena but not on Socrata: {columns_not_on_socrata}" + logger.warning(exception_message) + if len(columns_not_in_athena) > 0: + exception_message += f"\nColumns on Socrata but not in Athena: {columns_not_in_athena}" + raise Exception(exception_message) + + # Limit pull to columns present in open data asset columns = columns[columns["column"].isin(asset_columns)] # Array type columns are not compatible with the json format needed for