Search code examples
logstash elastic-stack logstash-configuration

Logstash does not add the tags


I have a strange problem with logstash (v8.3.3) not applying the tags correctly.

There are currently only two configuration files in /conf.d/, one for Beats and one for syslog:

01-beats.conf

input {
     beats {
       port => 5044
       ssl => false
       tags => [ 'beat' ]
      }
    }
    

filter {
if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGLINE}" }
    }

    date {
match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}

output {
if 'beat' in [tags] {
   elasticsearch {
     hosts => localhost
     index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
   }
}
stdout {
  codec => rubydebug
  }
}

02-syslog.conf

input {
  tcp {
    port => 514
    type => syslog
    tags => [ 'syslog' ]
  }
  udp {
    port => 514
    type => syslog
    tags => [ 'syslog' ]

  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
  if [message] =~ /(?i)crc/ {
     mutate { add_tag => ["switch"] }
  }
}

output {
if 'syslog' in [tags] {
   elasticsearch {
      hosts => ["localhost:9200"]
      index => "syslog-%{+YYYY.MM}"
   }
}
stdout {
    codec => rubydebug
    }

The problem arises with the 02-syslog.conf file. If I run Logstash directly with only that file, via: /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/02-syslog.conf, the switch tag is successfully added.

Example:

root@elk-test ~ $ logger -n localhost -T -P 514 "crc error on port 48"


{
          "type" => "syslog",
          "tags" => [
        [0] "syslog",
        [1] "_grokparsefailure",
        [2] "switch"
    ],
    "@timestamp" => 2022-07-31T17:42:36.408104Z,
      "@version" => "1",
         "event" => {
        "original" => "<13>1 2022-07-31T19:42:36.314411+02:00 srv-elk-hck.localdomain root - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"88217\"] crc error on port 48"
    },
       "message" => "<13>1 2022-07-31T19:42:36.314411+02:00 srv-elk-hck.localdomain root - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"88217\"] crc error on port 48"
}

When I instead start Logstash as a service, via: systemctl start logstash.service, the tag is not added.

Example:

root@elk-test ~ $ logger -n localhost -T -P 514 "crc error on port 48"
    
    {
      "_index": "syslog-2022.07",
      "_id": "mcJQVYIBNEboR1irs878",
      "_version": 1,
      "_score": 0,
      "_source": {
        "@timestamp": "2022-07-31T17:32:08.058612Z",
        "message": [
          "<13>1 2022-07-31T19:32:07.935622+02:00 srv-elk-hck.localdomain root - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"15780\"] crc error on port 48",
          "root - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"15780\"] crc error on port 48"
        ],
        "timestamp": "2022-07-31T19:32:07.935622+02:00",
        "tags": [
          "syslog",
          "_dateparsefailure",
          "_grokparsefailure"
        ],
        "host": {
          "hostname": "srv-elk-hck.localdomain"
        },
        "@version": "1",
        "type": "syslog",
        "event": {
          "original": "<13>1 2022-07-31T19:32:07.935622+02:00 srv-elk-hck.localdomain root - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"15780\"] crc error on port 48"
        }
      },
      "fields": {
        "event.original": [
          "<13>1 2022-07-31T19:32:07.935622+02:00 srv-elk-hck.localdomain root - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"15780\"] crc error on port 48"
        ],
        "tags.keyword": [
          "syslog",
          "_dateparsefailure",
          "_grokparsefailure"
        ],
        "@version.keyword": [
          "1"
        ],
        "host.hostname": [
          "srv-elk-hck.localdomain"
        ],
        "type": [
          "syslog"
        ],
        "message": [
          "<13>1 2022-07-31T19:32:07.935622+02:00 srv-elk-hck.localdomain root - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"15780\"] crc error on port 48",
          "root - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"15780\"] crc error on port 48"
        ],
        "tags": [
          "syslog",
          "_dateparsefailure",
          "_grokparsefailure"
        ],
        "@timestamp": [
          "2022-07-31T17:32:08.058Z"
        ],
        "type.keyword": [
          "syslog"
        ],
        "message.keyword": [
          "<13>1 2022-07-31T19:32:07.935622+02:00 srv-elk-hck.localdomain root - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"15780\"] crc error on port 48",
          "root - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"15780\"] crc error on port 48"
        ],
        "event.original.keyword": [
          "<13>1 2022-07-31T19:32:07.935622+02:00 srv-elk-hck.localdomain root - - [timeQuality tzKnown=\"1\" isSynced=\"1\" syncAccuracy=\"15780\"] crc error on port 48"
        ],
        "host.hostname.keyword": [
          "srv-elk-hck.localdomain"
        ],
        "@version": [
          "1"
        ],
        "timestamp": [
          "2022-07-31T17:32:07.935Z"
        ]
      }
    }

What am I missing?


Solution

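  • The service must be using a different configuration. In the events you show, [message] is an array, which suggests that a grok pattern ending with %{GREEDYDATA:message} (message, not syslog_message) has also run on the event. The likely source is the %{SYSLOGLINE} grok in 01-beats.conf: when started as a service, Logstash normally loads every file in conf.d and concatenates them into a single pipeline, so the filters from both files apply to every event whose [type] is "syslog". Once [message] is an array rather than a string, the condition [message] =~ /(?i)crc/ no longer matches. If you fix that (so that nothing captures a second value into message), your tag will get added.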

    Otherwise, try

    if [message][0] =~ /(?i)crc/ { mutate { add_tag => ["switch"] } }