Tags: elasticsearch, logstash, logstash-grok, elk

Have a key value pair as logstash output, by only using grok filter


I am working on a Spring Boot project and using the ELK stack for logging and auditing. I need a logstash.conf file that parses the logs into an output containing a dynamic set of key-value pairs. This output will be used for auditing.

Here is an example for clarity.

Sample log:

[INFO] [3b1d04f219fc43d18ccb6cb22db6cff4] 2021-10-13_13:43:09.074 Audit_ key1:value1| key2:value2| key3:value3| keyN:valueN

Required logstash output:

{
  "logLevel": [
    [
      "INFO"
    ]
  ],
  "threadId": [
    [
      "3b1d04f219fc43d18ccb6cb22db6cff4"
    ]
  ],
  "timeStamp": [
    [
      "2021-10-13_13:43:09.074"
    ]
  ],
  "class": [
    [
      "Audit_"
    ]
  ],
  "key1": [
    [
      "value1"
    ]
  ],
  "key2": [
    [
      "value2"
    ]
  ],
  "key3": [
    [
      "value3"
    ]
  ],
  "keyN": [
    [
      "valueN"
    ]
  ]
}

Note:

  • "key" will always be a word or string value
  • "value" can be a word, a number, or a sentence (a string with spaces)
  • ":" is the separator between key and value
  • "|" is the separator between key-value pairs
  • The number of key-value pairs can vary.

Can someone suggest a match pattern for this? I am only allowed to use the grok filter.


Solution

  • Thank you for the guidance, Filip and leandrojmp!

    Using only a grok filter for this would be very complex, and it would not support a varying number of key-value pairs.

    So I went with a grok filter followed by a kv filter, and this approach worked for me.

    Sample Log:

    [INFO] [3b1d04f219fc43d18ccb6cb22db6cff4] 2021-10-13_13:43:09.074 _Audit_ key1:value1| key2:value2| key3:value3| keyN:valueN
    

    logstash.conf file:

    input {
      beats {
        port => "5044"
      }
    }
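    filter {
      grok {
        # Split the fixed prefix into fields; the remainder replaces "message".
        match => { "message" => "\[%{LOGLEVEL:logLevel}\]\ \[%{WORD:traceId}\]\ (?<timestamp>[0-9\-_:\.]*)\ %{WORD:class}\ %{GREEDYDATA:message}" }
        overwrite => [ "message" ]
      }
      if [class] == "_Audit_" {
        kv {
          source => "message"
          # Separators match the sample log: "|" between pairs, ":" between key and value.
          field_split => "|"
          value_split => ":"
          # Keys after the first one follow "| ", so strip the leading space.
          trim_key => " "
          remove_field => ["message"]
        }
      }
    }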
    output {
      if [class] == "_Audit_" {
        elasticsearch {
          hosts => ["localhost:9200"]
          index => "audit-logs-%{+YYYY.MM.dd}"
        }
      }
      else {
        elasticsearch {
          hosts => ["localhost:9200"]
          index => "normal-logs-%{+YYYY.MM.dd}"
        }
      }
    }
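
To sanity-check which fields the grok-then-kv approach should produce for the sample log, the two steps can be mimicked outside Logstash. The following is only a rough Python sketch, not Logstash itself: `parse_line` and `LINE_RE` are hypothetical names, the grok patterns LOGLEVEL and WORD are approximated with `\w+`, and the separators are assumed to be "|" and ":" as in the sample log.

```python
import re

# Approximation of the grok step: fixed prefix fields, then the remainder.
# Assumption: \w+ stands in for grok's LOGLEVEL and WORD patterns.
LINE_RE = re.compile(
    r"\[(?P<logLevel>\w+)\] "
    r"\[(?P<traceId>\w+)\] "
    r"(?P<timestamp>[0-9\-_:.]*) "
    r"(?P<class>\w+) "
    r"(?P<message>.*)"
)


def parse_line(line, field_split="|", value_split=":"):
    """Return a dict of fields for one log line, or None if it does not match."""
    match = LINE_RE.match(line)
    if match is None:
        return None
    event = match.groupdict()
    message = event.pop("message")
    if event["class"] == "_Audit_":
        # Approximation of the kv step: split pairs, then key from value.
        for pair in message.split(field_split):
            key, sep, value = pair.partition(value_split)
            if sep:  # skip fragments that contain no key/value separator
                event[key.strip()] = value.strip()
    else:
        # Non-audit lines keep the remainder as the message.
        event["message"] = message
    return event


sample = (
    "[INFO] [3b1d04f219fc43d18ccb6cb22db6cff4] 2021-10-13_13:43:09.074 "
    "_Audit_ key1:value1| key2:value2| key3:value3| keyN:valueN"
)
print(parse_line(sample))
```

Because only the first ":" in each pair is treated as the separator, a value may itself contain spaces or further colons. Whether the real kv filter behaves identically in every edge case is not something this sketch establishes.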