#logstash-github-covid-19-time-series-template.conf
input {
  #Use the generator input to update the dataset when the pipeline is first started
  generator {
    lines => [ "first-time-run" ]
    count => 1
    tags => "check_github"
  }
  #The http_poller input plugin is used to schedule checks for dataset updates
  #The "schedule" setting is defined to check for updates every hour (at minute 0)
  http_poller {
    urls => {
      check_github => "https://github.com/CSSEGISandData/COVID-19/tree/master/csse_covid_19_data/csse_covid_19_time_series"
    }
    tags => "check_github"
    request_timeout => 60
    schedule => { "cron" => "0 * * * * UTC" }
    codec => "plain"
    metadata_target => "http_poller_metadata"
  }
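  #For illustration (assuming the standard rufus-scheduler syntax used by http_poller):
  #a schedule of { "cron" => "0 */2 * * * UTC" } would poll at minute 0 of every second hour,
  #and { "every" => "30m" } would poll every 30 minutes instead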
}
filter {
  #The pipeline handles two types of events:
  #The first type is the initial event that triggers the downloading, parsing, and transforming of the CSVs into time series data
  #The second type is the time series data itself
  #This 'if' discriminates between the two types. The time series data is handled further below
  if "check_github" in [tags] {
    ruby {
      init => '
        require "csv"
        require "open-uri"
        #Use a hashmap to store the time series data
        @event_map = Hash.new
        #Function used for extracting time series data from a CSV
        #Arguments: the CSV object and the type of the CSV (confirmed/deaths/recovered)
        def parse_csv(csv, type)
          csv.each do |csv_row|
            #Drop the first four headers (Province/State, Country/Region, Lat, Long) - we only want the date headers
            csv.headers.drop(4).each do |date_header|
              #Create a unique id from the Province/State, Country/Region, and the date
              #This will be used for updating the ES index without duplicating data
              key = ((csv_row[0]||"").downcase + "_" + (csv_row[1]||"").downcase + "_" + (date_header||"").to_s).gsub(/[^\w\d]/, "_")
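              #For illustration: a row with Province/State "British Columbia", Country/Region "Canada",
              #and the date header "3/22/20" produces the key "british_columbia_canada_3_22_20"
              #(every non-word character is replaced with an underscore)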
              #If the key already exists, then the event has already been created
              #E.g. if the "Confirmed cases" CSV has already been parsed, and Logstash is currently processing the "Deaths" CSV,
              #then the existing event is updated with the number of deaths for that Province/State, Country/Region, and date, based on the unique key generated above
              if @event_map.key?(key)
                @event_map[key].set(type, csv_row[date_header])
              #..else, create a new Logstash event and add it to the hashmap
              else
                @event_map[key] = LogStash::Event.new
                @event_map[key].set("updated_at", Time.now)
                @event_map[key].set("province_state", csv_row[0])
                @event_map[key].set("country_region", csv_row[1])
                @event_map[key].set("[location][lat]", csv_row[2])
                @event_map[key].set("[location][lon]", csv_row[3])
                @event_map[key].set("timestamp_from_header", date_header)
                @event_map[key].set("unique_id", key)
                @event_map[key].set(type, csv_row[date_header])
              end
            end
          end
        end
      '
      code => '
        begin
          #Download the CSV files and parse them as CSV objects
          url_confirmed = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_confirmed_global.csv"
          url_deaths = "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/csse_covid_19_data/csse_covid_19_time_series/time_series_covid19_deaths_global.csv"
          #url_recovered = ""
          csv_confirmed = CSV.parse(open(url_confirmed).read, headers: true)
          csv_deaths = CSV.parse(open(url_deaths).read, headers: true)
          #csv_recovered = CSV.parse(open(url_recovered).read, headers: true)
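          #Note: on Logstash releases whose bundled JRuby follows Ruby 3.x semantics, Kernel#open no longer opens URLs;
          #in that case URI.open(url_confirmed).read and URI.open(url_deaths).read may be needed instead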
          #Parse the CSVs using the function defined above
          parse_csv(csv_confirmed, "confirmed")
          parse_csv(csv_deaths, "deaths")
          #parse_csv(csv_recovered, "recovered")
          #For each event in the hashmap:
          #Calculate ratios, then use "new_event_block.call(event)" to create a new event.
          #These new events represent the time series data extracted from the CSVs
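          #For illustration: 3 deaths and 120 confirmed cases give ratio_deaths_to_confirmed = 3.0 / 120.0 = 0.025;
          #when a denominator is 0, the division returns NaN or Infinity, which is why the finite? checks below fall back to 0.0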
          @event_map.each do |key, event|
            ratio_deaths_to_confirmed = event.get("deaths").to_f / event.get("confirmed").to_f
            ratio_recovered_to_confirmed = event.get("recovered").to_f / event.get("confirmed").to_f
            ratio_deaths_to_recovered = event.get("deaths").to_f / event.get("recovered").to_f
            if ratio_deaths_to_confirmed.finite?
              event.set("ratio_deaths_to_confirmed", ratio_deaths_to_confirmed)
            else
              event.set("ratio_deaths_to_confirmed", 0.0)
            end
            if ratio_recovered_to_confirmed.finite?
              event.set("ratio_recovered_to_confirmed", ratio_recovered_to_confirmed)
            else
              event.set("ratio_recovered_to_confirmed", 0.0)
            end
            if ratio_deaths_to_recovered.finite?
              event.set("ratio_deaths_to_recovered", ratio_deaths_to_recovered)
            else
              event.set("ratio_deaths_to_recovered", 0.0)
            end
            new_event_block.call(event)
          end
          #After all the parsing is done, cancel this event.
          #We only need the time series data (which was already extracted and sent back through the pipeline),
          #not the initial event itself.
          event.cancel
        rescue => e
          #In case anything goes wrong, log an error with the exception message
          @logger.error? and @logger.error("Something went wrong while processing the CSV: #{e.message}. Does Logstash have internet access? Are the URLs correct?")
          event.cancel
        end
      '
    }#end ruby
  }#end if
  #Each time series data event will be sent back through the pipeline.
  #This 'if' discriminates between the original event that triggered the downloading and processing of the CSVs, and the time series data
  if [timestamp_from_header] {
    #Transform the date extracted from the CSV header into a time field.
    #By default, the parsed date will be stored in the '@timestamp' field
    date {
      match => [ "timestamp_from_header", "M/d/yy" ]
    }#end date
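    #For illustration: a header value of "3/22/20" matched with the "M/d/yy" pattern produces an '@timestamp'
    #of midnight on 2020-03-22 (in the timezone the date filter assumes, unless the 'timezone' option is set)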
    #Extract county from the "province_state" field, where possible
    if [province_state] =~ /(.*?),\s(\w{2})/ {
      ruby {
        code => '
          matched = event.get("province_state").match(/(.*?),\s(\w{2})/)
          event.set("province_state", matched[2])
          event.set("county", matched[1])
          event.tag("added_county")
        '
      }
    }
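    #For illustration: a province_state value of "King County, WA" is split by the ruby filter above into
    #county => "King County" and province_state => "WA", and the event is tagged with "added_county"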
    #This is used to rename the fields if you want to correlate the data with other data sources.
    #Modify the right-hand part of the hash
    #E.g. modify '"confirmed" => "confirmed"' to '"confirmed" => "[cases][confirmed]"' to store the number of confirmed cases under the field 'cases.confirmed'
    mutate {
      rename => {
        "confirmed" => "confirmed"
        "country_region" => "country_region"
        "county" => "county"
        "deaths" => "deaths"
        "[location][lat]" => "[location][lat]"
        "[location][lon]" => "[location][lon]"
        "province_state" => "province_state"
        "ratio_deaths_to_confirmed" => "ratio_deaths_to_confirmed"
        "ratio_deaths_to_recovered" => "ratio_deaths_to_recovered"
        "ratio_recovered_to_confirmed" => "ratio_recovered_to_confirmed"
        "recovered" => "recovered"
        "timestamp_from_header" => "timestamp_from_header"
        "updated_at" => "updated_at"
        "unique_id" => "unique_id"
      }
    }#end mutate
  }#end if
}#end filter
output {
  #Send the data to Elasticsearch
  elasticsearch {
    #Add your Elasticsearch hosts
    hosts => ["<ES_HOST>"]
    #Add the index name
    index => "covid-19-live-update"
    #Add Elasticsearch credentials
    user => "<ES_USER>"
    password => "<ES_PASSWORD>"
    #Add SSL details
    #ssl => true
    #cacert => "/path/to/cert/"
    #Update documents based on the unique id that we defined
    action => "update"
    document_id => "%{unique_id}"
    #..or create a new document if it doesn't already exist
    doc_as_upsert => true
    manage_template => false
  }
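  #For illustration: the event for British Columbia on 3/22/20 is always written with _id "british_columbia_canada_3_22_20",
  #so each hourly re-download updates the existing document in place instead of creating a duplicate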
}