We have a Logstash pipeline that polls an HTTP endpoint hourly (with http_poller) to fetch the data from the last week.
Before saving the data to an Elasticsearch index, we check which events are already in the index and drop them.
Finally, we store the new events.
Now we want the pipeline to create a new index every year, so we can manage the data in an orderly way. To do that we added action => "create" and a template to our output, so the index is created if it doesn't exist.
This is our pipeline:
input {
  http_poller {
    urls => {
      air_quality => {
        url => "https://url_to_data"
        method => get
        headers => {
          Accept => "application/csv"
        }
      }
    }
    schedule => { every => "1h" }
    codec => line { charset => "UTF-8" }
  }
}
filter {
  # If the event starts with "Station" it is the header line, so we drop it
  if [message] =~ /^Station/ {
    drop { }
  }
  # Column names to assign to the CSV fields
  csv {
    skip_header => "true"
    columns => ["station", "title", "latitude", "longitude", "date", "period", "SO2", "NO", "NO2", "CO", "PM10", "O3", "dd", "vv", "TMP", "HR", "PRB", "RS", "LL", "BEN", "TOL", "MXIL", "PM25"]
    separator => ","
  }
  # Parse the date with its format and timezone
  date {
    match => ["[date]", "yyyy-MM-dd"]
    timezone => "Europe/Madrid"
    target => "[date]"
  }
  # Copy the date into eventDate so its ":" characters can be escaped for the query
  mutate { add_field => { "eventDate" => "%{[date]}" } }
  # Escape each ":" with a backslash
  mutate { gsub => [ "eventDate", ":", "\:" ] }
  # Look for the same event in the index
  elasticsearch {
    hosts => "https://our_host_url"
    ca_file => "our_http_ca.crt"
    api_key => "our_API_KEY"
    index => "our-base-index-name-%{+YYYY}"
    query => "period:%{[period]} AND station:%{[station]} AND date:%{[eventDate]}"
  }
  # Drop repeated events
  if [@metadata][total_hits] != 0 {
    drop {}
  }
  # Rename latitude and longitude
  mutate {
    rename => {
      "latitude" => "[situation][lat]"
      "longitude" => "[situation][lon]"
    }
  }
}
output {
  elasticsearch {
    hosts => "https://our_host_url"
    cacert => "our_http_ca.crt"
    api_key => "our_API_KEY"
    index => "our-base-index-name-%{+YYYY}"
    action => "create"
    template_name => "out-base-template-name"
  }
}
Now we are receiving 404 warnings whenever our filter query tries to check for previous data in a new index that has not been created yet.
I suppose we should be able to create the index from the template before the query in the filter runs, but I don't have a clue how to do it.
What surprises me is that the logs don't show ERRORs, they show WARNINGs. If I comment out the query part of the filter, the system works fine and creates the index. If I activate it again, the queries against the nonexistent index fail (with WARNINGs) and the index is never created. I suspect this is because the warnings stop every event from getting through the filter, so the output is never reached.
Any advice on this?
There is an option in the _search API to not return an error, which is called ignore_unavailable. The default value is false, which means that the request returns an error if the queried index doesn't exist. However, the elasticsearch filter plugin doesn't provide any way to set this parameter to true.
Instead of querying the exact index name, thanks to another parameter called allow_no_indices, which defaults to true, the trick would be to add a wildcard to the name, as in our-base-index-name-%{+YYYY}*, and that would have the effect of not erroring out if the index doesn't exist. Or just our-base-index-name-*, since your query already identifies the proper date range of the data.
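As a rough sketch of that suggestion, the lookup in the question's filter could be changed along these lines. Only the index line differs from the original; the host, certificate and API key are the question's placeholders, and the sketch assumes nothing other than the yearly indices matches the wildcard.

```
filter {
  elasticsearch {
    hosts => "https://our_host_url"
    ca_file => "our_http_ca.crt"
    api_key => "our_API_KEY"
    # Trailing wildcard: because allow_no_indices defaults to true, the search
    # should return zero hits instead of a 404 while this year's index is missing.
    index => "our-base-index-name-%{+YYYY}*"
    query => "period:%{[period]} AND station:%{[station]} AND date:%{[eventDate]}"
  }
}
```

With no matching index the lookup should simply find nothing, so the event is not dropped and continues to the output, where the first write creates the index from the template.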