Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scripts/export.py script for exporting flags to iasWorld #114

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ venv/
__pycache__/
ipynb_checkpoints
glue/schema/*.parquet
*.csv

# Terraform internals
.terraform
Expand Down
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Table of Contents
- [Flagging Details](#flagging-details)
- [Structure of Data](#structure-of-data)
- [AWS Glue Job Documentation](#aws-glue-job-documentation)
- [Exporting Flags to iasWorld](#exporting-flags-to-iasworld)

## Model Overview

Expand Down Expand Up @@ -335,3 +336,15 @@ The Glue job and its flagging script are written in Python, while the job detail
- If you need to make further changes, push commits to your branch and GitHub Actions will deploy the changes to the staging job and its associated resources.
- Once you're happy with your changes, request review on your PR.
- Once your PR is approved, merge it into `main`. A GitHub Actions workflow called `cleanup-terraform` will delete the staging resources that were created for your branch, while a separate `deploy-terraform` run will deploy your changes to the production job and its associated resources.

## Exporting Flags to iasWorld

Use the `scripts/export.py` script to generate a CSV that can be uploaded to iasWorld to save new flags.

Example use:

```bash
python3 scripts/export.py > sales_val_flags.csv
```

The `sales_val_flags.csv` file can then be sent over for upload to iasWorld.
139 changes: 139 additions & 0 deletions scripts/export.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
#!/usr/bin/env python3
# Export sales val flags to a CSV for upload to iasworld. Outputs the CSV
# to stdout.
#
# Example usage:
#
# python3 scripts/export.py > export.csv
#
import logging
import os
import sys

import pandas
import pyathena
import pyathena.pandas.util

# Names of the iasWorld fields that each column of the exported CSV maps to.
# The SQL query below aliases its output columns to these names.

# Field holding the parcel ID (PIN) of the flagged sale
PIN_FIELD = "PARID"
# Field holding the unique key of the sale record
SALE_KEY_FIELD = "SALEKEY"
# Field holding the Y/N outlier indicator derived from flag.sv_is_outlier
IS_OUTLIER_FIELD = "USER26"
# Field holding the numeric outlier type code (see OUTLIER_TYPE_CODES)
OUTLIER_TYPE_FIELD = "USER27"
# Field holding the run ID of the flagging run that produced the flag
RUN_ID_FIELD = "USER28"
# Field for the analyst's determination; exported as a constant 'N'
ANALYST_DETERMINATION_FIELD = "USER29"
# Date field for the analyst's review date; exported as NULL
ANALYST_REVIEW_DATE_FIELD = "UDATE1"

# Mapping from the human-readable outlier type string stored in sale.flag
# to the numeric code expected by iasWorld. "Not outlier" maps to a null
# code so that non-outlier sales carry no outlier type.
OUTLIER_TYPE_CODES = {
    "Anomaly (high)": "1",
    "Anomaly (low)": "2",
    "Family sale (high)": "3",
    "Family sale (low)": "4",
    "High price (raw & sqft)": "5",
    "High price (raw)": "6",
    "High price (sqft)": "7",
    "High price swing": "8",
    "Home flip sale (high)": "9",
    "Home flip sale (low)": "10",
    "Low price (raw & sqft)": "11",
    "Low price (raw)": "12",
    "Low price (sqft)": "13",
    "Low price swing": "14",
    "Non-person sale (high)": "15",
    "Non-person sale (low)": "16",
    # NOTE(review): both PTAX-203 variants intentionally share code "17" —
    # presumably iasWorld has a single PTAX-203 code; confirm with upstream
    "PTAX-203 flag (High)": "17",
    "PTAX-203 flag (Low)": "17",
    "Not outlier": pandas.NA,
}

if __name__ == "__main__":
    # Set up a root logger that logs to stderr so that log output does not
    # get mixed into the CSV data that the script writes to stdout
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    logger.addHandler(logging.StreamHandler(sys.stderr))

    logger.info("Connecting to Athena")

    # Connection parameters can be overridden via environment variables,
    # with defaults pointing at the standard Athena results bucket/region
    conn = pyathena.connect(
        s3_staging_dir=os.getenv(
            "AWS_ATHENA_S3_STAGING_DIR", "s3://ccao-athena-results-us-east-1/"
        ),
        region_name=os.getenv("AWS_REGION", "us-east-1"),
    )

    # Subquery selecting the latest version of every flag; a sale's flag
    # can be re-run over time and only the most recent version is active
    FLAG_LATEST_VERSION_QUERY = """
    SELECT meta_sale_document_num, MAX(version) AS version
    FROM sale.flag
    GROUP BY meta_sale_document_num
    """

    logger.info("Querying the count of active flags")

    # Count the active flags up front so that we can verify later that the
    # join to the sales view neither dropped nor duplicated any flags
    cursor = conn.cursor()
    cursor.execute(f"SELECT COUNT(*) FROM ({FLAG_LATEST_VERSION_QUERY}) flags")
    expected_num_flags = cursor.fetchall()[0][0]

    logger.info(f"Got {expected_num_flags} active flags")

    # One row per flagged sale, with output columns aliased to the
    # iasWorld field names they should be uploaded to
    FLAG_QUERY = f"""
    SELECT
        sale.pin AS {PIN_FIELD},
        sale.sale_key AS {SALE_KEY_FIELD},
        CASE
            WHEN flag.sv_is_outlier = TRUE
            THEN 'Y'
            ELSE 'N'
        END AS {IS_OUTLIER_FIELD},
        flag.sv_outlier_type AS {OUTLIER_TYPE_FIELD},
        flag.run_id AS {RUN_ID_FIELD},
        'N' AS {ANALYST_DETERMINATION_FIELD},
        NULL AS {ANALYST_REVIEW_DATE_FIELD}
    FROM sale.flag AS flag
    -- Filter flags for the most recent version
    INNER JOIN ({FLAG_LATEST_VERSION_QUERY}) AS flag_latest_version
        ON flag.meta_sale_document_num = flag_latest_version.meta_sale_document_num
        AND flag.version = flag_latest_version.version
    INNER JOIN default.vw_pin_sale AS sale
        ON flag.meta_sale_document_num = sale.doc_no
    """

    logger.info("Querying sales with flags")

    cursor.execute(FLAG_QUERY)
    flag_df = pyathena.pandas.util.as_pandas(cursor)

    num_flags = len(flag_df.index)
    logger.info(f"Got {num_flags} sales with flags")

    logger.info("Transforming columns")

    # Transform the outlier type column from its human-readable string
    # to the numeric code that iasWorld expects
    flag_df[OUTLIER_TYPE_FIELD] = flag_df[OUTLIER_TYPE_FIELD].replace(
        OUTLIER_TYPE_CODES
    )

    logger.info("Running data integrity checks")

    # Run data integrity checks before export. These raise ValueError
    # rather than using `assert` so that they cannot be silently stripped
    # when the interpreter runs with optimizations enabled (python -O)
    not_null_fields = [PIN_FIELD, SALE_KEY_FIELD, RUN_ID_FIELD]
    for field in not_null_fields:
        if not flag_df[flag_df[field].isnull()].empty:
            raise ValueError(f"{field} contains nulls")

    invalid_codes = flag_df[
        ~flag_df[OUTLIER_TYPE_FIELD].isin(OUTLIER_TYPE_CODES.values())
    ]
    if not invalid_codes.empty:
        raise ValueError(f"{OUTLIER_TYPE_FIELD} contains invalid codes")

    outliers_missing_type = flag_df[
        (flag_df[IS_OUTLIER_FIELD] == "Y") & (flag_df[OUTLIER_TYPE_FIELD].isna())
    ]
    if not outliers_missing_type.empty:
        raise ValueError(
            f"{OUTLIER_TYPE_FIELD} cannot be null when {IS_OUTLIER_FIELD} is Y"
        )

    non_outliers_with_type = flag_df[
        (flag_df[IS_OUTLIER_FIELD] == "N") & (~flag_df[OUTLIER_TYPE_FIELD].isna())
    ]
    if not non_outliers_with_type.empty:
        raise ValueError(
            f"{OUTLIER_TYPE_FIELD} must be null when {IS_OUTLIER_FIELD} is N"
        )

    if num_flags != expected_num_flags:
        raise ValueError(
            f"Expected {expected_num_flags} flagged sales, got {num_flags}"
        )

    logger.info("Writing CSV to stdout")

    # Chunked write keeps memory bounded for large exports
    flag_df.to_csv(sys.stdout, index=False, chunksize=10000)
Loading