Skip to content

Commit

Permalink
Merge pull request #702 from ccao-data/ensure-bad-column-match-errors
Browse files Browse the repository at this point in the history
Ensure API upload script throws error/warning if Athena and Socrata columns mismatch
  • Loading branch information
wrridgeway authored Jan 10, 2025
2 parents 607e38a + 77a63ad commit f37dd20
Showing 1 changed file with 33 additions and 1 deletion.
34 changes: 33 additions & 1 deletion socrata/socrata_upload.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import contextlib
import io
import json
import logging
import os
import time

Expand All @@ -10,6 +11,11 @@
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

logger = logging.getLogger(__name__)

# Allow pandas to print full-length dataframes (no row truncation) for logging
pd.set_option("display.max_rows", None)

# Create a session object so HTTP requests can be pooled
session = requests.Session()
session.auth = (
Expand Down Expand Up @@ -93,8 +99,10 @@ def build_query(

# Retrieve column names and types from Athena
columns = cursor.execute("show columns from " + athena_asset).as_pandas()
athena_columns = columns["column"].tolist()
athena_columns.sort()

# Limit pull to columns present in open data asset - shouldn't change anything, but prevents failure if columns have become misaligned.
# Retrieve column names from Socrata
asset_columns = (
session.get(
f"https://datacatalog.cookcountyil.gov/resource/{asset_id}"
Expand All @@ -105,6 +113,30 @@ def build_query(
.strip("]")
.split(",")
)
asset_columns.sort()

# If there are columns on Socrata that are not in Athena, abort the upload and
# inform the user of the discrepancies. The script should not run at all in
# this circumstance, since it would update some but not all columns in the
# open data asset.
# If there are columns in Athena that are not on Socrata, only warn: they may
# need to be added to the asset, but not every column of an Athena view that
# feeds an open data asset has to be part of that asset.
if athena_columns != asset_columns:
columns_not_on_socrata = set(athena_columns) - set(asset_columns)
columns_not_in_athena = set(asset_columns) - set(athena_columns)
exception_message = (
f"Columns on Socrata and in Athena do not match for {athena_asset}"
)

if len(columns_not_on_socrata) > 0:
exception_message += f"\nColumns in Athena but not on Socrata: {columns_not_on_socrata}"
logger.warning(exception_message)
if len(columns_not_in_athena) > 0:
exception_message += f"\nColumns on Socrata but not in Athena: {columns_not_in_athena}"
raise Exception(exception_message)

# Limit pull to columns present in open data asset
columns = columns[columns["column"].isin(asset_columns)]

# Array type columns are not compatible with the json format needed for
Expand Down

0 comments on commit f37dd20

Please sign in to comment.