Tags: logstash, elastic-stack, logstash-configuration

Using Logstash Aggregate Filter plugin to process data which may or may not be sequenced


Hello all!

I am trying to use the Aggregate filter plugin of Logstash v7.7 to correlate and combine data from two different CSV file inputs that represent API data calls. The idea is to produce a record showing a combined picture. As you can expect, the data may or may not arrive in the right sequence.

Here is an example:

/data/incoming/source_1/*.csv

StartTime, AckTime, Operation, RefData1, RefData2, OpSpecificData1
231313232,44343545,Register,ref-data-1a,ref-data-2a,op-specific-data-1
979898999,75758383,Register,ref-data-1b,ref-data-2b,op-specific-data-2
354656466,98554321,Cancel,ref-data-1c,ref-data-2c,op-specific-data-2

/data/incoming/source_2/*.csv

FinishTime,Operation,RefData1, RefData2, FinishSpecificData
67657657575,Cancel,ref-data-1c,ref-data-2c,FinishSpecific-Data-1
68445590877,Register,ref-data-1a,ref-data-2a,FinishSpecific-Data-2
55443444313,Register,ref-data-1a,ref-data-2a,FinishSpecific-Data-2

I have a single pipeline that receives both these CSVs, and I am able to process and write them as individual records to a single index. However, the idea is to combine records from the two sources into one record each, representing a superset of Operation-related information.
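For illustration, here is roughly the kind of combined record I am after, stitched together from the matching Cancel rows in the two samples above (field names carried over from the CSV headers):

StartTime, AckTime, FinishTime, Operation, RefData1, RefData2, OpSpecificData1, FinishSpecificData
354656466,98554321,67657657575,Cancel,ref-data-1c,ref-data-2c,op-specific-data-2,FinishSpecific-Data-1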

Unfortunately, despite several attempts, I have been unable to figure out how to achieve this with the Aggregate filter plugin. My primary question is whether this is a suitable use of this particular plugin? And if so, any suggestions would be welcome!

At the moment, I have this:

input {
   file {
      path => ['/data/incoming/source_1/*.csv']
      tags => ["source1"]
   }
   file {
      path => ['/data/incoming/source_2/*.csv']
      tags => ["source2"]
   }
}
filter {
   # use the tags to do some source 1 and 2 related massaging, calculations, etc.

   aggregate {
      # correlate records from both sources on the Operation + reference data fields
      task_id => "%{Operation}_%{RefData1}_%{RefData2}"
      code => "
         map['source_files'] ||= []
         map['source_files'] << { 'source_file' => event.get('path') }
      "
      push_map_as_event_on_timeout => true
      timeout => 600   # assuming this is the farthest apart the two records will arrive
   }
   ...
}
output {
   elasticsearch { ... }
}

And other such variations. However, I keep getting individual records written to the index and am unable to get a single combined one. Then again, as you can see from the data set, there is no guarantee of the sequencing of records, so I am wondering whether the filter is the right tool for the job to begin with? :-\

Or is it just me not being able to use it right! ;-)

In either case, any inputs/comments/suggestions are welcome. Thanks!

PS: This message is cross-posted from the Elastic forums. I am providing a link there just in case some answers pop up there too.


Solution

  • The answer is to use the Elasticsearch output in upsert mode. Please see the specifics here.
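
For reference, a minimal sketch of what that output might look like, assuming a local cluster and an index named api_operations (both placeholders). The document_id reuses the same correlation key as the task_id above, so records from either source update the same document:

output {
   elasticsearch {
      hosts         => ["http://localhost:9200"]                # assumed host
      index         => "api_operations"                         # assumed index name
      document_id   => "%{Operation}_%{RefData1}_%{RefData2}"   # same correlation key as the task_id
      action        => "update"
      doc_as_upsert => true                                     # create the document if it does not exist yet, otherwise merge into it
   }
}

With this in place, each source can be indexed as soon as it arrives, regardless of ordering: whichever record lands second simply fills in the remaining fields of the shared document.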