main_db_script.py
import requests
import bs4
import sqlite3
import pandas as pd

hr_db_filename = 'C:/Users/Jeff/Google Drive/research/Hampton Roads Data/Time Series/' \
                 'hampt_rd_data.sqlite'


def get_id(typ, data):
    """
    Gets either the SiteID or VariableID from the db, first inserting the site
    or variable if it is not already there.
    :param typ: String. Either 'Site' or 'Variable'
    :param data: Dict. The site or variable data
    :return: int. ID of the site or variable
    """
    data_df = pd.DataFrame(data, index=[0])
    code_name = '{}Code'.format(typ)
    table_name = '{}s'.format(typ.lower())
    id_name = '{}ID'.format(typ)
    code = data[code_name]
    check_by = [code_name]
    append_non_duplicates(table_name, data_df, check_by)
    table = get_db_table_as_df(table_name)
    id_row = table[table[code_name] == code]
    id_num = id_row[id_name].values[0]
    return id_num
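
# A minimal usage sketch (hypothetical variable metadata; assumes the
# 'variables' table uses the columns referenced in get_variable_data below):
# var_id = get_id('Variable', {'VariableCode': '00065',
#                              'VariableName': 'Gage height',
#                              'VariableType': 'Hydrology',
#                              'Units': 'ft'})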


def append_non_duplicates(table, df, check_col, site_id=None, var_id=None):
    """
    Adds values that are not already in the db to the db.
    :param table: String. Name of the table where the values should be added, e.g., 'sites'
    :param df: pandas df. A dataframe with the data to be potentially added to the db
    :param check_col: List. The columns used to check for duplicates in the db, e.g.,
                      ['VariableCode', 'VariableType'] when checking a variable
    :param site_id: int. Optional SiteID to limit the duplicate check on 'datavalues'
    :param var_id: int. Optional VariableID to limit the duplicate check on 'datavalues'
    :return: pandas df. The original dataframe that was passed in
    """
    con = sqlite3.connect(hr_db_filename)
    if table == 'datavalues' and site_id and var_id:
        # only compare against rows for this site/variable to keep the check fast
        sql = "SELECT * FROM datavalues WHERE SiteID = {} AND VariableID = {}".format(site_id,
                                                                                      var_id)
        db_df = get_db_table_as_df(table, sql)
    else:
        db_df = get_db_table_as_df(table)

    if not db_df.empty:
        if table == 'datavalues':
            df.reset_index(inplace=True)
            db_df.reset_index(inplace=True)
        # an outer merge with indicator=True marks rows present only in the new data
        merged = df.merge(db_df,
                          how='outer',
                          on=check_col,
                          indicator=True)
        non_duplicated = merged[merged['_merge'] == 'left_only']
        # drop the columns that came from the db side of the merge ('_y' suffix)
        # along with the '_merge' indicator column itself
        filter_cols = [col for col in list(non_duplicated) if '_y' not in col and '_m' not in col]
        non_duplicated = non_duplicated[filter_cols]
        cols_clean = [col.replace('_x', '') for col in list(non_duplicated)]
        non_duplicated.columns = cols_clean
        non_duplicated = non_duplicated[df.columns]
        non_duplicated.to_sql(table, con, if_exists='append', index=False)
        return df
    else:
        # the table is empty, so everything in df is new
        index = True if table == 'datavalues' else False
        df.to_sql(table, con, if_exists='append', index=index)
        return df
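
# Sketch of the dedup check on its own (hypothetical row; assumes an existing
# 'sites' table with these columns):
# new_sites = pd.DataFrame([{'SiteCode': 'XYZ1', 'SiteName': 'Example Site',
#                            'SourceOrg': 'USGS', 'Lat': '36.9', 'Lon': '-76.2'}])
# append_non_duplicates('sites', new_sites, ['SiteCode'])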


def get_db_table_as_df(name, sql="""SELECT * FROM {};""", date_col=None, dbfilename=hr_db_filename):
    """
    Reads a db table into a dataframe; the 'datavalues' table gets a Datetime index.
    """
    con = sqlite3.connect(dbfilename)
    sql = sql.format(name)
    if name == 'datavalues':
        date_col = 'Datetime'
    df = pd.read_sql(sql, con, parse_dates=date_col)
    if name == 'datavalues':
        df = make_date_index(df, 'Datetime')
    return df


def make_date_index(df, field, fmt=None):
    """Converts the given field to datetimes and makes it the dataframe's index."""
    df.loc[:, field] = pd.to_datetime(df.loc[:, field], format=fmt)
    df.set_index(field, drop=True, inplace=True)
    return df


def get_table_for_variable_code(variable_code, site_id=None, start_date=None, end_date=None):
    """
    Gets the rows of the 'datavalues' table for one variable, optionally
    filtered by site and date range.
    """
    var_id = get_id('Variable', {'VariableCode': variable_code})
    table_name = 'datavalues'
    sql = """SELECT * FROM {} WHERE VariableID={};""".format(table_name, var_id)
    if start_date or end_date:
        # fill in whichever bound was omitted with a wide-open default
        if not start_date:
            start_date = '1900-01-01'
        if not end_date:
            end_date = '2100-01-01'
        sql = """SELECT * FROM {} WHERE VariableID={} AND Datetime BETWEEN '{}' AND '{}';""".format(
            table_name,
            var_id,
            start_date,
            end_date
        )
    df = get_db_table_as_df(table_name, sql=sql)
    df = df.sort_index()
    if site_id:
        df = df[df['SiteID'] == site_id]
    return df
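
# For example (hypothetical code/ID; assumes data for this variable is in the db):
# gage_height = get_table_for_variable_code('00065', site_id=1,
#                                           start_date='2016-10-01',
#                                           end_date='2016-10-31')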


def get_code_from_id(typ, id):
    """
    Looks up the SiteCode or VariableCode for a given ID.
    :param typ: String. 'Variable' or 'Site'
    :param id: int. The SiteID or VariableID
    :return: String. The corresponding SiteCode or VariableCode
    """
    table_name = '{}s'.format(typ.lower())
    table = get_db_table_as_df(table_name)
    code_name = '{}Code'.format(typ)
    id_name = '{}ID'.format(typ)
    return table[table[id_name] == id][code_name].values[0]


def parse_wml2_data(wml2url, src_org):
    """
    Parses WaterML 2.0 data into a pandas dataframe and adds the data, including
    the site and variable, into the database if not already there.
    :param wml2url: String. URL of the service response in WaterML 2.0 format
    :param src_org: String. The source organization, e.g., 'USGS'
    :return: dataframe of the time series
    """
    soup = get_server_data(wml2url)
    res_list = []
    site_data = get_site_data(soup, src_org)
    site_id = get_id('Site', site_data)
    # each observationMember block holds the points for one variable
    variable_block = soup.find_all('wml2:observationmember')
    for v in variable_block:
        value_tags_list = v.find_all('wml2:point')
        variable_data = get_variable_data(v)
        variable_id = get_id('Variable', variable_data)
        for value_tag in value_tags_list:
            datetime = value_tag.find('wml2:time').text
            val = value_tag.find('wml2:value').text
            res = {'VariableID': variable_id,
                   'SiteID': site_id,
                   'Value': val,
                   'Datetime': datetime,
                   }
            res_list.append(res)
    df = pd.DataFrame(res_list)
    df['Value'] = pd.to_numeric(df['Value'])
    df = make_date_index(df, 'Datetime')
    append_non_duplicates('datavalues', df, ['SiteID', 'Datetime', 'VariableID'])
    return df
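
# Example call (the URL is a placeholder; any endpoint returning a WaterML 2.0
# document should work):
# df = parse_wml2_data('http://example.com/waterml2_response', 'USGS')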


def get_site_data(soup, src_org):
    """Pulls the site code, name, and coordinates out of a WaterML 2.0 response."""
    site_code = soup.find('gml:identifier').text
    site_name = soup.find('om:featureofinterest')['xlink:title']
    site_lat = soup.find('gml:pos').text.split(' ')[0]
    site_lon = soup.find('gml:pos').text.split(' ')[1]
    return {'SiteCode': site_code,
            'SiteName': site_name,
            'SourceOrg': src_org,
            'Lat': site_lat,
            'Lon': site_lon
            }


def get_variable_data(soup):
    """Pulls the variable code, name, type, and units out of an observationMember block."""
    variable_code = soup.find("om:observedproperty")["xlink:href"].split("=")[1]
    variable_name = soup.find("om:observedproperty")["xlink:title"]
    variable_type = soup.find("om:name")["xlink:title"]
    uom = soup.find("wml2:uom")["xlink:title"]
    return {'VariableCode': variable_code,
            'VariableName': variable_name,
            'VariableType': variable_type,
            'Units': uom
            }


def get_server_data(url):
    """
    Requests the url and returns the parsed response. Note that the lxml parser
    lowercases tag names, which is why the tag searches above use all-lowercase
    names like 'wml2:observationmember'.
    """
    response = requests.get(url)
    soup = bs4.BeautifulSoup(response.text, 'lxml')
    return soup


def get_df_for_dates(var_code, start_date, end_date):
    """Returns a dataframe for one variable with a column of values per site for the given dates."""
    df = get_table_for_variable_code(var_code, start_date=start_date, end_date=end_date)
    return df.pivot(columns='SiteID', values='Value')
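
# For example, to get one column of values per site for October 2016
# (hypothetical variable code; assumes the data is already in the db):
# wide_df = get_df_for_dates('00065', '2016-10-01', '2016-10-31')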