Update chicago extraction script and workflow to enable date filtering and deduplication (#14)

* Update extract-chicago-permits to use start_date and end_date inputs

* Sketch out incorporation of deduplicate flag

* Flesh out deduplicate flag

* Clean up docstring in chicago/permit_cleaning.py

* Fix string type comparisons
jeancochrane authored Jan 16, 2024
1 parent f53dfa2 commit 1551920
Showing 2 changed files with 157 additions and 42 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/extract-chicago-permits.yaml
@@ -68,7 +68,11 @@ jobs:
          aws-region: us-east-1

      - name: Extract permits
-        run: pipenv run python3 permit_cleaning.py
+        run: |
+          pipenv run python3 permit_cleaning.py \
+            '${{ inputs.start_date }}' \
+            '${{ inputs.end_date }}' \
+            '${{ inputs.deduplicate }}'
        shell: bash
        working-directory: ${{ env.WORKING_DIR }}
        env:
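The three workflow inputs are forwarded as positional arguments, so a local run is equivalent to "pipenv run python3 permit_cleaning.py 2024-01-01 2024-01-31 true" (dates illustrative). A minimal sketch of how the script's argument handling could be made stricter; the parse_args helper and its messages are hypothetical, not part of this commit:

    import sys
    from datetime import datetime

    def parse_args(argv):
        # Expect exactly three positional arguments, as the workflow passes them
        if len(argv) != 4:
            sys.exit("usage: permit_cleaning.py START_DATE END_DATE DEDUPLICATE")
        start_date, end_date, deduplicate = argv[1], argv[2], argv[3]
        # The SoQL $where clause downstream expects YYYY-MM-DD, so fail fast here
        for value in (start_date, end_date):
            datetime.strptime(value, "%Y-%m-%d")
        # GitHub Actions serializes boolean inputs as the strings "true"/"false"
        return start_date, end_date, deduplicate.lower() == "true"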
193 changes: 152 additions & 41 deletions chicago/permit_cleaning.py
@@ -8,55 +8,59 @@
in separate Excel workbooks in separate folders. Data that need review are split into two categories and corresponding folders/files:
those with quick fixes for fields over character or amount limits, and those with more complicated fixes for missing and/or invalid fields.
-The following environment variables need to be set before running this script:
-AWS_ATHENA_S3_STAGING_DIR=s3://ccao-athena-results-us-east-1
-AWS_REGION=us-east-1
+The following optional environment variables can be set:
+AWS_ATHENA_S3_STAGING_DIR: S3 path where Athena query results are stored
+AWS_REGION: Region that AWS operations should be run in
+The script also expects three positional arguments:
+* start_date (str, YYYY-MM-DD): The lower bound date to use for filtering permits
+* end_date (str, YYYY-MM-DD): The upper bound date to use for filtering
+* deduplicate (bool): Whether to filter out permits that already exist in iasworld
The following will also need to be updated:
- At the beginning of each year: update year to current year in SQL_QUERY inside pull_existing_pins_from_athena() function
-- Update limit as desired in the url in the download_all_permits() function (currently set at 5000 rows for testing purposes)
"""

-import requests
-import pandas as pd
-from datetime import datetime
+import decimal
+import math
import os
+import sys

import numpy as np
-import math
+import pandas as pd
from pyathena import connect
from pyathena.pandas.util import as_pandas
+from datetime import datetime
+import requests


# DEFINE FUNCTIONS

# Connect to Athena and download existing 14-digit PINs in Chicago
-def pull_existing_pins_from_athena():
+def pull_existing_pins_from_athena(cursor):
    print("Pulling PINs from Athena")
-    conn = connect(
-        s3_staging_dir=os.getenv("AWS_ATHENA_S3_STAGING_DIR"),
-        region_name=os.getenv("AWS_REGION"),
-    )

    SQL_QUERY = """
        SELECT
            CAST(pin AS varchar) AS pin,
            CAST(pin10 AS varchar) AS pin10
        FROM default.vw_pin_universe
        WHERE triad_name = 'City' AND year = '2023'
    """

-    cursor = conn.cursor()
    cursor.execute(SQL_QUERY)
    chicago_pin_universe = as_pandas(cursor)
    chicago_pin_universe.to_csv("chicago_pin_universe.csv", index=False)

    return chicago_pin_universe
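With this change, pull_existing_pins_from_athena no longer opens its own connection; the caller injects a cursor. A minimal usage sketch, reusing the default connection settings this commit adds to the __main__ block below:

    from pyathena import connect

    conn = connect(
        s3_staging_dir="s3://ccao-athena-results-us-east-1",
        region_name="us-east-1",
    )
    chicago_pin_universe = pull_existing_pins_from_athena(conn.cursor())
    print(chicago_pin_universe.head())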


-def download_all_permits():
-    # update limit in url below when ready to work with full dataset (as of Dec 7, 2023 dataset has 757,677 rows)
-    url = "https://data.cityofchicago.org/resource/ydr8-5enu.json?$limit=5000&$order=issue_date DESC"
-    permits_response = requests.get(url)
+def download_permits(start_date, end_date):
+    params = {
+        "$where": f"issue_date between '{start_date}' and '{end_date}'",
+        "$order": "issue_date DESC",
+        "$limit": 1000000,  # Artificial limit to override the default
+    }
+    url = "https://data.cityofchicago.org/resource/ydr8-5enu.json"
+    permits_response = requests.get(url, params=params)
    permits_response.raise_for_status()
    permits = permits_response.json()
    permits_df = pd.DataFrame(permits)
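download_permits now filters server-side with a SoQL $where clause instead of pulling a fixed number of rows, and requests handles the URL encoding. A quick way to inspect the final query string (date bounds illustrative):

    import requests

    req = requests.Request(
        "GET",
        "https://data.cityofchicago.org/resource/ydr8-5enu.json",
        params={
            "$where": "issue_date between '2024-01-01' and '2024-01-31'",
            "$order": "issue_date DESC",
            "$limit": 1000000,
        },
    ).prepare()
    print(req.url)  # the fully URL-encoded SoQL query sent to the API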
@@ -89,9 +93,17 @@ def expand_multi_pin_permits(df):
# update pin to match formatting of iasWorld
def format_pin(df):
    # iasWorld format doesn't include dashes
-    df["pin_final"] = df["solo_pin"].astype(str).str.replace("-", "")
+    df["pin_final"] = df["solo_pin"].astype("string").str.replace("-", "")
    # add zeros to 10-digit PINs to transform into 14-digits PINs
-    df["pin_final"] = df["pin_final"].apply(lambda x: x + "0000" if len(x) == 10 else x if x != "nan" else "")
+    def pad_pin(pin):
+        if not pd.isna(pin):
+            if len(pin) == 10:
+                return pin + "0000"
+            else:
+                return pin
+        else:
+            return ""
+    df["pin_final"] = df["pin_final"].apply(pad_pin)
    df["NOTE: 0000 added to PIN?"] = df["pin_final"].apply(lambda x: "Yes" if len(x) == 10 else "No")
    return df
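The pad_pin helper replaces the old lambda so that true missing values (pd.NA under the new "string" dtype) are caught by pd.isna rather than by comparing against the literal string "nan". Its effect on a few representative inputs (PIN values illustrative):

    import pandas as pd

    sample = pd.DataFrame({"solo_pin": ["14-21-306-038", "1421306038", None]})
    print(format_pin(sample)["pin_final"].tolist())
    # ['14213060380000', '14213060380000', '']: dashes stripped, 10-digit
    # PINs padded to 14 digits, missing PINs mapped to empty strings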

@@ -101,7 +113,7 @@ def organize_columns(df):

address_columns = ["street_number", "street_direction", "street_name", "suffix"]
df[address_columns] = df[address_columns].fillna("")
df["Address"] = df[address_columns].astype(str).agg(" ".join, axis=1)
df["Address"] = df[address_columns].astype("string").agg(" ".join, axis=1)

df["issue_date"] = pd.to_datetime(df["issue_date"], format="%Y-%m-%dT%H:%M:%S.%f", errors='coerce').dt.strftime("%-m/%-d/%Y")

@@ -154,12 +166,12 @@ def flag_invalid_pins(df, valid_pins):
df["FLAG, INVALID: PIN* [PARID]"] = np.where(df["PIN* [PARID]"] == "", 0, ~df["PIN* [PARID]"].isin(valid_pins["pin"]))

# also check if 10-digit PINs are valid to narrow down on problematic portion of invalid PINs
df["pin_10digit"] = df["PIN* [PARID]"].astype(str).str[:10]
df["pin_10digit"] = df["PIN* [PARID]"].astype("string").str[:10]
df["FLAG, INVALID: pin_10digit"] = np.where(df["pin_10digit"] == "", 0, ~df["pin_10digit"].isin(valid_pins["pin10"]))

# create variable that is the numbers following the 10-digit PIN
# (not pulling last 4 digits from the end in case there are PINs that are not 14-digits in Chicago permit data)
df["pin_suffix"] = df["PIN* [PARID]"].astype(str).str[10:]
df["pin_suffix"] = df["PIN* [PARID]"].astype("string").str[10:]

# comment for rows with invalid pin
df["FLAG COMMENTS"] += df["FLAG, INVALID: PIN* [PARID]"].apply(lambda val: "" if val == 0 else "PIN* [PARID] is invalid, see Original PIN for raw form; ")
@@ -239,6 +251,67 @@ def flag_fix_long_fields(df):
    return df


+def deduplicate_permits(cursor, df, start_date, end_date):
+    cursor.execute(
+        """
+        SELECT
+            parid,
+            permdt,
+            amount,
+            note2,
+            user21,
+            user28,
+            user43
+        FROM iasworld.permit
+        WHERE permdt BETWEEN %(start_date)s AND %(end_date)s
+        """,
+        {"start_date": start_date, "end_date": end_date},
+    )
+    existing_permits = as_pandas(cursor)
+    workbook_to_iasworld_col_map = {
+        "PIN* [PARID]": "parid",
+        "Issue Date* [PERMDT]": "permdt",
+        "Amount* [AMOUNT]": "amount",
+        "Applicant Street Address* [ADDR1]": "note2",
+        "Applicant* [USER21]": "user21",
+        "Local Permit No.* [USER28]": "user28",
+        "Notes [NOTE1]": "user43",
+    }
+    new_permits = df.copy()
+    for workbook_key, iasworld_key in workbook_to_iasworld_col_map.items():
+        new_permits[iasworld_key] = new_permits[workbook_key]
+
+    # Transform new columns to ensure they match the iasworld formatting
+    new_permits["amount"] = new_permits["amount"].apply(
+        lambda x: decimal.Decimal("{:.2f}".format(x))
+    )
+    new_permits["permdt"] = new_permits["permdt"].apply(
+        lambda x: datetime.strptime(x, "%m/%d/%Y").strftime(
+            "%Y-%m-%d %H:%M:%S.%f"
+        )[:-5]
+    )
+    new_permits["note2"] = new_permits["note2"] + ",,CHICAGO, IL"
+    new_permits["user43"] = new_permits["user43"].str.replace(
+        "(", "", regex=False
+    ).str.replace(")", "", regex=False)
+    new_permits["user43"] = new_permits["user43"].str.slice(0, 261)
+
+    # Antijoin new_permits to existing_permits to find permits that do
+    # not exist in iasworld
+    merged_permits = pd.merge(
+        new_permits,
+        existing_permits,
+        how="left",
+        on=list(workbook_to_iasworld_col_map.values()),
+        indicator=True
+    )
+    true_new_permits = merged_permits[merged_permits["_merge"] == "left_only"]
+    true_new_permits = true_new_permits.drop("_merge", axis=1)
+    for iasworld_key in workbook_to_iasworld_col_map.values():
+        true_new_permits = true_new_permits.drop(iasworld_key, axis=1)
+
+    return true_new_permits


def gen_file_base_name():
    today = datetime.today().date()
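The heart of deduplicate_permits is a left-join antijoin: a new permit survives only if no row in iasworld.permit matches it exactly on every mapped column, which is why amount and permdt are first coerced into iasworld's formats. The indicator=True pattern on toy frames:

    import pandas as pd

    new = pd.DataFrame({"parid": ["A", "B"], "amount": [100, 200]})
    existing = pd.DataFrame({"parid": ["A"], "amount": [100]})

    merged = pd.merge(
        new, existing, how="left", on=["parid", "amount"], indicator=True
    )
    print(merged[merged["_merge"] == "left_only"].drop("_merge", axis=1))
    #   parid  amount
    # 1     B     200   <- only the row absent from existing remains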
@@ -301,28 +374,66 @@ def save_xlsx_files(df, max_rows, file_base_name):
    df_review_empty_invalid.to_excel(file_name_review_empty_invalid, index=False, engine="xlsxwriter")


+if __name__ == "__main__":
+    conn = connect(
+        s3_staging_dir=os.getenv(
+            "AWS_ATHENA_S3_STAGING_DIR",
+            "s3://ccao-athena-results-us-east-1",
+        ),
+        region_name=os.getenv(
+            "AWS_REGION",
+            "us-east-1",
+        ),
+    )
+    cursor = conn.cursor()
+
+    if os.path.exists("chicago_pin_universe.csv"):
+        print("Loading Chicago PIN universe data from csv.")
+        chicago_pin_universe = pd.read_csv(
+            "chicago_pin_universe.csv",
+            dtype={"pin": "string", "pin10": "string"}
+        )
+    else:
+        chicago_pin_universe = pull_existing_pins_from_athena(cursor)
+
+    start_date, end_date, deduplicate = sys.argv[1], sys.argv[2], sys.argv[3]
+    deduplicate = deduplicate.lower() == "true"
+
+    permits = download_permits(start_date, end_date)
+    print(
+        f"Downloaded {len(permits)} "
+        f"permit{'' if len(permits) == 1 else 's'} "
+        f"between {start_date} and {end_date}"
+    )
+
+    permits_expanded = expand_multi_pin_permits(permits)

-# CALL FUNCTIONS
-if os.path.exists("chicago_pin_universe.csv"):
-    print("Loading Chicago PIN universe data from csv.")
-    chicago_pin_universe = pd.read_csv("chicago_pin_universe.csv")
-else:
-    chicago_pin_universe = pull_existing_pins_from_athena()
-
-permits = download_all_permits()
-
-permits_expanded = expand_multi_pin_permits(permits)
-
-permits_pin = format_pin(permits_expanded)
+    permits_pin = format_pin(permits_expanded)

-permits_renamed = organize_columns(permits_expanded)
+    permits_renamed = organize_columns(permits_expanded)

-permits_validated = flag_invalid_pins(permits_renamed, chicago_pin_universe)
+    permits_validated = flag_invalid_pins(permits_renamed, chicago_pin_universe)

-permits_shortened = flag_fix_long_fields(permits_validated)
+    permits_shortened = flag_fix_long_fields(permits_validated)

+    if deduplicate:
+        print(
+            "Number of permits prior to deduplication: "
+            f"{len(permits_shortened)}"
+        )
+        permits_deduped = deduplicate_permits(
+            cursor,
+            permits_shortened,
+            start_date,
+            end_date
+        )
+        print(
+            "Number of permits after deduplication: "
+            f"{len(permits_deduped)}"
+        )
+    else:
+        permits_deduped = permits_shortened

-file_base_name = gen_file_base_name()
+    file_base_name = gen_file_base_name()

-save_xlsx_files(permits_shortened, 200, file_base_name)
+    save_xlsx_files(permits_deduped, 200, file_base_name)
