I am working in the Elapsed filter. I read the guide of Elapsed filter in logstash. then i made a sample config file and csv to test the working of Elapsed filter. But it seems to be not working. There is no change in uploading the data to ES. i have attached the csv file and config code. Can you give some examples for how to use the elapsed filter.
here's my config file:
input {
file {
path => "/home/paulsteven/log_cars/aggreagate.csv"
start_position => "beginning"
sincedb_path => "/dev/null"
filter {
csv {
separator => ","
quote_char => "%"
columns => ["state","city","haps","ads","num_id","serial"]
elapsed {
start_tag => "taskStarted"
end_tag => "taskEnded"
unique_id_field => "num_id"
output {
elasticsearch {
hosts => "localhost:9200"
index => "el03"
document_type => "details"
Output in ES:
"city" => "tirunelveli",
"path" => "/home/paulsteven/log_cars/aggreagate.csv",
"num_id" => "2345-1002-4501",
"message" => "tamil nadu,tirunelveli,hap0,ad1,2345-1002-4501,1",
"@version" => "1",
"serial" => "1",
"haps" => "hap0",
"state" => "tamil nadu",
"host" => "smackcoders",
"ads" => "ad1",
"@timestamp" => 2019-05-06T10:03:51.443Z
"city" => "chennai",
"path" => "/home/paulsteven/log_cars/aggreagate.csv",
"num_id" => "2345-1002-4501",
"message" => "tamil nadu,chennai,hap0,ad1,2345-1002-4501,5",
"@version" => "1",
"serial" => "5",
"haps" => "hap0",
"state" => "tamil nadu",
"host" => "smackcoders",
"ads" => "ad1",
"@timestamp" => 2019-05-06T10:03:51.447Z
"city" => "kottayam",
"path" => "/home/paulsteven/log_cars/aggreagate.csv",
"num_id" => "2345-1002-4501",
"message" => "kerala,kottayam,hap1,ad2,2345-1002-4501,9",
"@version" => "1",
"serial" => "9",
"haps" => "hap1",
"state" => "kerala",
"host" => "smackcoders",
"ads" => "ad2",
"@timestamp" => 2019-05-06T10:03:51.449Z
"city" => "Jalna",
"path" => "/home/paulsteven/log_cars/aggreagate.csv",
"num_id" => "2345-1002-4501",
"message" => "mumbai,Jalna,hap2,ad3,2345-1002-4501,13",
"@version" => "1",
"serial" => "13",
"haps" => "hap2",
"state" => "mumbai",
"host" => "smackcoders",
"ads" => "ad3",
"@timestamp" => 2019-05-06T10:03:51.452Z
You have to tag your events in order Logstash could find the start / end tags. Basically you have to know when an event is considered a start event and when it's an end event.
Elapsed filter plugin works only for two events (for example a request event and a response event in order to get the latency between them) Both these two kinds of event need to own an ID field which identify uniquely that particular task. The name of this field is stored in unique_id_field.
For your example you have to identify a pattern for start and end event, let's say that you have in your csv a column type (see the code below) when type contains "START", the line is considered start event and if it contains "END" it's an end event, pretty straightforward, and a columnn id that stores the unique identifier.
filter {
csv {
separator => ","
quote_char => "%"
columns => ["state","city","haps","ads","num_id","serial", "type", "id"]
grok {
match => { "type" => ".*START.*" }
add_tag => [ "taskStarted" ]
}grok {
match => { "type" => ".*END*" }
add_tag => [ "taskTerminated" ]
} elapsed {
start_tag => "taskStarted"
end_tag => "taskTerminated"
unique_id_field => "id"
I feel like your need is different. If you want to aggregate more than two events, all the events with the same value for column state for example, please check out this plugin