
Logstash dissect with key=value, comma


I have logs that contain performance and statistical data. I have configured Logstash to parse this data as CSV so that the values can be saved to Elasticsearch.

<1>,www1,3,BISTATS,SCAN,330,712.6,2035,17.3,221.4,656.3

I am using the following Logstash filter and getting the desired results:

grok {
  match => { "Message" => "\A<%{POSINT:priority}>,%{DATA:pan_host},%{DATA:pan_serial_number},%{DATA:pan_type},%{GREEDYDATA:message}\z" }
  overwrite => [ "Message" ]
}
csv {
  separator => ","
  columns => ["pan_scan","pf01","pf02","pf03","kk04","uy05","xd06"]
}

This works well for me as long as the order of the columns does not change.

However, I want to make the log more meaningful by including each column name in the original log line, for example:

<1>,www1,30000,BISTATS,SCAN,pf01=330,pf02=712.6,pf03=2035,kk04=17.3,uy05=221.4,xd06=656.3

This way I can keep inserting or appending key/value pairs anywhere in the line without corrupting the data. (I am using Logstash 5.3.)


Solution

  • Following @baudsp's recommendations, I came up with the following. I deleted the csv{} block completely and replaced it with a kv{} block. The kv{} filter automatically creates a field for each key=value pair, leaving me only to mutate{} the fields into floats and integers.

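     # Parse the JSON document in "message"; on success, drop the raw fields.
     json {
       source => "message"
       remove_field => [ "message", "headers" ]
     }
     # Parse the "timestamp" field and store the result back in "timestamp".
     date {
       match => [ "timestamp", "YYYY-MM-dd'T'HH:mm:ss.SSS'Z'" ]
       target => "timestamp"
     }
     # Split off the fixed leading fields; the rest of the line is captured as "message".
     grok {
       match => { "Message" => "\A<%{POSINT:priority}>,%{DATA:pan_host},%{DATA:pan_serial_number},%{DATA:pan_type},%{GREEDYDATA:message}\z" }
       overwrite => [ "Message" ]
     }
     # kv reads "message" by default and splits it on commas into key=value fields.
     kv {
       allow_duplicate_values => false
       field_split_pattern => ","
     }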
    

    Using the above block, I was able to insert the key=value pairs anywhere in the message. Thanks again for all the help. I have added a sample code block for anyone trying to accomplish this task.
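
    The mutate{} step mentioned earlier is not shown in the filter configuration. The kv{} filter creates every value as a string, so a conversion is needed before the values can be indexed as numbers. A minimal sketch, placed after the kv{} block, might look like the following. The type chosen for each field is an assumption inferred from the sample values (pf01 and pf03 look like integers, the rest like floats) and should be adjusted to the real data.

     mutate {
       # Assumed types, inferred from the sample log line only.
       convert => {
         "pf01" => "integer"
         "pf02" => "float"
         "pf03" => "integer"
         "kk04" => "float"
         "uy05" => "float"
         "xd06" => "float"
       }
     }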

    Note: I am using NLog for logging, which produces JSON output. In the C# code, the log call looks like this:

    var logger = NLog.LogManager.GetCurrentClassLogger();
    logger.ExtendedInfo("<1>,www1,30000,BISTATS,SCAN,pf01=330,pf02=712.6,pf03=2035,kk04=17.3,uy05=221.4,xd06=656.3");