logstash, elastic-stack, elk

How to differentiate normal logs and audit logs in a Logstash filter when the input is the same?


I have an application running in Kubernetes, and I have two types of logs:

  1. Normal logs
  2. Audit logs

For example, the following is from a single log file that contains both kinds of data, normal logs and audit logs:

[INFO] [] [%{TraceId}] 2021-10-02_18:37:01.601 com.MyExample.Test Getting app config from Config server with Base URL BASE_URL for application.
[INFO] [] [%{TraceId}] 2021-10-02_18:25:11.807 Audit Hello world,test file,123454,123Addb,true

In the above example, the first line is a normal log and the second line is audit data.

I want these two kinds of logs to go to two different indices, and I want to apply a different Logstash filter to each of them.

Any suggestions on how I can solve the above problem?

Also adding a flow diagram of the requirement: [flow diagram image]

My current logstash.conf serves only the auditing purpose and reads from a .csv file:

input {
  file {
    path => "<path_to_dir>/audit-*.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
      separator => ","
      skip_header => "false"
      columns => ["TraceId","timestamp","fcId","Ean","source","Quantity","updateSent","message"]
  }
}
output {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "audit"
  }
  stdout {}
}

Thanks in advance!


Solution

  • Okay, this is not very clean, but right now I don't see another way to do it if there is no label saying audit or log.

    You can try to match JAVACLASS, which is part of Logstash's core grok patterns; you are probably going to use grok later anyway to analyze your logs. Grok tags events with _grokparsefailure when there is no match, and from what you said, audit logs shouldn't match. After grok, just use an if statement and check whether the tag is present: if it is, the event is an audit log; otherwise it's a normal log. Then process your events as you please.
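
    Just to illustrate the idea (this is a sketch based on your two sample lines, not a drop-in config), the filter section could look roughly like this; I renamed the last capture to logmessage so it doesn't get appended to the existing message field, and the audit branch only adds a tag as a placeholder for whatever audit-specific filters you need:

    filter {
      grok {
        # Audit lines should not contain a Java class name, so they fail this
        # match and get tagged with _grokparsefailure
        match => { "message" => "\[%{LOGLEVEL:loglevel}\] \[\] \[(?<traceId>[0-9a-z]*)\] (?<timestamp>[0-9\-_:\.]*) %{JAVACLASS:class} %{GREEDYDATA:logmessage}" }
      }

      if "_grokparsefailure" in [tags] {
        # No JAVACLASS match -> treat the event as an audit log; put your
        # audit-specific filters (e.g. the csv filter) in this branch
        mutate { add_tag => ["audit"] }
      } else {
        mutate { add_tag => ["applog"] }
      }
    }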

    For the JAVACLASS pattern to work inside of grok, you will need to set ecs_compatibility to v1, as explained here.
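
    If you prefer to set it per plugin, the grok filter itself accepts the option directly (this just repeats the grok block from the sketch above with the setting added):

    grok {
      # Per the note above, ECS compatibility v1 is needed for JAVACLASS
      ecs_compatibility => "v1"
      match => { "message" => "\[%{LOGLEVEL:loglevel}\] \[\] \[(?<traceId>[0-9a-z]*)\] (?<timestamp>[0-9\-_:\.]*) %{JAVACLASS:class} %{GREEDYDATA:logmessage}" }
    }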

    Disclaimer: this idea is based on only the 2 examples you provided and could be completely wrong. In that case, please provide more examples, because without a clear pattern it's impossible to come up with a solution.

    Concrete example:

    Log line

    [INFO] [] [123trace456] 2021-10-02_18:37:01.601 com.MyExample.Test Getting app config from Config server with Base URL BASE_URL for application.
    

    Grok pattern

    \[%{LOGLEVEL:loglevel}\] \[\] \[(?<traceId>[0-9a-z]*)\] (?<timestamp>[0-9\-_:\.]*) %{JAVACLASS:class} %{GREEDYDATA:message}
    

    Result

    {
      "loglevel": [
        [
          "INFO"
        ]
      ],
      "traceId": [
        [
          "123trace456"
        ]
      ],
      "timestamp": [
        [
          "2021-10-02_18:37:01.601"
        ]
      ],
      "class": [
        [
          "com.MyExample.Test"
        ]
      ],
      "message": [
        [
          "Getting app config from Config server with Base URL BASE_URL for application."
        ]
      ]
    }
    

    I have no idea what kind of timestamp that is or what your trace ID looks like, so I just improvised; the point is the usage of the JAVACLASS pattern. I found plenty of examples by googling, so please don't forget: Google is your friend!
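
    And to get the two kinds of events into different indices, the same check (or the tags added in the filter) can be reused in the output section; a minimal sketch, with audit and app-logs as placeholder index names:

    output {
      if "_grokparsefailure" in [tags] {
        # Audit events go to their own index
        elasticsearch {
          hosts => "http://localhost:9200"
          index => "audit"
        }
      } else {
        # Everything that matched JAVACLASS is a normal application log
        elasticsearch {
          hosts => "http://localhost:9200"
          index => "app-logs"
        }
      }
    }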