Search code examples
logstash-groklogstash-configurationlogstash-file

How to Create Separate index created for separate input types


I have Below logstash-syslog.conf file where it has two different input types one as type => "syslog" and another is type => "APIC" . So, i need two separate output index created as syslog-2018.08.25 and APIC-2018.08.05 .

I want these index to be created Dynamically, i tried something index => "%{[type]}-%{+YYYY.MM.dd}" but it did not worked and killed the logstash.

Could you please suggest what's wrong i'm doing in the below config which needs to be fixed for both config and Index type.

Below is the configuration logstash file:

logstash Version is : 6.2

$ vi logstash-syslog.conf
input {
  file {
    path => [ "/scratch/rsyslog/*/messages.log" ]
    type => "syslog"
  }
  file {
    path => [ "/scratch/rsyslog/Aug/messages.log" ]
    type => "APIC"
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp } %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    syslog_pri { }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
  if [type] == "APIC" {
    grok {
      match => { "message" => "%{CISCOTIMESTAMP:syslog_timestamp} %{CISCOTIMESTAMP} %{SYSLOGHOST:syslog_hostname} %{GREEDYDATA:syslog_message}" }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
   }
 }
}
output {
              elasticsearch {
                hosts => "noida-elk:9200"
                index => "syslog-%{+YYYY.MM.dd}"
                #index => "%{[type]}-%{+YYYY.MM.dd}"
                document_type => "messages"
  }
}

Solution

  • Fixed for me as its working for me.

     $ cat logstash-syslog.conf
        input {
          file {
            path => [ "/scratch/rsyslog/*/messages.log" ]
            type => "syslog"
          }
          file {
            path => [ "/scratch/rsyslog/Aug/messages.log" ]
            type => "apic_logs"
          }
        }
    
        filter {
          if [type] == "syslog" {
            grok {
              match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp } %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
              add_field => [ "received_at", "%{@timestamp}" ]
              remove_field => ["@version", "host", "message", "_type", "_index", "_score", "path"]
            }
            syslog_pri { }
            date {
              match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
         }
        }
          if [type] == "apic_logs" {
            grok {
              match => { "message" => "%{CISCOTIMESTAMP:syslog_timestamp} %{CISCOTIMESTAMP} %{SYSLOGHOST:syslog_hostname} (?<prog>[\w._/%-]+) %{SYSLOG5424SD:f1}%{SYSLOG5424SD:f2}%{SYSLOG5424SD:f3}%{SYSLOG5424SD:f4}%{SYSLOG5424SD:f5} %{GREEDYDATA:syslog_message}" }
              add_field => [ "received_at", "%{@timestamp}" ]
              remove_field => ["@version", "host", "message", "_type", "_index", "_score", "path"]
           }
         }
        }
        output {
                if [type] == "syslog" {
                elasticsearch {
                        hosts => "noida-elk:9200"
                        manage_template => false
                        index => "syslog-%{+YYYY.MM.dd}"
                        document_type => "messages"
          }
         }
        }
    
        output {
                if [type] == "apic_logs" {
                elasticsearch {
                        hosts => "noida-elk:9200"
                        manage_template => false
                        index => "apic_logs-%{+YYYY.MM.dd}"
                        document_type => "messages"
          }
         }
        }