Weather Data Plots #168
base: master
.gitignore:

@@ -8,3 +8,5 @@ localdata/
 *.iml
 datascratch/
 .gcpprj
+
+scripts/__pycache__
app.yaml:

@@ -2,4 +2,5 @@ runtime: python37
 entrypoint: gunicorn -b :$PORT main:app

 env_variables:
-  FOURSQUARE_DATA_VERSION: "20200503-v0"
+  FOURSQUARE_DATA_VERSION: "20200504-v0"
+  BUCKET_NAME: "vd-weather-data"

Review comment: Let's add a …
New Airflow DAG (new file):

@@ -0,0 +1,94 @@
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
import json
from google.cloud.storage import Client
import requests
from deepmerge import always_merger
import os

BASE_URL = "http://api.weatherapi.com/v1/history.json?key={}&q={}+united+states&dt={}"
# Get a weatherapi.com api key
API_KEY = os.environ.get("API_WEATHER_KEY", "a70a4e2736644cdcb9d85348202404")

Review comment: Keys must never be committed to version control. This must be removed and the history squashed before merging. We can register the API key as an airflow secret.
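One way to do what the reviewer suggests is to read the key from an Airflow Variable (which can be backed by a secrets backend) instead of hardcoding it. A minimal sketch, not part of this PR; the variable name weather_api_key is an assumption:

# Sketch only: pull the key from an Airflow Variable instead of the source tree.
# "weather_api_key" is an assumed name, e.g. created via the Airflow UI or CLI.
from airflow.models import Variable

API_KEY = Variable.get("weather_api_key")  # fails loudly if the variable is not set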
BUCKET_NAME = os.environ.get("BUCKET_NAME", "default")

Review comment: Should fail if env not set, to force correct configuration, instead of silently choosing an invalid bucket name.
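A minimal sketch of the suggested fail-fast behaviour, again not part of the PR:

# Sketch only: refuse to start if BUCKET_NAME is not configured,
# rather than falling back to a bucket that does not exist.
BUCKET_NAME = os.environ.get("BUCKET_NAME")
if not BUCKET_NAME:
    raise RuntimeError("BUCKET_NAME environment variable must be set")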
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

gcs = Client()
bucket = gcs.bucket(BUCKET_NAME)
state_file = bucket.get_blob("states_counties.json")
STATES = json.loads(state_file.download_as_string())


def slugify_state(state):
    return "-".join(state.split())


def get_weather_data(query):
    weather = {}
    weather["forecast"] = {}
    date = datetime.today()
    full_url = BASE_URL.format(API_KEY, query, date.strftime('%Y-%m-%d'))
    response = requests.get(full_url)
    data = response.json()
    try:
        forecast = data["forecast"]["forecastday"][0]
        location = data["location"]
        forecast["day"].pop("condition")
        weather = {**weather, **location}

        weather["forecast"][forecast["date_epoch"]] = forecast["day"]
    except:
        return weather

    return weather


def weather_func_builder(state):
    selected_state = state
    def get_weather():
        data = {"updated": datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
        counties = STATES[selected_state]
        blob = bucket.get_blob("{}.json".format(selected_state))
        if blob is None:
            stated_cached_data = {}
        else:
            stated_cached_data = json.loads(blob.download_as_string())
        for county in counties:
            api_data = get_weather_data(county)
            cached_data = stated_cached_data.get(county, {})
            data[county] = always_merger.merge(cached_data, api_data)

        state_blob = bucket.blob("{}.json".format(selected_state))
        state_blob.upload_from_string(json.dumps(data))

        return True
    return get_weather

Review comment (on the state_blob line): Proposed: Let's use …


def create_dag(dag_id, state):
    dag = DAG(
        dag_id=dag_id,
        description="Weather DAG",
        default_args=default_args,
        schedule_interval='@daily'
    )

    get_data_api = PythonOperator(
        task_id="get-data-{}".format(slugify_state(state)),
        python_callable=weather_func_builder(state),
        dag=dag
    )

    return dag


for state in STATES.keys():
    dag_id = "{}-weather".format(slugify_state(state))
    globals()[dag_id] = create_dag(dag_id, state)
requirements.txt (Airflow ETL):

@@ -1,2 +1,4 @@
 apache-airflow==1.10.10
 google-cloud-storage==1.27.0
+requests
+deepmerge

Review comment: Pin version in requirements.txt so we know we're all running the same thing.
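For illustration only, pinned entries might look like the lines below; these exact version numbers are assumptions, not taken from this PR:

requests==2.23.0
deepmerge==0.1.0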
main.py (Flask app):

@@ -9,11 +9,13 @@
 import yaml
 from flask import Flask, redirect, render_template, request
 from google.cloud import storage
+from scripts.weather import get_state_weather_locally, get_state_weather_cloud


 app = Flask(__name__, static_url_path="", static_folder="static")
 app_state = {
     "maps_api_key": "",
+    "weather_path_data": "vd-weather-data",
     "foursquare_data_url": "",
     "foursquare_data_version": ""
 }

@@ -143,6 +145,13 @@ def data(path):
         snapshot_id=app_state['foursquare_data_version'])


+@app.route("/weather/<state>")
+def weather(state):
+    if app_state["weather_path_data"] != "":
+        return get_state_weather_cloud(state, app_state["weather_path_data"])
+    else:
+        return get_state_weather_locally(state)
+

 def page_not_found(e):
     return render_template('404.html'), 404

Review comment (on the new /weather/<state> route): Maybe we can just get this from …

@@ -183,11 +192,21 @@ def _init_data_env():
     app_state["foursquare_data_url"] =\
         f"//data.visitdata.org/processed/vendor/foursquare/asof/{foursquare_data_version}"

+def _init_weather_data_env():
+    # Gcloud bucket name
+    bucket_name = os.getenv("BUCKET_NAME", "vd-weather-data")
+
+    if bucket_name == "":
+        error("Weather data will be stored locally")
+
+    app_state["weather_path_data"] = bucket_name
+

 def _init():
     app.config["SEND_FILE_MAX_AGE_DEFAULT"] = 60
     app.register_error_handler(404, page_not_found)
     _init_maps_api_key()
+    _init_weather_data_env()
     _init_data_env()
     print(app_state)
requirements.txt (web app):

@@ -1,4 +1,6 @@
 Flask==1.1.1
-gunicorn==19.10.0
+gunicorn==20.0.
 google-cloud-storage==1.27.0
 pyyaml==5.3.1
+requests
+deepmerge

Review comment: Pin version in requirements.txt so we know we're all running the same thing.
scripts/config.py (new file):

@@ -0,0 +1,7 @@
BASE_URL = "http://api.weatherapi.com/v1/history.json?key={}&q={}+united+states&dt={}"
API_KEY = "a70a4e2736644cdcb9d85348202404"

Review comment: API keys must never be committed to version control. This must be removed and squashed before merging.

DATA_PATH = "localdata/"

NO_STATE_ERROR_RESPONSE = {
    "error": "There is no data for that state"
}
scripts/weather.py (new file):

@@ -0,0 +1,62 @@
import requests, json
from datetime import datetime, timedelta
from deepmerge import always_merger
from scripts.config import *
from google.cloud import storage


def get_weather_data(query, limit=30):
    weather = {}
    weather["forecast"] = {}
    for day in range(limit, -1, -1):
        date = datetime.today() - timedelta(days=day)
        full_url = BASE_URL.format(API_KEY, query, date.strftime('%Y-%m-%d'))
        response = requests.get(full_url)
        data = response.json()
        try:
            forecast = data["forecast"]["forecastday"][0]
            location = data["location"]
            forecast["day"].pop("condition")
            weather = {**weather, **location}

            weather["forecast"][forecast["date_epoch"]] = forecast["day"]
        except:
            continue

    return weather

Review comment (on get_weather_data): At runtime we shouldn't reach out to the weather API (and especially not make several calls - we will probably have our API key revoked if our traffic spikes). The ETL job already downloads this offline and puts it in a bucket, so we should access from there (and then probably cache in memory similar to …)
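A rough sketch of the bucket-backed read with an in-memory cache that this comment describes, reusing the storage and json imports above. The function and cache names are assumptions, not part of the PR:

# Sketch only: serve weather from the GCS bucket written by the ETL job,
# caching each state's JSON in memory after the first request.
_WEATHER_CACHE = {}

def get_state_weather_cached(state, bucket_name):
    if state not in _WEATHER_CACHE:
        bucket = storage.Client().bucket(bucket_name)
        blob = bucket.get_blob("{}.json".format(state))
        _WEATHER_CACHE[state] = json.loads(blob.download_as_string()) if blob else {}
    return _WEATHER_CACHE[state]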
def __load_state_file(state):
    try:
        with open("{}{}.json".format(DATA_PATH, state)) as f:
            data = json.load(f)
        return data
    except FileNotFoundError:
        return {}

Review comment: dunders are typically reserved for the Python Core API team. A single underscore is enough here.


def __update_state_file(state, data):
    with open("{}{}.json".format(DATA_PATH, state), "w+") as f:
        json.dump(data, f)

Review comment: Change dunder to single underscore.
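The rename the reviewer asks for would look like this (sketch only):

# A single leading underscore marks these as module-private helpers.
def _load_state_file(state): ...
def _update_state_file(state, data): ...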
def get_state_weather_locally(state):
    STATES = {}

    with open("states_counties.json") as f:
        STATES = json.load(f)

    if state not in STATES:
        return json.dumps(NO_STATE_ERROR_RESPONSE)
    cached_data = __load_state_file(state)
    for county in STATES[state]:
        weather_data = get_weather_data(county)
        county_data = cached_data.get(county, {})
        cached_data[county] = always_merger.merge(county_data, weather_data)
    __update_state_file(state, cached_data)
    return json.dumps(cached_data)

Review comment (on get_state_weather_locally): This was good for debug mode, but once the ETL works, we can probably remove the local access in favor of always reading from the bucket, similar to the visit data.


def get_state_weather_cloud(state, bucket_name):
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    file = bucket.get_blob("{}.json".format(state))
    return json.loads(file.download_as_string()) if file is not None else {}
Review comment: We might want to consolidate all of this under https://visitdata.org/data/ and present it as a single bundle that all goes together.

Review comment: @markroth8 The only issue is that it would increase the size of the initial data load for each page before it renders. If we assume that, in most cases, people will be looking at the visits and not the weather, it would be better to load the weather only upon request. In general, if we plan to add more data sources that might only be displayed based on the user's choices in the UI, an architecture that keeps those data sources separate might be better.