Update chicago extraction script and workflow to enable date filtering and deduplication #14
@@ -8,55 +8,59 @@
 in separate Excel workbooks in separate folders. Data that need review are split into two categories and corresponding folders/files:
 those with quick fixes for fields over character or amount limits, and those with more complicated fixes for missing and/or invalid fields.
-The following environment variables need to be set before running this script:
-AWS_ATHENA_S3_STAGING_DIR=s3://ccao-athena-results-us-east-1
-AWS_REGION=us-east-1
+The following optional environment variables can be set:
+AWS_ATHENA_S3_STAGING_DIR: S3 path where Athena query results are stored
+AWS_REGION: Region that AWS operations should be run in
+The script also expects three positional arguments:
+* start_date (str, YYYY-MM-DD): The lower bound date to use for filtering permits
+* end_date (str, YYYY-MM-DD): The upper bound date to use for filtering
+* deduplicate (bool): Whether to filter out permits that already exist in iasworld
 The following will also need to be updated:
 - At the beginning of each year: update year to current year in SQL_QUERY inside pull_existing_pins_from_athena() function
-- Update limit as desired in the url in the download_all_permits() function (currently set at 5000 rows for testing purposes)
 """
-import requests
-import pandas as pd
-from datetime import datetime
 import decimal
-import math
 import os
 import sys

+import numpy as np
+import math
+import pandas as pd
 from pyathena import connect
 from pyathena.pandas.util import as_pandas
+from datetime import datetime
+import requests

 # DEFINE FUNCTIONS

 # Connect to Athena and download existing 14-digit PINs in Chicago
-def pull_existing_pins_from_athena():
+def pull_existing_pins_from_athena(cursor):
     print("Pulling PINs from Athena")
-    conn = connect(
-        s3_staging_dir=os.getenv("AWS_ATHENA_S3_STAGING_DIR"),
-        region_name=os.getenv("AWS_REGION"),
-    )
-
     SQL_QUERY = """
         SELECT
             CAST(pin AS varchar) AS pin,
             CAST(pin10 AS varchar) AS pin10
         FROM default.vw_pin_universe
         WHERE triad_name = 'City' AND year = '2023'
     """

-    cursor = conn.cursor()
     cursor.execute(SQL_QUERY)
     chicago_pin_universe = as_pandas(cursor)
     chicago_pin_universe.to_csv("chicago_pin_universe.csv", index=False)

     return chicago_pin_universe

-def download_all_permits():
-    # update limit in url below when ready to work with full dataset (as of Dec 7, 2023 dataset has 757,677 rows)
-    url = "https://data.cityofchicago.org/resource/ydr8-5enu.json?$limit=5000&$order=issue_date DESC"
-    permits_response = requests.get(url)
+def download_permits(start_date, end_date):
+    params = {
+        "$where": f"issue_date between '{start_date}' and '{end_date}'",
+        "$order": "issue_date DESC",
+        "$limit": 1000000  # Artificial limit to override the default
+    }
+    url = "https://data.cityofchicago.org/resource/ydr8-5enu.json"
+    permits_response = requests.get(url, params=params)
     permits_response.raise_for_status()
     permits = permits_response.json()
     permits_df = pd.DataFrame(permits)
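Note: the SoQL parameters above are passed through requests' `params` encoding. A minimal sketch (dates hypothetical) of the URL that actually gets requested:

```python
import requests

params = {
    "$where": "issue_date between '2024-01-01' and '2024-03-31'",
    "$order": "issue_date DESC",
    "$limit": 1000000,
}
# Prepare the request without sending it, just to inspect the encoded URL
req = requests.Request(
    "GET",
    "https://data.cityofchicago.org/resource/ydr8-5enu.json",
    params=params,
).prepare()
print(req.url)  # .../ydr8-5enu.json?%24where=issue_date+between+... etc.
```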
@@ -89,9 +93,17 @@ def expand_multi_pin_permits(df):
 # update pin to match formatting of iasWorld
 def format_pin(df):
     # iasWorld format doesn't include dashes
-    df["pin_final"] = df["solo_pin"].astype(str).str.replace("-", "")
+    df["pin_final"] = df["solo_pin"].astype("string").str.replace("-", "")

Review comment: The alias for the pandas string type is actually "string".
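Note: a quick illustration of why the dtype change matters (PIN value hypothetical). `astype(str)` turns missing values into the literal string "nan", while `astype("string")` keeps them missing, which is what the `pd.isna` check in `pad_pin` below relies on:

```python
import numpy as np
import pandas as pd

s = pd.Series(["17-04-218-048-1007", np.nan])

print(s.astype(str).tolist())          # ['17-04-218-048-1007', 'nan'] (object dtype)
print(pd.isna(s.astype(str)[1]))       # False: the missing value became a real string

print(s.astype("string")[1])           # <NA>: StringDtype keeps it missing
print(pd.isna(s.astype("string")[1]))  # True
```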
     # add zeros to 10-digit PINs to transform into 14-digit PINs
-    df["pin_final"] = df["pin_final"].apply(lambda x: x + "0000" if len(x) == 10 else x if x != "nan" else "")
+    def pad_pin(pin):
+        if not pd.isna(pin):
+            if len(pin) == 10:
+                return pin + "0000"
+            else:
+                return pin
+        else:
+            return ""

Comment on lines +98 to +105:

Review comment: The nested ternary syntax always confuses me, so I factored it out into a full function to aid in debugging. I think this is clearer to read anyway, so I left it in.

Reply: Shouldn't have told me this. I'm writing my next PR with triple-nested list comprehensions.
df["pin_final"] = df["pin_final"].apply(pad_pin) | ||
df["NOTE: 0000 added to PIN?"] = df["pin_final"].apply(lambda x: "Yes" if len(x) == 10 else "No") | ||
return df | ||
|
||
|
@@ -101,7 +113,7 @@ def organize_columns(df):
     address_columns = ["street_number", "street_direction", "street_name", "suffix"]
     df[address_columns] = df[address_columns].fillna("")
-    df["Address"] = df[address_columns].astype(str).agg(" ".join, axis=1)
+    df["Address"] = df[address_columns].astype("string").agg(" ".join, axis=1)

     df["issue_date"] = pd.to_datetime(df["issue_date"], format="%Y-%m-%dT%H:%M:%S.%f", errors='coerce').dt.strftime("%-m/%-d/%Y")

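Note: the `-` flag in "%-m/%-d/%Y" (which strips leading zeros) is a platform-dependent strftime extension; it works with glibc and on macOS but is rejected on Windows. A tiny sketch (date hypothetical):

```python
from datetime import datetime

# "%-m/%-d/%Y" drops leading zeros; Windows strftime rejects the "-" flag
print(datetime(2024, 3, 5).strftime("%-m/%-d/%Y"))  # 3/5/2024 (Linux/macOS)
```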
@@ -154,12 +166,12 @@ def flag_invalid_pins(df, valid_pins):
df["FLAG, INVALID: PIN* [PARID]"] = np.where(df["PIN* [PARID]"] == "", 0, ~df["PIN* [PARID]"].isin(valid_pins["pin"])) | ||
|
||
# also check if 10-digit PINs are valid to narrow down on problematic portion of invalid PINs | ||
df["pin_10digit"] = df["PIN* [PARID]"].astype(str).str[:10] | ||
df["pin_10digit"] = df["PIN* [PARID]"].astype("string").str[:10] | ||
df["FLAG, INVALID: pin_10digit"] = np.where(df["pin_10digit"] == "", 0, ~df["pin_10digit"].isin(valid_pins["pin10"])) | ||
|
||
# create variable that is the numbers following the 10-digit PIN | ||
# (not pulling last 4 digits from the end in case there are PINs that are not 14-digits in Chicago permit data) | ||
df["pin_suffix"] = df["PIN* [PARID]"].astype(str).str[10:] | ||
df["pin_suffix"] = df["PIN* [PARID]"].astype("string").str[10:] | ||
|
||
# comment for rows with invalid pin | ||
df["FLAG COMMENTS"] += df["FLAG, INVALID: PIN* [PARID]"].apply(lambda val: "" if val == 0 else "PIN* [PARID] is invalid, see Original PIN for raw form; ") | ||
|
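Note: a minimal sketch (PIN values hypothetical) of the `np.where`/`isin` flagging pattern used above. Empty PINs are left unflagged, and everything else is flagged when it is absent from the valid universe:

```python
import numpy as np
import pandas as pd

pins = pd.Series(["1408123045", "", "9999999999"])
valid_pin10 = pd.Series(["1408123045"])

flags = np.where(pins == "", 0, ~pins.isin(valid_pin10))
print(flags)  # [0 0 1]: only the unknown, non-empty PIN gets flagged
```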
@@ -239,6 +251,67 @@ def flag_fix_long_fields(df):
     return df


+def deduplicate_permits(cursor, df, start_date, end_date):
+    cursor.execute(
+        """
+        SELECT
+            parid,
+            permdt,
+            amount,
+            note2,
+            user21,
+            user28,
+            user43
+        FROM iasworld.permit
+        WHERE permdt BETWEEN %(start_date)s AND %(end_date)s
+        """,
+        {"start_date": start_date, "end_date": end_date},
+    )
+    existing_permits = as_pandas(cursor)
+    workbook_to_iasworld_col_map = {
+        "PIN* [PARID]": "parid",
+        "Issue Date* [PERMDT]": "permdt",
+        "Amount* [AMOUNT]": "amount",
+        "Applicant Street Address* [ADDR1]": "note2",
+        "Applicant* [USER21]": "user21",
+        "Local Permit No.* [USER28]": "user28",
+        "Notes [NOTE1]": "user43",
+    }
+    new_permits = df.copy()
+    for workbook_key, iasworld_key in workbook_to_iasworld_col_map.items():
+        new_permits[iasworld_key] = new_permits[workbook_key]
+
+    # Transform new columns to ensure they match the iasworld formatting
+    new_permits["amount"] = new_permits["amount"].apply(
+        lambda x: decimal.Decimal("{:.2f}".format(x))
+    )
+    new_permits["permdt"] = new_permits["permdt"].apply(
+        lambda x: datetime.strptime(x, "%m/%d/%Y").strftime(
+            "%Y-%m-%d %H:%M:%S.%f"
+        )[:-5]
+    )
+    new_permits["note2"] = new_permits["note2"] + ",,CHICAGO, IL"
+    new_permits["user43"] = new_permits["user43"].str.replace(
+        "(", "", regex=False
+    ).str.replace(")", "", regex=False)
+    new_permits["user43"] = new_permits["user43"].str.slice(0, 261)

Comment on lines +285 to +297:

Review comment: These transformations don't seem to be replicated in Caroline's code anywhere, but they seem necessary according to my QA. My guess is that this represents processing that is done between smartfile and ias; if that sounds right, perhaps I can check in with Will to make sure that's correct? Otherwise, we likely want to be doing this processing in the main body of the script instead of doing it as part of the deduplication step.

Reply: I'm guessing that's correct, but I would absolutely check with Will. Ask him more generally about the de-duping too, since it's still unclear to me what level of de-duping we want.
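Note: the `[:-5]` slice above is easy to misread. `%f` always renders six microsecond digits, so dropping the last five leaves one fractional digit; a sketch (date hypothetical), assuming that single-digit form is what iasworld stores:

```python
from datetime import datetime

stamp = datetime.strptime("1/15/2024", "%m/%d/%Y").strftime("%Y-%m-%d %H:%M:%S.%f")
print(stamp)       # 2024-01-15 00:00:00.000000  (%f pads to six digits)
print(stamp[:-5])  # 2024-01-15 00:00:00.0       (one fractional digit kept)
```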
+    # Antijoin new_permits to existing_permits to find permits that do
+    # not exist in iasworld
+    merged_permits = pd.merge(
+        new_permits,
+        existing_permits,
+        how="left",
+        on=list(workbook_to_iasworld_col_map.values()),
+        indicator=True
+    )

Review comment (on the on= argument): Just checking, we're joining on basically all the fields in the workbook, and all the preprocessing is necessary to make the join work, yeah? Are there any times when the ingest into SmartFile/ias removes data?

Reply: Yup, that's right! Ingest into SmartFile/ias does sometimes remove data, but the preceding transformation steps along with the ones introduced in this PR should cover those transformations. I'll double-check with Will today to make sure there's nothing we missed.
+    true_new_permits = merged_permits[merged_permits["_merge"] == "left_only"]
+    true_new_permits = true_new_permits.drop("_merge", axis=1)
+    for iasworld_key in workbook_to_iasworld_col_map.values():
+        true_new_permits = true_new_permits.drop(iasworld_key, axis=1)
+
+    return true_new_permits
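Note: a tiny self-contained sketch (values hypothetical) of the merge-indicator antijoin used above:

```python
import pandas as pd

new = pd.DataFrame({"parid": ["1", "2", "3"], "amount": ["10", "20", "30"]})
existing = pd.DataFrame({"parid": ["2"], "amount": ["20"]})

merged = pd.merge(new, existing, how="left", on=["parid", "amount"], indicator=True)
# "_merge" is "both" for rows found in existing, "left_only" otherwise
print(merged[merged["_merge"] == "left_only"].drop(columns="_merge"))
#   parid amount
# 0     1     10
# 2     3     30
```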

 def gen_file_base_name():
     today = datetime.today().date()
@@ -301,28 +374,66 @@ def save_xlsx_files(df, max_rows, file_base_name):
     df_review_empty_invalid.to_excel(file_name_review_empty_invalid, index=False, engine="xlsxwriter")

+if __name__ == "__main__":

Review comment: Most of the diff that follows represents indentation changes due to the addition of the `if __name__ == "__main__"` block.
+    conn = connect(
+        s3_staging_dir=os.getenv(
+            "AWS_ATHENA_S3_STAGING_DIR",
+            "s3://ccao-athena-results-us-east-1",
+        ),
+        region_name=os.getenv(
+            "AWS_REGION",
+            "us-east-1",
+        ),
+    )
+    cursor = conn.cursor()
+    if os.path.exists("chicago_pin_universe.csv"):
+        print("Loading Chicago PIN universe data from csv.")
+        chicago_pin_universe = pd.read_csv(
+            "chicago_pin_universe.csv",
+            dtype={"pin": "string", "pin10": "string"}
+        )

Comment on lines +392 to +395:

Review comment: Previously we weren't specifying dtypes for the CSV, so pandas was inferring a float type for all of the PINs.
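Note: a small reproduction of the inference problem (values hypothetical). Without explicit dtypes, numeric-looking PINs lose leading zeros, and any missing value pushes the column to float:

```python
import io
import pandas as pd

csv = "pin,pin10\n01234567890000,0123456789\n,\n"

inferred = pd.read_csv(io.StringIO(csv))
print(inferred.dtypes.tolist())  # [dtype('float64'), dtype('float64')]
print(inferred["pin"][0])        # 1234567890000.0, leading zero gone

typed = pd.read_csv(io.StringIO(csv), dtype={"pin": "string", "pin10": "string"})
print(typed["pin"][0])           # 01234567890000
```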
+    else:
+        chicago_pin_universe = pull_existing_pins_from_athena(cursor)
+    start_date, end_date, deduplicate = sys.argv[1], sys.argv[2], sys.argv[3]
+    deduplicate = deduplicate.lower() == "true"

Comment on lines +399 to +400:

Review comment: This arg parsing is new.
+    permits = download_permits(start_date, end_date)
+    print(
+        f"Downloaded {len(permits)} "
+        f"permit{'' if len(permits) == 1 else 's'} "
+        f"between {start_date} and {end_date}"
+    )

Comment on lines +403 to +407:

Review comment: This logging is new.
-# CALL FUNCTIONS
-if os.path.exists("chicago_pin_universe.csv"):
-    print("Loading Chicago PIN universe data from csv.")
-    chicago_pin_universe = pd.read_csv("chicago_pin_universe.csv")
-else:
-    chicago_pin_universe = pull_existing_pins_from_athena()
-
-permits = download_all_permits()
-
-permits_expanded = expand_multi_pin_permits(permits)
-permits_pin = format_pin(permits_expanded)
-permits_renamed = organize_columns(permits_expanded)
-permits_validated = flag_invalid_pins(permits_renamed, chicago_pin_universe)
-permits_shortened = flag_fix_long_fields(permits_validated)
+    permits_expanded = expand_multi_pin_permits(permits)
+    permits_pin = format_pin(permits_expanded)
+    permits_renamed = organize_columns(permits_expanded)
+    permits_validated = flag_invalid_pins(permits_renamed, chicago_pin_universe)
+    permits_shortened = flag_fix_long_fields(permits_validated)
+    if deduplicate:
+        print(
+            "Number of permits prior to deduplication: "
+            f"{len(permits_shortened)}"
+        )
+        permits_deduped = deduplicate_permits(
+            cursor,
+            permits_shortened,
+            start_date,
+            end_date
+        )
+        print(
+            "Number of permits after deduplication: "
+            f"{len(permits_deduped)}"
+        )
+    else:
+        permits_deduped = permits_shortened

Comment on lines +419 to +435:

Review comment: This dedupe step is new.
-file_base_name = gen_file_base_name()
+    file_base_name = gen_file_base_name()

-save_xlsx_files(permits_shortened, 200, file_base_name)
+    save_xlsx_files(permits_deduped, 200, file_base_name)
Review comment: It would be more ergonomic to accept these as named args instead of positional args, but it would be more effort and more complexity, so as long as this script is only intended to be run on GitHub Actions it makes sense to me to go with the simpler option.
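Note: for reference, a sketch of what the named-args alternative could look like with argparse; the flag names and script name are hypothetical, not part of this PR:

```python
import argparse

parser = argparse.ArgumentParser(description="Extract Chicago permit data")
parser.add_argument("--start-date", required=True, help="Lower bound, YYYY-MM-DD")
parser.add_argument("--end-date", required=True, help="Upper bound, YYYY-MM-DD")
parser.add_argument(
    "--deduplicate",
    action="store_true",
    help="Filter out permits that already exist in iasworld",
)
args = parser.parse_args()
# e.g. python chicago_extraction.py --start-date 2024-01-01 --end-date 2024-03-31 --deduplicate
```

This would also replace the `deduplicate.lower() == "true"` string comparison with a proper boolean flag.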