# logstash-github-covid-19-daily-reports-template.conf
input {
#Use the generator input to update the dataset when the pipeline is first started
generator {
lines => [ "first-time-run" ]
count => 1
tags => "check_github"
}
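#(emits a single event with message "first-time-run", tagged "check_github", as soon as the pipeline starts)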
#The http_poller input plugin is used to schedule checks for dataset updates
#The "schedule" setting is defined to check for updates every new hour (at minute 0)
http_poller {
urls => {
check_github => "https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data/csse_covid_19_daily_reports"
}
tags => "check_github"
request_timeout => 60
schedule => { "cron" => "0 * * * * UTC" }
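#The schedule hash also accepts other rufus-scheduler triggers, e.g. schedule => { "every" => "1h" }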
codec => "plain"
metadata_target => "http_poller_metadata"
}
}
filter {
#The pipeline handles two types of events:
#The first type is the initial event that triggers the downloading, parsing, and transforming of the CSVs into time series data
#The second type is the time series data itself
#This 'if' discriminates between the two types. The time series data is handled further below
if "check_github" in [tags] {
ruby {
init => '
require "csv"
require "open-uri"
require "digest"
require "json"
#Path to store the SHA-256 hashes of the downloaded CSV files
#This is used to keep track of any changes to the Github repo and optimize the update process
@github_stored_hashes_path = "/etc/logstash/covid-19-hashes.json"
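#(the file is a flat JSON object mapping each CSV URL to its last-seen hash,
#e.g. { "https://raw.githubusercontent.com/.../01-22-2020.csv": "9f86d08..." })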
#Base URL for downloading the daily reports CSVs
@github_daily_reports_base_url = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports/"
#Base URL for downloading the daily us reports CSV
@github_daily_reports_us_base_url = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_daily_reports_us/"
@states = { "AL" => "Alabama", "AK" => "Alaska", "AS" => "American Samoa", "AZ" => "Arizona", "AR" => "Arkansas", "CA" => "California", "CO" => "Colorado", "CT" => "Connecticut", "DE" => "Delaware", "DC" => "District of Columbia", "FM" => "Federated States of Micronesia", "FL" => "Florida", "GA" => "Georgia", "GU" => "Guam", "HI" => "Hawaii", "ID" => "Idaho", "IL" => "Illinois", "IN" => "Indiana", "IA" => "Iowa", "KS" => "Kansas", "KY" => "Kentucky", "LA" => "Louisiana", "ME" => "Maine", "MH" => "Marshall Islands", "MD" => "Maryland", "MA" => "Massachusetts", "MI" => "Michigan", "MN" => "Minnesota", "MS" => "Mississippi", "MO" => "Missouri", "MT" => "Montana", "NE" => "Nebraska", "NV" => "Nevada", "NH" => "New Hampshire", "NJ" => "New Jersey", "NM" => "New Mexico", "NY" => "New York", "NC" => "North Carolina", "ND" => "North Dakota", "MP" => "Northern Mariana Islands", "OH" => "Ohio", "OK" => "Oklahoma", "OR" => "Oregon", "PW" => "Palau", "PA" => "Pennsylvania", "PR" => "Puerto Rico", "RI" => "Rhode Island", "SC" => "South Carolina", "SD" => "South Dakota", "TN" => "Tennessee", "TX" => "Texas", "UT" => "Utah", "VT" => "Vermont", "VI" => "Virgin Islands", "VA" => "Virginia", "WA" => "Washington", "WV" => "West Virginia", "WI" => "Wisconsin", "WY" => "Wyoming" }
#Input: the global CSV for one day, the corresponding US CSV (or nil), and the date of the report
#Output: an array of Logstash events extracted from the CSVs that will be sent to Elasticsearch
def create_events(csv, us_csv, current_date)
totals_by_country = Hash.new
totals_by_country["WORLD"] = LogStash::Event.new
totals_by_country["WORLD"].set("total_confirmed", 0)
totals_by_country["WORLD"].set("total_recovered", 0)
totals_by_country["WORLD"].set("total_deaths", 0)
totals_by_country["WORLD"].tag("total_world")
totals_by_country["WORLD"].tag("covid_time_series")
totals_by_country["WORLD"].set("timestamp", current_date.iso8601(3).to_s)
totals_by_country["WORLD"].set("unique_id", "totals_for_world_" + current_date.strftime("%m-%d-%Y").to_s.gsub(/[^\w\d]/, "_"))
all_country_region = Array.new
all_province_state = Array.new
all_county = Array.new
all_country_region.append("WORLD")
events_array = Array.new
#For each row in the CSV..
csv.each do |row|
#..create a new Logstash event that will store the data
new_event = LogStash::Event.new
#..add current row data to the current event
row.each do |k, v|
new_event.set(k, v&.strip)
end
#..store the date of the reporting
new_event.set("timestamp", current_date.iso8601(3).to_s)
#..store the date when Logstash processed the event
new_event.set("last_process", Time.now.iso8601(3).to_s)
#..some early reports leave these fields empty when the reported number is 0; default them to 0 here
new_event.set("confirmed", 0) if new_event.get("confirmed").nil?
new_event.set("recovered", 0) if new_event.get("recovered").nil?
new_event.set("deaths", 0) if new_event.get("deaths").nil?
#..recompute the number of active cases (confirmed - recovered - deaths) and keep the result when it is non-negative
active = new_event.get("confirmed").to_i - new_event.get("recovered").to_i - new_event.get("deaths").to_i
new_event.set("active", active) if active >= 0
#..calculate ratios and store them
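#(e.g. 5 deaths / 100 confirmed -> ratio_deaths_to_confirmed = 0.05; NaN/Infinity from zero denominators are skipped by the .finite? guards below)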
ratio_deaths_to_confirmed = new_event.get("deaths").to_f / new_event.get("confirmed").to_f
ratio_recovered_to_confirmed = new_event.get("recovered").to_f / new_event.get("confirmed").to_f
ratio_deaths_to_recovered = new_event.get("deaths").to_f / new_event.get("recovered").to_f
new_event.set("ratio_deaths_to_confirmed", ratio_deaths_to_confirmed) if ratio_deaths_to_confirmed.finite?
new_event.set("ratio_recovered_to_confirmed", ratio_recovered_to_confirmed) if ratio_recovered_to_confirmed.finite?
new_event.set("ratio_deaths_to_recovered", ratio_deaths_to_recovered) if ratio_deaths_to_recovered.finite?
if new_event.get("province_state")&.match?(/(.*?),\s(\w{2})/)
matched = new_event.get("province_state").match(/(.*?),\s(\w{2})/)
@states[matched[2]].nil? ? new_event.set("province_state", matched[2]) : new_event.set("province_state", @states[matched[2]])
new_event.set("admin2", matched[1].gsub(/ [Cc]ounty/, ""))
new_event.tag("added_county")
end
#..generate a unique key for the current event
key = ((new_event.get("admin2") || "").downcase + "_" + (new_event.get("province_state") || "").downcase + "_" + (new_event.get("country_region") || "").downcase + "_" + (current_date.strftime("%m-%d-%Y") || "").to_s).gsub(/[^\w\d]/, "_")
new_event.set("unique_id", key)
#..add a tag to identify the event later
new_event.tag("covid_time_series")
events_array.append(new_event)
if ["Channel Islands", "Cayman Islands"].include?(new_event.get("country_region"))
new_event.set("province_state", new_event.get("country_region"))
new_event.set("country_region", "United Kingdom")
end
if ["Saint Martin", "St. Martin"].include?(new_event.get("country_region"))
new_event.set("province_state", "St Martin")
new_event.set("country_region", "France")
end
#..normalize country names that changed over time in the source data (built once, then reused across rows)
@country_normalization ||= {
"Mainland China" => "China",
"South Korea" => "Korea, South",
"Republic of Korea" => "Korea, South",
"Iran (Islamic Republic of)" => "Iran",
"Bahamas, The" => "Bahamas",
"Gambia, The" => "Gambia",
"Hong Kong SAR" => "China",
"Hong Kong" => "China",
"Macao SAR" => "China",
"Macau" => "China",
"Republic of Ireland" => "Ireland",
"Republic of Moldova" => "Moldova",
"Republic of the Congo" => "Congo (Brazzaville)",
"Russian Federation" => "Russia",
"Taipei and environs" => "Taiwan",
"Taiwan*" => "Taiwan",
"UK" => "United Kingdom",
"Vatican City" => "Holy See",
"Viet Nam" => "Vietnam",
"occupied Palestinian territory" => "West Bank and Gaza",
"Ivory Coast" => "Cote d\'Ivoire",
"East Timor" => "Timor-Leste",
"Czech Republic" => "Czechia",
"The Bahamas" => "Bahamas",
"The Gambia" => "Gambia",
"Cape Verde" => "Cabo Verde",
"North Ireland" => "Ireland",
"Palestine" => "West Bank and Gaza"
}
normalized_country = @country_normalization[new_event.get("country_region")]
new_event.set("country_region", normalized_country) unless normalized_country.nil?
add_us_metadata(us_csv, new_event)
if totals_by_country[new_event.get("country_region")].nil?
totals_by_country[new_event.get("country_region")] = LogStash::Event.new
totals_by_country[new_event.get("country_region")].set("confirmed", 0)
totals_by_country[new_event.get("country_region")].set("recovered", 0)
totals_by_country[new_event.get("country_region")].set("deaths", 0)
totals_by_country[new_event.get("country_region")].set("country_region", new_event.get("country_region"))
totals_by_country[new_event.get("country_region")].set("timestamp", new_event.get("timestamp"))
totals_by_country[new_event.get("country_region")].tag("total_by_country_region")
totals_by_country[new_event.get("country_region")].tag("covid_time_series")
totals_by_country[new_event.get("country_region")].set("unique_id", "totals_for_" + new_event.get("country_region").downcase + "_" + current_date.strftime("%m-%d-%Y").to_s.gsub(/[^\w\d]/, "_"))
end
totals_by_country[new_event.get("country_region")].set("confirmed", new_event.get("confirmed").to_i + totals_by_country[new_event.get("country_region")].get("confirmed").to_i)
totals_by_country[new_event.get("country_region")].set("deaths", new_event.get("deaths").to_i + totals_by_country[new_event.get("country_region")].get("deaths").to_i)
totals_by_country[new_event.get("country_region")].set("recovered", new_event.get("recovered").to_i + totals_by_country[new_event.get("country_region")].get("recovered").to_i)
ratio_deaths_to_confirmed = totals_by_country[new_event.get("country_region")].get("deaths").to_f / totals_by_country[new_event.get("country_region")].get("confirmed").to_f
ratio_recovered_to_confirmed = totals_by_country[new_event.get("country_region")].get("recovered").to_f / totals_by_country[new_event.get("country_region")].get("confirmed").to_f
ratio_deaths_to_recovered = totals_by_country[new_event.get("country_region")].get("deaths").to_f / totals_by_country[new_event.get("country_region")].get("recovered").to_f
totals_by_country[new_event.get("country_region")].set("ratio_deaths_to_confirmed", ratio_deaths_to_confirmed) if ratio_deaths_to_confirmed.finite?
totals_by_country[new_event.get("country_region")].set("ratio_recovered_to_confirmed", ratio_recovered_to_confirmed) if ratio_recovered_to_confirmed.finite?
totals_by_country[new_event.get("country_region")].set("ratio_deaths_to_recovered", ratio_deaths_to_recovered) if ratio_deaths_to_recovered.finite?
totals_by_country["WORLD"].set("total_confirmed", totals_by_country["WORLD"].get("total_confirmed").to_i + new_event.get("confirmed").to_i)
totals_by_country["WORLD"].set("total_recovered", totals_by_country["WORLD"].get("total_recovered").to_i + new_event.get("recovered").to_i)
totals_by_country["WORLD"].set("total_deaths", totals_by_country["WORLD"].get("total_deaths").to_i + new_event.get("deaths").to_i)
all_country_region.append(new_event.get("country_region")) if !new_event.get("country_region").nil? and !all_country_region.include?(new_event.get("country_region"))
all_province_state.append(new_event.get("province_state")) if !new_event.get("province_state").nil? and !all_province_state.include?(new_event.get("province_state"))
all_county.append(new_event.get("admin2")) if !new_event.get("admin2").nil? and !all_county.include?(new_event.get("admin2"))
end
ratio_deaths_to_confirmed = totals_by_country["WORLD"].get("total_deaths").to_f / totals_by_country["WORLD"].get("total_confirmed").to_f
ratio_recovered_to_confirmed = totals_by_country["WORLD"].get("total_recovered").to_f / totals_by_country["WORLD"].get("total_confirmed").to_f
ratio_deaths_to_recovered = totals_by_country["WORLD"].get("total_deaths").to_f / totals_by_country["WORLD"].get("total_recovered").to_f
totals_by_country["WORLD"].set("ratio_deaths_to_confirmed", ratio_deaths_to_confirmed) if ratio_deaths_to_confirmed.finite?
totals_by_country["WORLD"].set("ratio_recovered_to_confirmed", ratio_recovered_to_confirmed) if ratio_recovered_to_confirmed.finite?
totals_by_country["WORLD"].set("ratio_deaths_to_recovered", ratio_deaths_to_recovered) if ratio_deaths_to_recovered.finite?
totals_by_country["WORLD"].set("country_region", all_country_region)
totals_by_country["WORLD"].set("province_state", all_province_state)
totals_by_country["WORLD"].set("admin2", all_county)
totals_by_country.each do |k, v|
events_array.append(v)
end
missing_events = add_us_missing_states(us_csv, all_province_state, current_date)
missing_events.each do |event|
events_array.append(event)
end
return events_array
end
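#Copy the US-only fields (people_tested, people_hospitalized) from the US daily report
#onto matching US rows where admin2 == "Unassigned"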
def add_us_metadata(us_csv, event)
if event.get("country_region") == "US"
if !us_csv.nil? and event.get("admin2") == "Unassigned"
us_csv.each do |row|
if row["province_state"] == event.get("province_state")
event.set("people_tested", row["people_tested"])
event.set("people_hospitalized", row["people_hospitalized"])
event.set("people_tested", 0) if row["people_tested"].nil?
event.set("people_hospitalized", 0) if row["people_hospitalized"].nil?
end
end
end
end
end
# The global dataset does not contain every US state and territory,
# so emit a new event for any state that appears only in the US dataset
def add_us_missing_states(us_csv, all_province_state, current_date)
events_array = Array.new
if us_csv.nil?
return events_array
end
states = us_csv["province_state"]
states.each do |state|
if !all_province_state.include? state and state != "Recovered"
new_event = LogStash::Event.new
us_csv.each do |row|
if row["province_state"] == state
row.each do |k,v|
new_event.set(k, v&.strip)
end
new_event.set("people_tested", 0) if row["people_tested"].nil?
new_event.set("people_hospitalized", 0) if row["people_hospitalized"].nil?
new_event.set("confirmed", 0) if row["confirmed"].nil?
new_event.set("recovered", 0) if row["recovered"].nil?
new_event.set("deaths", 0) if row["deaths"].nil?
new_event.set("active", 0) if row["active"].nil?
new_event.set("admin2", "Unassigned")
new_event.set("timestamp", current_date.iso8601(3).to_s)
new_event.set("last_process", Time.now.iso8601(3).to_s)
new_event.set("last_update", Time.now.iso8601) if new_event.get("last_update").nil?
unique_id = ((new_event.get("admin2") || "").downcase + "_" + (new_event.get("province_state") || "").downcase + "_" + (new_event.get("country_region") || "").downcase + "_" + (current_date.strftime("%m-%d-%Y") || "").to_s).gsub(/[^\w\d]/, "_")
new_event.set("unique_id", unique_id)
new_event.tag("covid_time_series")
events_array.append(new_event)
end
end
end
end
return events_array
end
'
code => '
#First day in the dataset
github_daily_reports_first_day = Time.gm(2020, 1, 22, 12, 0)
#First day in the US dataset
github_daily_reports_us_first_day = Time.gm(2020, 4, 12, 12, 0)
#Current day
github_daily_reports_last_day = Time.now
current_date = github_daily_reports_first_day
#If the CSV hashes are stored in a file, parse them..
if File.file?(@github_stored_hashes_path)
stored_hashes = JSON.parse(File.read(@github_stored_hashes_path))
#..else, create a new hash
else
stored_hashes = Hash.new
end
#For each day from the beginning of the dataset up to the current day..
while current_date <= github_daily_reports_last_day
begin
#..generate the URL based on the current_day
current_url = @github_daily_reports_base_url + current_date.strftime("%m-%d-%Y") + ".csv"
#..parse the CSV while normalizing the headers
#(e.g. until 21.03.2020 the country is stored under "Country/Region"; after that date it is "Country_Region".
#Both variations are normalized to "country_region" here. The same applies to the other fields)
current_csv = CSV.parse(open(current_url).read.gsub("\uFEFF", ""), headers: true, header_converters: lambda { |h| h.downcase.gsub(/[^\w]/, "_") })
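#(header examples: "Province/State" -> "province_state", "Last Update" -> "last_update")
#(Kernel#open with a URL relies on open-uri; on Ruby 3+ use URI.open instead)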
#..generate a SHA-256 hash of the CSV contents, used to detect changes
current_hash = Digest::SHA256.hexdigest(current_csv.to_s)
us_current_csv = nil
if current_date >= github_daily_reports_us_first_day
# Generate URL based on the current_day for US reports
us_current_url = @github_daily_reports_us_base_url + current_date.strftime("%m-%d-%Y") + ".csv"
# Read the US csv
us_current_csv = CSV.parse(open(us_current_url).read.gsub("\uFEFF", ""), headers: true, header_converters: lambda { |h| h.downcase.gsub(/[^\w]/, "_") })
# Generate the SHA-256 hash of the US CSV
us_current_hash = Digest::SHA256.hexdigest(us_current_csv.to_s)
end
rescue
#Fetching usually fails for the current day if the report has not been published yet
logger.warn? and logger.warn("Failed to fetch CSV for date " + current_date.strftime("%m-%d-%Y") + " from " + current_url)
else
#..if either hash differs from the stored one, reprocess the events for this day
if stored_hashes[current_url] != current_hash || stored_hashes[us_current_url] != us_current_hash
events_array = create_events(current_csv, us_current_csv, current_date)
#..store the new hashes
stored_hashes[current_url] = current_hash
stored_hashes[us_current_url] = us_current_hash unless us_current_url.nil?
#..and send each event back through the pipeline
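#(new_event_block is provided by the ruby filter for emitting additional events back into the pipeline)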
events_array.each do |event|
new_event_block.call(event)
end
end
end
#Advance to the next day (Time arithmetic is in seconds; 86400 = one day)
current_date += 86400
end
#Attempt to write the new CSV hashes to the disk
begin
File.write(@github_stored_hashes_path, stored_hashes.to_json)
logger.info? and logger.info("Successfully updated cache at #{@github_stored_hashes_path}")
rescue StandardError => ex
logger.error? and logger.error("(#{ex.class} - #{ex.message}) Failed to write hashes to file #{@github_stored_hashes_path}. Does Logstash have read/write access to this location?")
ensure
#..and cancel this event. We are only interested in the time series data, which was already sent back through the pipeline.
event.cancel
end
'
}#end ruby
}#end if
#Each time series data event will be sent back through the pipeline.
#This 'if' discriminates between the original event that triggered the downloading and processing of the CSV, and the time series data
if "covid_time_series" in [tags] {
#Parse date fields
date {
#When the github dataset was last updated by the maintainers
match => [ "last_update", "yyyy-MM-dd HH:mm:ss", "M/d/yy HH:mm", "M/d/yyyy HH:mm", "ISO8601" ]
target => "last_update"
}#end date
date {
#When the event was processed by Logstash
match => ["last_process", "ISO8601" ]
target => "last_process"
}#end date
date {
#The true date of the report
match => [ "timestamp", "ISO8601" ]
target => "@timestamp"
}#end date
if "total_by_country_region" not in [tags] and "total_world" not in [tags] {
mutate {
#Normalize the location field
rename => {
"lat" => "[location][lat]"
"latitude" => "[location][lat]"
"lon" => "[location][lon]"
"long" => "[location][lon]"
"long_" => "[location][lon]"
"longitude" => "[location][lon]"
}
}
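#With the coordinates nested under "location", the field can be mapped as a geo_point in the index template
#(installed separately; see manage_template in the output below)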
ruby {
code => '
@state_coordinates = { "Wisconsin" => { "lat" => "44.5", "lon" => "-89.5" }, "West Virginia" => { "lat" => "39", "lon" => "-80.5" }, "Vermont" => { "lat" => "44", "lon" => "-72.699997" }, "Texas" => { "lat" => "31", "lon" => "-100" }, "South Dakota" => { "lat" => "44.5", "lon" => "-100" }, "Rhode Island" => { "lat" => "41.700001", "lon" => "-71.5" }, "Oregon" => { "lat" => "44", "lon" => "-120.5" }, "New York" => { "lat" => "43", "lon" => "-75" }, "New Hampshire" => { "lat" => "44", "lon" => "-71.5" }, "Nebraska" => { "lat" => "41.5", "lon" => "-100" }, "Kansas" => { "lat" => "38.5", "lon" => "-98" }, "Mississippi" => { "lat" => "33", "lon" => "-90" }, "Illinois" => { "lat" => "40", "lon" => "-89" }, "Delaware" => { "lat" => "39", "lon" => "-75.5" }, "Connecticut" => { "lat" => "41.599998", "lon" => "-72.699997" }, "Arkansas" => { "lat" => "34.799999", "lon" => "-92.199997" }, "Indiana" => { "lat" => "40.273502", "lon" => "-86.126976" }, "Missouri" => { "lat" => "38.573936", "lon" => "-92.60376" }, "Florida" => { "lat" => "27.994402", "lon" => "-81.760254" }, "Nevada" => { "lat" => "39.876019", "lon" => "-117.224121" }, "Maine" => { "lat" => "45.367584", "lon" => "-68.972168" }, "Michigan" => { "lat" => "44.182205", "lon" => "-84.506836" }, "Georgia" => { "lat" => "33.247875", "lon" => "-83.441162" }, "Hawaii" => { "lat" => "19.741755", "lon" => "-155.844437" }, "Alaska" => { "lat" => "66.160507", "lon" => "-153.369141" }, "Tennessee" => { "lat" => "35.860119", "lon" => "-86.660156" }, "Virginia" => { "lat" => "37.926868", "lon" => "-78.024902" }, "New Jersey" => { "lat" => "39.833851", "lon" => "-74.871826" }, "Kentucky" => { "lat" => "37.839333", "lon" => "-84.27002" }, "North Dakota" => { "lat" => "47.650589", "lon" => "-100.437012" }, "Minnesota" => { "lat" => "46.39241", "lon" => "-94.63623" }, "Oklahoma" => { "lat" => "36.084621", "lon" => "-96.921387" }, "Montana" => { "lat" => "46.96526", "lon" => "-109.533691" }, "Washington" => { "lat" => "47.751076", "lon" => "-120.740135" }, "Utah" => { "lat" => "39.41922", "lon" => "-111.950684" }, "Colorado" => { "lat" => "39.113014", "lon" => "-105.358887" }, "Ohio" => { "lat" => "40.367474", "lon" => "-82.996216" }, "Alabama" => { "lat" => "32.31823", "lon" => "-86.902298" }, "Iowa" => { "lat" => "42.032974", "lon" => "-93.581543" }, "New Mexico" => { "lat" => "34.307144", "lon" => "-106.018066" }, "South Carolina" => { "lat" => "33.836082", "lon" => "-81.163727" }, "Pennsylvania" => { "lat" => "41.203323", "lon" => "-77.194527" }, "Arizona" => { "lat" => "34.048927", "lon" => "-111.093735" }, "Maryland" => { "lat" => "39.045753", "lon" => "-76.641273" }, "Massachusetts" => { "lat" => "42.407211", "lon" => "-71.382439" }, "California" => { "lat" => "36.778259", "lon" => "-119.417931" }, "Idaho" => { "lat" => "44.068203", "lon" => "-114.742043" }, "Wyoming" => { "lat" => "43.07597", "lon" => "-107.290283" }, "North Carolina" => { "lat" => "35.782169", "lon" => "-80.793457" }, "Louisiana" => { "lat" => "30.39183", "lon" => "-92.329102" } }
if (event.get("[location][lat]").nil? || event.get("[location][lon]").nil?) || (event.get("[location][lat]").to_f == 0 && event.get("[location][lon]").to_f == 0)
if (event.get("[country_region]") == "US") and (!@state_coordinates[event.get("[province_state]")].nil?)
event.set("[location][lat]", @state_coordinates[event.get("province_state")]["lat"])
event.set("[location][lon]", @state_coordinates[event.get("province_state")]["lon"])
else
event.set("[location][lat]", 0)
event.set("[location][lon]", 0)
end
end
'
}
}
#Rename fields here if you want to correlate with other datasets.
#Most entries below are identity placeholders; only admin2, people_tested, and people_hospitalized change names.
mutate {
rename => {
"province_state" => "province_state"
"country_region" => "country_region"
"admin2" => "county"
"fips" => "fips"
"confirmed" => "confirmed"
"deaths" => "deaths"
"recovered" => "recovered"
"active" => "active"
"[location][lat]" => "[location][lat]"
"[location][lon]" => "[location][lon]"
"last_update" => "last_update"
"last_process" => "last_process"
"combined_key" => "combined_key"
"ratio_deaths_to_confirmed" => "ratio_deaths_to_confirmed"
"ratio_recovered_to_confirmed" => "ratio_recovered_to_confirmed"
"ratio_deaths_to_recovered" => "ratio_deaths_to_recovered"
"unique_id" => "unique_id"
"people_tested" => "tested"
"people_hospitalized" => "hospitalized"
}
}#end mutate
}#end if
}#end filter
output {
#Send the data to Elasticsearch
elasticsearch {
#Add your Elasticsearch hosts
hosts => ["<ES_HOST>"]
#Add the index name
index => "covid-19-live-update"
#Add Elasticsearch credentials
user => "<ES_USER>"
password => "<ES_PASSWORD>"
#Add SSL details
#ssl => true
#cacert => "/path/to/cert/"
#Update documents based on the unique id that we defined
action => "update"
document_id => "%{unique_id}"
#..or create a new document if it doesn't already exist
doc_as_upsert => true
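#Mappings are not managed by Logstash; install the matching index template in Elasticsearch separately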
manage_template => false
}
}