"""
For building PostgreSQL database tables to house CRS data for the web app.
"""
import psycopg2
import os
import pandas as pd
import StringIO
# somewhat different than what is in models.py
# 'agency' is omitted here as it's more complicated
CODE_TABLES = ['donor', 'recipient', 'region', 'incomegroup', 'flow', 'purpose', 'sector', 'channel']
# List of (column name, postgres type) tuples for the columns in the CRS data.
# Note that Postgres folds unquoted column names to lowercase, e.g. Year -> year.
CRS_COLUMN_SPEC = [
    ('Year', 'integer'),
    ('donorcode', 'integer REFERENCES donor'),
    ('agencycode', 'integer'),
    ('crsid', 'varchar (63)'),
    ('projectnumber', 'varchar (63)'),
    ('initialreport', 'integer'),
    ('recipientcode', 'integer REFERENCES recipient'),
    ('regioncode', 'integer REFERENCES region'),
    ('incomegroupcode', 'integer REFERENCES incomegroup'),
    ('flowcode', 'integer REFERENCES flow'),
    ('bi_multi', 'integer'),
    ('category', 'integer'),
    ('finance_t', 'integer'),
    ('aid_t', 'varchar (63)'),
    ('usd_commitment', 'double precision'),
    ('usd_disbursement', 'double precision'),
    ('usd_received', 'double precision'),
    ('usd_commitment_defl', 'double precision'),
    ('usd_disbursement_defl', 'double precision'),
    ('usd_received_defl', 'double precision'),
    ('usd_adjustment', 'double precision'),
    ('usd_adjustment_defl', 'double precision'),
    ('usd_amountuntied', 'double precision'),
    ('usd_amountpartialtied', 'double precision'),
    ('usd_amounttied', 'double precision'),
    ('usd_amountuntied_defl', 'double precision'),
    ('usd_amountpartialtied_defl', 'double precision'),
    ('usd_amounttied_defl', 'double precision'),
    ('usd_IRTC', 'double precision'),
    ('usd_expert_commitment', 'double precision'),
    ('usd_expert_extended', 'double precision'),
    ('usd_export_credit', 'double precision'),
    ('currencycode', 'integer'),
    ('commitment_national', 'double precision'),
    ('disbursement_national', 'double precision'),
    ('shortdescription', 'text'),
    ('projecttitle', 'text'),
    ('purposecode', 'integer REFERENCES purpose'),
    ('sectorcode', 'integer REFERENCES sector'),
    # should be 'integer REFERENCES channel', but pandas data has channelcode as float to handle nulls
    ('channelcode', 'double precision'),
    ('channelreportedname', 'text'),
    ('geography', 'text'),
    ('expectedstartdate', 'varchar (63)'),
    ('completiondate', 'varchar (63)'),
    ('longdescription', 'text'),
    ('gender', 'double precision'),
    ('trade', 'double precision'),
    ('FTC', 'double precision'),
    ('PBA', 'double precision')
]
# Custom columns we want to add to the data.
CATEGORY_COLUMN_NAME = 'tj_category_id'
INCLUSION_COLUMN_NAME = 'tj_inclusion_id'


def get_db_connection(host, database, user, password):
    return psycopg2.connect(host=host, database=database, user=user, password=password)


def get_all_name_code_pairs(dataframe, filter_type):
    """
    Returns a dataframe containing ___code/___name pairs.
    """
    code_column = filter_type + 'code'
    name_column = filter_type + 'name'
    rows = dataframe[[code_column, name_column]].drop_duplicates()
    # Filter out missing codes (which can happen for channel). Missing values also mean that
    # pandas will have interpreted the code column as float (so that it can use NaN), so we
    # need to cast it back to int.
    rows = rows.dropna()
    rows[code_column] = rows[code_column].astype(int)
    rows = rows.sort_values(name_column)
    if filter_type == 'purpose':
        # Hack around an unfortunate issue in the CRS data: there are two names assigned to
        # purposecode 11420, so we have to remove one of them. It shouldn't affect this
        # analysis, since neither is TJ-related.
        rows = rows[rows.purposename != 'Imputed student costs']
    return rows.rename(columns={code_column: 'code', name_column: 'name'})


def build_code_tables(cursor, dataframe):
    create_template = 'CREATE TABLE {table_name} ({code_column} integer PRIMARY KEY, {name_column} varchar(127));'
    insert_template = "INSERT INTO {table_name} VALUES (%(code)s, %(name)s);"
    index_template = 'CREATE UNIQUE INDEX ON {table_name} ({code_column});'
    for filter_type in CODE_TABLES:
        table_name = filter_type
        rows = get_all_name_code_pairs(dataframe, filter_type)
        # these are the database column names, not the pandas dataframe column names
        code_column = filter_type + 'code'
        name_column = filter_type + 'name'
        name_map = {'table_name': table_name, 'code_column': code_column, 'name_column': name_column}
        # create the table, populate it, and add an index
        create_sql = create_template.format(**name_map)
        cursor.execute(create_sql)
        insert_sql = insert_template.format(**name_map)
        for i, row in rows.iterrows():
            cursor.execute(insert_sql, {'code': row['code'], 'name': row['name']})
        index_sql = index_template.format(**name_map)
        cursor.execute(index_sql)


def build_agency_table(cursor, dataframe):
    """
    Agencies are special in that they have the compound key (donorcode, agencycode).
    """
    rows = dataframe[['donorcode', 'agencycode', 'agencyname']].drop_duplicates()
    create_sql = 'CREATE TABLE agency (donorcode integer, agencycode integer, agencyname varchar(127), ' \
                 'PRIMARY KEY (donorcode, agencycode));'
    cursor.execute(create_sql)
    insert_sql = 'INSERT INTO agency VALUES (%(donorcode)s, %(agencycode)s, %(agencyname)s);'
    for i, row in rows.iterrows():
        cursor.execute(insert_sql, {'donorcode': row['donorcode'], 'agencycode': row['agencycode'],
                                    'agencyname': row['agencyname']})
    index_sql = 'CREATE UNIQUE INDEX ON agency (donorcode, agencycode);'
    cursor.execute(index_sql)


def build_custom_data_tables(cursor):
    # Yes, these are just code tables that we don't expect to change. They are hard-coded here
    # and could just as well be hard-coded in the web app, but keeping them in the database makes
    # life nicer for anyone who wants to query the db directly or build another app on this data.
    inclusion_create_sql = "CREATE TABLE tj_inclusion (" + INCLUSION_COLUMN_NAME + \
                           " smallint PRIMARY KEY, tj_inclusion_name varchar(31));"
    cursor.execute(inclusion_create_sql)
    inclusion_insert_sql = "INSERT INTO tj_inclusion VALUES (%(tj_inclusion_id)s, %(tj_inclusion_name)s);"
    cursor.execute(inclusion_insert_sql, {'tj_inclusion_id': 0, 'tj_inclusion_name': 'Exclude'})
    cursor.execute(inclusion_insert_sql, {'tj_inclusion_id': 1, 'tj_inclusion_name': 'Include'})
    cursor.execute(inclusion_insert_sql, {'tj_inclusion_id': 2, 'tj_inclusion_name': 'Maybe include'})
    # if we ever come up with other tiers of inclusion, we can add them here
    inclusion_index_sql = "CREATE UNIQUE INDEX ON tj_inclusion (" + INCLUSION_COLUMN_NAME + ");"
    cursor.execute(inclusion_index_sql)
    category_create_sql = "CREATE TABLE tj_category (" + CATEGORY_COLUMN_NAME + \
                          " smallint PRIMARY KEY, tj_category_name varchar(31));"
    cursor.execute(category_create_sql)
    category_insert_sql = "INSERT INTO tj_category VALUES (%(tj_category_id)s, %(tj_category_name)s);"
    cursor.execute(category_insert_sql, {'tj_category_id': 1, 'tj_category_name': 'Truth and memory'})
    cursor.execute(category_insert_sql, {'tj_category_id': 2, 'tj_category_name': 'Criminal justice'})
    cursor.execute(category_insert_sql, {'tj_category_id': 3, 'tj_category_name': 'Reparations'})
    cursor.execute(category_insert_sql, {'tj_category_id': 4, 'tj_category_name': 'Institutional reform'})
    cursor.execute(category_insert_sql, {'tj_category_id': 5, 'tj_category_name': 'Reconciliation'})
    cursor.execute(category_insert_sql, {'tj_category_id': 6, 'tj_category_name': 'General TJ'})
    # if we ever come up with other categories, we can add them here
    # id 0 serves as a 'null' category
    cursor.execute(category_insert_sql, {'tj_category_id': 0, 'tj_category_name': 'None'})
    category_index_sql = "CREATE UNIQUE INDEX ON tj_category (" + CATEGORY_COLUMN_NAME + ");"
    cursor.execute(category_index_sql)


def create_crs_table(cursor):
    sql = 'CREATE TABLE crs ('
    # auto-generated primary key
    sql += 'crs_pk serial PRIMARY KEY,'
    # custom columns
    sql += INCLUSION_COLUMN_NAME + ' smallint REFERENCES tj_inclusion,'
    sql += CATEGORY_COLUMN_NAME + ' smallint REFERENCES tj_category DEFAULT 0 NOT NULL,'
    # desired columns from the CRS file
    column_spec_list = [column_name + ' ' + column_type for column_name, column_type in CRS_COLUMN_SPEC]
    sql += ','.join(column_spec_list)
    sql += ');'
    cursor.execute(sql)
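    # For reference, the generated statement looks roughly like this (abridged):
    #   CREATE TABLE crs (crs_pk serial PRIMARY KEY,
    #                     tj_inclusion_id smallint REFERENCES tj_inclusion,
    #                     tj_category_id smallint REFERENCES tj_category DEFAULT 0 NOT NULL,
    #                     Year integer,
    #                     donorcode integer REFERENCES donor,
    #                     ...);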


def populate_crs_table(cursor, dataframe):
    # write the dataframe to an in-memory text buffer as CSV
    csv_buffer = io.StringIO()
    columns_of_interest = [column_name for column_name, column_type in CRS_COLUMN_SPEC]
    dataframe[columns_of_interest].to_csv(csv_buffer, header=False, index=False)
    csv_buffer.seek(0)  # rewind to the beginning of the buffer
    # bulk-load the table from the buffer
    copy_sql = "COPY crs({columns}) FROM STDIN WITH CSV".format(columns=",".join(columns_of_interest))
    cursor.copy_expert(copy_sql, csv_buffer)
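    # COPY streams the whole CSV to the server in a single statement, which is much faster
    # than issuing one INSERT per row; psycopg2's copy_expert accepts any file-like object
    # as the STDIN source, so the in-memory buffer works without touching disk.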


def index_crs_table(cursor):
    # index the code columns in the main crs table, as we'll use them to filter queries
    code_index_template = 'CREATE INDEX {index_name} ON crs ({code_column});'
    for filter_type in CODE_TABLES:
        name_map = {'index_name': 'crs_' + filter_type + 'code_idx', 'code_column': filter_type + 'code'}
        index_sql = code_index_template.format(**name_map)
        cursor.execute(index_sql)
    # Also index the custom columns, since they will be important for filtering. Note that these
    # indices are on the only non-static data, so they may need rebuilding if ever updated heavily.
    custom_index_sql_template = 'CREATE INDEX {index_name} ON crs ({custom_column});'
    for custom_column in (INCLUSION_COLUMN_NAME, CATEGORY_COLUMN_NAME):
        name_map = {'index_name': 'crs_' + custom_column + '_idx', 'custom_column': custom_column}
        custom_index_sql = custom_index_sql_template.format(**name_map)
        cursor.execute(custom_index_sql)
    # Add a special column to crs just for text search, then index it.
    alter_sql = 'ALTER TABLE crs ADD COLUMN searchable_text tsvector;'
    cursor.execute(alter_sql)
    # Unfortunately we have to pick a single language here, even though our text columns are in
    # several different languages. If we could figure out the language of each row (by donor,
    # recipient, or text analysis?) and store it in a column, then we could use
    # to_tsvector(language_column, text_column).
    update_sql = "UPDATE crs SET searchable_text = to_tsvector('english'," \
                 "coalesce(projecttitle,'') || ' ' || " \
                 "coalesce(shortdescription,'') || ' ' || " \
                 "coalesce(longdescription,''));"
    cursor.execute(update_sql)
    text_index_sql = "CREATE INDEX crs_textsearch_idx ON crs USING gin(searchable_text);"
    cursor.execute(text_index_sql)
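    # As a hypothetical example, the web app could then run full-text searches like:
    #   SELECT crs_pk FROM crs
    #   WHERE searchable_text @@ plainto_tsquery('english', 'truth commission');
    # The GIN index above is what makes such @@ lookups fast.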
if __name__ == "__main__":
host = os.environ['POSTGRES_HOST']
database = os.environ['POSTGRES_DB']
user = os.environ['POSTGRES_USER']
password = os.environ['POSTGRES_PASSWORD']
connection = get_db_connection(host, database, user, password)
cursor = connection.cursor()
# dataframe = pd.read_pickle('/home/andrew/oecd/crs/processed/2014-01-30/all_data.pkl')
dataframe = pd.read_pickle('/home/andrew/oecd/crs/processed/2014-01-30/filtered.pkl')
build_code_tables(cursor, dataframe)
build_agency_table(cursor, dataframe)
build_custom_data_tables(cursor)
create_crs_table(cursor)
populate_crs_table(cursor, dataframe)
index_crs_table(cursor)
connection.commit()
cursor.close()
connection.close()