diff --git a/.github/workflows/extract-chicago-permits.yaml b/.github/workflows/extract-chicago-permits.yaml
index e2415b9..dd53ebe 100644
--- a/.github/workflows/extract-chicago-permits.yaml
+++ b/.github/workflows/extract-chicago-permits.yaml
@@ -68,7 +68,11 @@ jobs:
           aws-region: us-east-1
 
       - name: Extract permits
-        run: pipenv run python3 permit_cleaning.py
+        run: |
+          pipenv run python3 permit_cleaning.py \
+            '${{ inputs.start_date }}' \
+            '${{ inputs.end_date }}' \
+            '${{ inputs.deduplicate }}'
         shell: bash
         working-directory: ${{ env.WORKING_DIR }}
         env:
diff --git a/chicago/permit_cleaning.py b/chicago/permit_cleaning.py
index d3a6698..a1c3f81 100644
--- a/chicago/permit_cleaning.py
+++ b/chicago/permit_cleaning.py
@@ -8,35 +8,37 @@ in separate Excel workbooks in separate folders.
 Data that need review are split into two categories and corresponding
 folders/files: those with quick fixes for fields over character or amount
 limits, and those with more complicated fixes for missing and/or invalid
 fields.
 
-The following environment variables need to be set before running this script:
-    AWS_ATHENA_S3_STAGING_DIR=s3://ccao-athena-results-us-east-1
-    AWS_REGION=us-east-1
+The following optional environment variables can be set:
+    AWS_ATHENA_S3_STAGING_DIR: S3 path where Athena query results are stored
+    AWS_REGION: Region that AWS operations should be run in
+
+The script also expects three positional arguments:
+    * start_date (str, YYYY-MM-DD): The lower bound date to use for filtering permits
+    * end_date (str, YYYY-MM-DD): The upper bound date to use for filtering permits
+    * deduplicate (bool): Whether to filter out permits that already exist in iasworld
 
 The following will also need to be updated:
     - At the beginning of each year: update year to current year in SQL_QUERY
      inside pull_existing_pins_from_athena() function
-    - Update limit as desired in the url in the download_all_permits() function
-      (currently set at 5000 rows for testing purposes)
 """
 
-import requests
-import pandas as pd
+from datetime import datetime
+import decimal
+import math
 import os
+import sys
+
 import numpy as np
-import math
 from pyathena import connect
 from pyathena.pandas.util import as_pandas
-from datetime import datetime
+import pandas as pd
+import requests
 
 
 # DEFINE FUNCTIONS
 # Connect to Athena and download existing 14-digit PINs in Chicago
-def pull_existing_pins_from_athena():
+def pull_existing_pins_from_athena(cursor):
     print("Pulling PINs from Athena")
 
-    conn = connect(
-        s3_staging_dir=os.getenv("AWS_ATHENA_S3_STAGING_DIR"),
-        region_name=os.getenv("AWS_REGION"),
-    )
-
     SQL_QUERY = """
         SELECT
             CAST(pin AS varchar) AS pin,
@@ -44,8 +46,6 @@ def pull_existing_pins_from_athena():
         FROM default.vw_pin_universe
         WHERE triad_name = 'City' AND year = '2023'
     """
-
-    cursor = conn.cursor()
     cursor.execute(SQL_QUERY)
     chicago_pin_universe = as_pandas(cursor)
     chicago_pin_universe.to_csv("chicago_pin_universe.csv", index=False)
@@ -53,10 +53,17 @@ def pull_existing_pins_from_athena():
     return chicago_pin_universe
 
 
-def download_all_permits():
-    # update limit in url below when ready to work with full dataset (as of Dec 7, 2023 dataset has 757,677 rows)
-    url = "https://data.cityofchicago.org/resource/ydr8-5enu.json?$limit=5000&$order=issue_date DESC"
-    permits_response = requests.get(url)
+def download_permits(start_date, end_date):
+    params = {
+        "$where": f"issue_date between '{start_date}' and '{end_date}'",
+        "$order": "issue_date DESC",
+        "$limit": 1000000,  # Artificial limit to override the default
+    }
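+    # SoQL notes: "$where" pushes the date filtering to the Socrata API, and
+    # "$limit" raises the API's default cap of 1,000 rows so a full date
+    # range comes back in a single response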
"https://data.cityofchicago.org/resource/ydr8-5enu.json" + permits_response = requests.get(url, params=params) permits_response.raise_for_status() permits = permits_response.json() permits_df = pd.DataFrame(permits) @@ -89,9 +93,17 @@ def expand_multi_pin_permits(df): # update pin to match formatting of iasWorld def format_pin(df): # iasWorld format doesn't include dashes - df["pin_final"] = df["solo_pin"].astype(str).str.replace("-", "") + df["pin_final"] = df["solo_pin"].astype("string").str.replace("-", "") # add zeros to 10-digit PINs to transform into 14-digits PINs - df["pin_final"] = df["pin_final"].apply(lambda x: x + "0000" if len(x) == 10 else x if x != "nan" else "") + def pad_pin(pin): + if not pd.isna(pin): + if len(pin) == 10: + return pin + "0000" + else: + return pin + else: + return "" + df["pin_final"] = df["pin_final"].apply(pad_pin) df["NOTE: 0000 added to PIN?"] = df["pin_final"].apply(lambda x: "Yes" if len(x) == 10 else "No") return df @@ -101,7 +113,7 @@ def organize_columns(df): address_columns = ["street_number", "street_direction", "street_name", "suffix"] df[address_columns] = df[address_columns].fillna("") - df["Address"] = df[address_columns].astype(str).agg(" ".join, axis=1) + df["Address"] = df[address_columns].astype("string").agg(" ".join, axis=1) df["issue_date"] = pd.to_datetime(df["issue_date"], format="%Y-%m-%dT%H:%M:%S.%f", errors='coerce').dt.strftime("%-m/%-d/%Y") @@ -154,12 +166,12 @@ def flag_invalid_pins(df, valid_pins): df["FLAG, INVALID: PIN* [PARID]"] = np.where(df["PIN* [PARID]"] == "", 0, ~df["PIN* [PARID]"].isin(valid_pins["pin"])) # also check if 10-digit PINs are valid to narrow down on problematic portion of invalid PINs - df["pin_10digit"] = df["PIN* [PARID]"].astype(str).str[:10] + df["pin_10digit"] = df["PIN* [PARID]"].astype("string").str[:10] df["FLAG, INVALID: pin_10digit"] = np.where(df["pin_10digit"] == "", 0, ~df["pin_10digit"].isin(valid_pins["pin10"])) # create variable that is the numbers following the 10-digit PIN # (not pulling last 4 digits from the end in case there are PINs that are not 14-digits in Chicago permit data) - df["pin_suffix"] = df["PIN* [PARID]"].astype(str).str[10:] + df["pin_suffix"] = df["PIN* [PARID]"].astype("string").str[10:] # comment for rows with invalid pin df["FLAG COMMENTS"] += df["FLAG, INVALID: PIN* [PARID]"].apply(lambda val: "" if val == 0 else "PIN* [PARID] is invalid, see Original PIN for raw form; ") @@ -239,6 +251,67 @@ def flag_fix_long_fields(df): return df +def deduplicate_permits(cursor, df, start_date, end_date): + cursor.execute( + """ + SELECT + parid, + permdt, + amount, + note2, + user21, + user28, + user43 + FROM iasworld.permit + WHERE permdt BETWEEN %(start_date)s AND %(end_date)s + """, + {"start_date": start_date, "end_date": end_date}, + ) + existing_permits = as_pandas(cursor) + workbook_to_iasworld_col_map = { + "PIN* [PARID]": "parid", + "Issue Date* [PERMDT]": "permdt", + "Amount* [AMOUNT]": "amount", + "Applicant Street Address* [ADDR1]": "note2", + "Applicant* [USER21]": "user21", + "Local Permit No.* [USER28]": "user28", + "Notes [NOTE1]": "user43", + } + new_permits = df.copy() + for workbook_key, iasworld_key in workbook_to_iasworld_col_map.items(): + new_permits[iasworld_key] = new_permits[workbook_key] + + # Transform new columns to ensure they match the iasworld formatting + new_permits["amount"] = new_permits["amount"].apply( + lambda x: decimal.Decimal("{:.2f}".format(x)) + ) + new_permits["permdt"] = new_permits["permdt"].apply( + lambda x: 
+    new_permits["note2"] = new_permits["note2"] + ",,CHICAGO, IL"
+    new_permits["user43"] = new_permits["user43"].str.replace(
+        "(", "", regex=False
+    ).str.replace(")", "", regex=False)
+    new_permits["user43"] = new_permits["user43"].str.slice(0, 261)
+
+    # Antijoin new_permits to existing_permits to find permits that do
+    # not exist in iasworld
+    merged_permits = pd.merge(
+        new_permits,
+        existing_permits,
+        how="left",
+        on=list(workbook_to_iasworld_col_map.values()),
+        indicator=True,
+    )
+    true_new_permits = merged_permits[merged_permits["_merge"] == "left_only"]
+    true_new_permits = true_new_permits.drop("_merge", axis=1)
+    for iasworld_key in workbook_to_iasworld_col_map.values():
+        true_new_permits = true_new_permits.drop(iasworld_key, axis=1)
+
+    return true_new_permits
+
+
 def gen_file_base_name():
     today = datetime.today().date()
 
@@ -301,28 +381,66 @@ def save_xlsx_files(df, max_rows, file_base_name):
     df_review_empty_invalid.to_excel(file_name_review_empty_invalid, index=False, engine="xlsxwriter")
 
 
+if __name__ == "__main__":
+    conn = connect(
+        s3_staging_dir=os.getenv(
+            "AWS_ATHENA_S3_STAGING_DIR",
+            "s3://ccao-athena-results-us-east-1",
+        ),
+        region_name=os.getenv(
+            "AWS_REGION",
+            "us-east-1",
+        ),
+    )
+    cursor = conn.cursor()
+    if os.path.exists("chicago_pin_universe.csv"):
+        print("Loading Chicago PIN universe data from csv.")
+        chicago_pin_universe = pd.read_csv(
+            "chicago_pin_universe.csv",
+            dtype={"pin": "string", "pin10": "string"},
+        )
+    else:
+        chicago_pin_universe = pull_existing_pins_from_athena(cursor)
+
+    start_date, end_date, deduplicate = sys.argv[1], sys.argv[2], sys.argv[3]
+    deduplicate = deduplicate.lower() == "true"
+
+    permits = download_permits(start_date, end_date)
+    print(
+        f"Downloaded {len(permits)} "
+        f"permit{'' if len(permits) == 1 else 's'} "
+        f"between {start_date} and {end_date}"
+    )
+    permits_expanded = expand_multi_pin_permits(permits)
 
-# CALL FUNCTIONS
-if os.path.exists("chicago_pin_universe.csv"):
-    print("Loading Chicago PIN universe data from csv.")
-    chicago_pin_universe = pd.read_csv("chicago_pin_universe.csv")
-else:
-    chicago_pin_universe = pull_existing_pins_from_athena()
-
-permits = download_all_permits()
-
-permits_expanded = expand_multi_pin_permits(permits)
+    permits_pin = format_pin(permits_expanded)
 
-permits_pin = format_pin(permits_expanded)
+    permits_renamed = organize_columns(permits_expanded)
 
-permits_renamed = organize_columns(permits_expanded)
+    permits_validated = flag_invalid_pins(permits_renamed, chicago_pin_universe)
 
-permits_validated = flag_invalid_pins(permits_renamed, chicago_pin_universe)
+    permits_shortened = flag_fix_long_fields(permits_validated)
 
-permits_shortened = flag_fix_long_fields(permits_validated)
+    if deduplicate:
+        print(
+            "Number of permits prior to deduplication: "
+            f"{len(permits_shortened)}"
+        )
+        permits_deduped = deduplicate_permits(
+            cursor,
+            permits_shortened,
+            start_date,
+            end_date,
+        )
+        print(
+            "Number of permits after deduplication: "
+            f"{len(permits_deduped)}"
+        )
+    else:
+        permits_deduped = permits_shortened
 
-file_base_name = gen_file_base_name()
+    file_base_name = gen_file_base_name()
 
-save_xlsx_files(permits_shortened, 200, file_base_name)
+    save_xlsx_files(permits_deduped, 200, file_base_name)
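
Example invocation of the updated script, mirroring what the workflow now
passes (the dates below are placeholders; the third argument is compared
case-insensitively against "true", so 'true' or 'True' both enable
deduplication):

    cd chicago
    pipenv run python3 permit_cleaning.py '2024-01-01' '2024-03-31' 'true'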