Search code examples
logginglogstashlogstash-grok

Split a log entry in two events with logstash-grok, clone and mutate


I'm trying to split a log in two events with grok, mutate and clone in my logstash configuration. My stack is a very standard ELK (Elasticsearch, Logstash, Kibana).

I'm creating a log that is formatted like that :

timestamp float integer

For instance :

2015/01/19 21:48:12 24.7 32
2015/01/19 22:00:20 24.7 32
2015/01/19 22:01:11 24.7 32
2015/01/19 22:01:58 24.7 28
2015/01/19 22:02:28 23.7 28
(etc ...)

Eventually, I want two events in logstash, one with the keys timestamp obviously, type=sensorA and value=%{the value of the integer} and another with the same timestamp and value=%{value of the float} and type=sensorB.

So far, I've come with this configuration in logstash.conf:

1.My log is tagged with type=sensor in my inputs :

  input {
    file {
      path => "/var/log/sensors.log"
      type => "sensor"
    }
  }

2.Then, I use grok, clone and mutate to try to split them

if [type] == "sensor" {
  # Extracts the values
  grok { 
    match => { "message" => "(?<timestamp>%{YEAR}/%{MONTHNUM:month}/%{MONTHDAY:day} %{TIME}) %{NUMBER:sensorA:float} %{NUMBER:sensorB:int}" }
  }
  mutate {
    update => [ "type", "sensorA" ]
  }
  # Clones the event
  clone {
    clones => ["sensorB"]
  }
}
# So now I should have two events, 
# one with type sensorA and one with type sensorB, no ? :
if [type] == "sensorA" {
  mutate {
    add_field => { "value" => "%{sensorA}" }
    convert => ["value", "float"]
  }
}
if [type] == "sensorB" {
  mutate {
    add_field => { "value" => "%{sensorB}" }
    convert => ["value", "integer"]
  }
}

But that doesn't really work, since eventhough I get two events with a different type, they both have the same value (which is that of sensorB, always).

How come ? I got a feeling the logstash.conf file is not really read in a linear fashion but I can't find any solution for this.

Any hint? Am I missing something really obvious here ? Thanks a lot


Solution

  • Instead of using mutate to convert the value to value field, you can use ruby plugin to do what you need.

    if [type] == "sensor" {
        # Extracts the values
        grok {
            match => { "message" => "(?<timestamp>%{YEAR}/%{MONTHNUM:month}/%{MONTHDAY:day} %{TIME}) %{NUMBER:sensorA:float} %{NUMBER:sensorB:int}" }
        }
        mutate {
            update => [ "type", "sensorA" ]
        }
        # Clones the event
        clone {
            clones => ["sensorB"]
        }
    }
    # So now I should have two events,
    # one with type sensorA and one with type sensorB, no ? :
    ruby {
        code => "
            if event['type'] == 'sensorA'
                    event['value'] = event['sensorA']
            elsif event['type'] == 'sensorB'
                    event['value'] = event['sensorB']
            end
        "
    }
    

    With this config, I can meet your requirement. Hope this can help you :)