diff --git a/chicago/permit_cleaning.py b/chicago/permit_cleaning.py
index 8cc62fe..290344c 100644
--- a/chicago/permit_cleaning.py
+++ b/chicago/permit_cleaning.py
@@ -8,9 +8,9 @@
 in separate Excel workbooks in separate folders. Data that need review are split into two categories and corresponding folders/files:
 those with quick fixes for fields over character or amount limits, and those with more complicated fixes for missing and/or invalid fields.
 
-The following environment variables need to be set before running this script:
-    AWS_ATHENA_S3_STAGING_DIR=s3://ccao-athena-results-us-east-1
-    AWS_REGION=us-east-1
+The following optional environment variables can be set:
+    AWS_ATHENA_S3_STAGING_DIR: S3 path where Athena query results are stored
+    AWS_REGION: Region that AWS operations should be run in
 
 The script also expects two positional arguments:
     * start_date: The lower bound date to use for filtering permits
@@ -21,6 +21,7 @@
 """
 
 from datetime import datetime
+import decimal
 import math
 import os
 import sys
@@ -241,21 +242,80 @@ def flag_fix_long_fields(df):
     return df
 
 
-def deduplicate_permits(cursor, df):
-    cursor.execute("""
-        SELECT
-            parid,
-            permdt,
-            amount,
-            note2,
-            user21,
-            user28,
-            user43
-        FROM iasworld.permit
-    """)
+def deduplicate_permits(cursor, df, start_date, end_date):
+    """Return the rows of df that do not already exist in iasworld.permit.
+
+    Existing permits with permdt between start_date and end_date are
+    queried via cursor and antijoined to df on the workbook fields that
+    map to iasworld columns.
+    """
+    cursor.execute(
+        """
+        SELECT
+            iasw_id,
+            parid,
+            permdt,
+            amount,
+            note2,
+            user21,
+            user28,
+            user43
+        FROM iasworld.permit
+        WHERE permdt BETWEEN %(start_date)s AND %(end_date)s
+        """,
+        {"start_date": start_date, "end_date": end_date},
+    )
     existing_permits = as_pandas(cursor)
-    # TODO: Antijoin df to existing_permits
-    return df
+    workbook_to_iasworld_col_map = {
+        "PIN* [PARID]": "parid",
+        "Issue Date* [PERMDT]": "permdt",
+        "Amount* [AMOUNT]": "amount",
+        "Applicant Street Address* [ADDR1]": "note2",
+        "Applicant* [USER21]": "user21",
+        "Local Permit No.* [USER28]": "user28",
+        "Notes [NOTE1]": "user43",
+    }
+    new_permits = df.copy()
+    for workbook_key, iasworld_key in workbook_to_iasworld_col_map.items():
+        new_permits[iasworld_key] = new_permits[workbook_key]
+
+    # Transform new columns to ensure they match the iasworld formatting
+    new_permits["amount"] = new_permits["amount"].apply(
+        lambda x: decimal.Decimal("{:.2f}".format(x))
+    )
+    new_permits["permdt"] = new_permits["permdt"].apply(
+        lambda x: datetime.strptime(x, "%m/%d/%Y").strftime(
+            "%Y-%m-%d %H:%M:%S.%f"
+        )[:-5]
+    )
+    new_permits["note2"] = new_permits["note2"] + ",,CHICAGO, IL"
+    # Use .str.replace for both parentheses; a bare Series.replace only
+    # substitutes whole cell values, not substrings
+    new_permits["user43"] = (
+        new_permits["user43"]
+        .str.replace("(", "", regex=False)
+        .str.replace(")", "", regex=False)
+    )
+    new_permits["user43"] = new_permits["user43"].str.slice(0, 261)
+
+    # Antijoin new_permits to existing_permits to find permits that do
+    # not exist in iasworld. Only the join keys are kept from
+    # existing_permits so that extra columns (e.g. iasw_id) do not leak
+    # into the output
+    join_cols = list(workbook_to_iasworld_col_map.values())
+    merged_permits = pd.merge(
+        new_permits,
+        existing_permits[join_cols],
+        how="left",
+        on=join_cols,
+        indicator=True
+    )
+    true_new_permits = merged_permits[merged_permits["_merge"] == "left_only"]
+    true_new_permits = true_new_permits.drop("_merge", axis=1)
+    for iasworld_key in join_cols:
+        true_new_permits = true_new_permits.drop(iasworld_key, axis=1)
+
+    return true_new_permits
 
 
 def gen_file_base_name():
@@ -321,8 +381,14 @@ def save_xlsx_files(df, max_rows, file_base_name):
 
 if __name__ == "__main__":
     conn = connect(
-        s3_staging_dir=os.getenv("AWS_ATHENA_S3_STAGING_DIR"),
-        region_name=os.getenv("AWS_REGION"),
+        s3_staging_dir=os.getenv(
+            "AWS_ATHENA_S3_STAGING_DIR",
+            "s3://ccao-athena-results-us-east-1",
+        ),
+        region_name=os.getenv(
+            "AWS_REGION",
+            "us-east-1",
+        ),
     )
     cursor = conn.cursor()
 
@@ -357,7 +423,12 @@ def save_xlsx_files(df, max_rows, file_base_name):
         "Number of permits prior to deduplication: "
         f"{len(permits_shortened)}"
     )
-    permits_deduped = deduplicate_permits(cursor, permits_shortened)
+    permits_deduped = deduplicate_permits(
+        cursor,
+        permits_shortened,
+        start_date,
+        end_date
+    )
     print(
         "Number of permits after deduplication: "
         f"{len(permits_deduped)}"