elasticsearch, logstash, kibana, filebeat

Nginx Logs Pipeline using ELK Stack + Kafka


Hi, I am trying to ship nginx logs, which are in JSON format, via Filebeat into Kafka, then into Logstash, then into Elasticsearch (ES), and finally visualize them using Kibana.
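
(For context, a minimal sketch of the Filebeat side of this pipeline, assuming a plain log input; the broker address, topic, and log path are taken from the configuration and debug output further down, and the debug output below suggests the Filebeat nginx module is also enabled in the real setup, so the actual filebeat.yml may differ.)

    # filebeat.yml (sketch only)
    filebeat.inputs:
      - type: log
        paths:
          - /var/log/nginx/new_json.log

    output.kafka:
      hosts: ["10.10.10.240:9092"]
      topic: "payments-nginx"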

My nginx log format, which is in JSON, is as below:

 '{"@timestamp":"$time_iso8601","host":"$hostname",'
                            '"server_ip":"$server_addr","client_ip":"$remote_addr",'
                            '"xff":"$http_x_forwarded_for","domain":"$host",'
                            '"url":"$uri","referer":"$http_referer",'
                            '"args":"$args","upstreamtime":"$upstream_response_time",'
                            '"responsetime":"$request_time","request_method":"$request_method",'
                            '"status":"$status","size":"$body_bytes_sent",'
                            '"request_body":"$request_body","request_length":"$request_length",'
                            '"protocol":"$server_protocol","upstreamhost":"$upstream_addr",'
                            '"file_dir":"$request_filename","http_user_agent":"$http_user_agent"'
                            '}'
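
(For context, this snippet is the body of an nginx log_format directive. Declared in full it would look roughly as below, abbreviated to a few of the fields; the format name json_access and the escape=json flag are assumptions, not part of the original config, and the log path is taken from the debug output further down.)

    # sketch only: "json_access" and escape=json are assumptions
    log_format json_access escape=json
        '{"@timestamp":"$time_iso8601","host":"$hostname",'
        '"server_ip":"$server_addr","client_ip":"$remote_addr",'
        '"status":"$status","http_user_agent":"$http_user_agent"}';

    access_log /var/log/nginx/new_json.log json_access;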

My Logstash conf file:

input {
  kafka {
    bootstrap_servers => "10.10.10.240:9092"
    topics => ["payments-nginx"]
    codec => json
  }
}
filter {
  geoip {
    source => "client_ip"
    remove_field => [ "[geoip][location][lon]", "[geoip][location][lat]" ]
  }
  useragent {
    source => "http_user_agent"
    target => "ua"
    remove_field => [ "[ua][minor]", "[ua][major]", "[ua][build]", "[ua][os_name]" ]
  }
  mutate {
    remove_field => [ "xff", "args", "request_body", "http_user_agent" ]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9202"]
    manage_template => false
    index => "payments-nginx-%{+YYYY.MM.dd}"
  }
  #stdout { codec => rubydebug }
}

Problem: I am not able to see fields like status, responsetime, and client_ip in Kibana. They show up inside the message field but are not visible as separate fields, because Logstash is not able to parse the client_ip field; my geographic location is also failing. The Logstash logs are not showing any error, yet it is still not able to parse. Any solution or reason for this?


Filebeat version 7.10.1, Logstash version 7.10.1, ES version 7.10.0.

I have enabled debug logs for Logstash:
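
(For reference, a minimal sketch of how debug logging can be enabled; the config file path here is only an example.)

    # logstash.yml
    log.level: debug

    # or on the command line (config path is an example)
    bin/logstash -f /etc/logstash/conf.d/payments-nginx.conf --log.level=debug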

on filter {:event=>#<LogStash::Event:0x23a9bc8d>}
[2021-01-19T22:13:09,546][DEBUG][logstash.filters.json    ][main][4b26c54f59b453b33ce3542b7d139d33aaf31bfcf8185cddaf9983d6fdac068e] Running json filter {:event=>#<LogStash::Event:0x2143ba96>}
[2021-01-19T22:13:09,546][DEBUG][logstash.filters.json    ][main][4b26c54f59b453b33ce3542b7d139d33aaf31bfcf8185cddaf9983d6fdac068e] Event after json filter {:event=>#<LogStash::Event:0x2143ba96>}
[2021-01-19T22:13:09,547][DEBUG][logstash.filters.json    ][main][4b26c54f59b453b33ce3542b7d139d33aaf31bfcf8185cddaf9983d6fdac068e] Running json filter {:event=>#<LogStash::Event:0x6558b0f4>}
[2021-01-19T22:13:09,547][DEBUG][logstash.filters.json    ][main][4b26c54f59b453b33ce3542b7d139d33aaf31bfcf8185cddaf9983d6fdac068e] Event after json filter {:event=>#<LogStash::Event:0x6558b0f4>}
[2021-01-19T22:13:09,547][DEBUG][logstash.filters.json    ][main][4b26c54f59b453b33ce3542b7d139d33aaf31bfcf8185cddaf9983d6fdac068e] Running json filter {:event=>#<LogStash::Event:0x651bef38>}
[2021-01-19T22:13:09,547][DEBUG][logstash.filters.json    ][main][4b26c54f59b453b33ce3542b7d139d33aaf31bfcf8185cddaf9983d6fdac068e] Event after json filter {:event=>#<LogStash::Event:0x651bef38>}
[2021-01-19T22:13:09,873][DEBUG][org.apache.kafka.clients.FetchSessionHandler][main][24653f85c4bf579b2041409263ccfa549df5a7dcabc6d18b8484a1e85b9bfa97] [Consumer clientId=logstash-0, groupId=logstash] Node 0 sent an incremental fetch response for session 636759397 with 0 response partition(s), 2 implied partition(s)
[2021-01-19T22:13:09,873][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher][main][24653f85c4bf579b2041409263ccfa549df5a7dcabc6d18b8484a1e85b9bfa97] [Consumer clientId=logstash-0, groupId=logstash] Added READ_UNCOMMITTED fetch request for partition payments-nginx-1 at position FetchPosition{offset=26718738, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=mbkkafka240:9092 (id: 0 rack: null), epoch=-1}} to node mbkkafka240:9092 (id: 0 rack: null)
[2021-01-19T22:13:09,874][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher][main][24653f85c4bf579b2041409263ccfa549df5a7dcabc6d18b8484a1e85b9bfa97] [Consumer clientId=logstash-0, groupId=logstash] Added READ_UNCOMMITTED fetch request for partition payments-nginx-0 at position FetchPosition{offset=26715192, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=mbkkafka240:9092 (id: 0 rack: null), epoch=-1}} to node mbkkafka240:9092 (id: 0 rack: null)
[2021-01-19T22:13:09,874][DEBUG][org.apache.kafka.clients.FetchSessionHandler][main][24653f85c4bf579b2041409263ccfa549df5a7dcabc6d18b8484a1e85b9bfa97] [Consumer clientId=logstash-0, groupId=logstash] Built incremental fetch (sessionId=636759397, epoch=5580) for node 0. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 2 partition(s)
[2021-01-19T22:13:09,874][DEBUG][org.apache.kafka.clients.consumer.internals.Fetcher][main][24653f85c4bf579b2041409263ccfa549df5a7dcabc6d18b8484a1e85b9bfa97] [Consumer clientId=logstash-0, groupId=logstash] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(payments-nginx-1, payments-nginx-0)) to broker mbkkafka240:9092 (id: 0 rack: null)
[2021-01-19T22:13:09,875][DEBUG][org.apache.kafka.clients.NetworkClient][main][24653f85c4bf579b2041409263ccfa549df5a7dcabc6d18b8484a1e85b9bfa97] [Consumer clientId=logstash-0, groupId=logstash] Using older server API v10 to send FETCH {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=0,session_id=636759397,session_epoch=5580,topics=[],forgotten_topics_data=[]} with correlation id 5963 to node 0
Error parsing json {:source=>"message", :raw=>{"event"=>{"module"=>"nginx", "dataset"=>"nginx.access", "timezone"=>"+05:30"}, "log"=>{"offset"=>17947386523, "file"=>{"path"=>"/var/log/nginx/new_json.log"}}, "host"=>{"name"=>"mbkapp57", "os"=>{"version"=>"7 (Core)", "name"=>"CentOS Linux", "platform"=>"centos", "family"=>"redhat", "kernel"=>"3.10.0-327.13.1.el7.x86_64", "codename"=>"Core"}, "architecture"=>"x86_64", "containerized"=>false, "ip"=>["10.10.10.82"], "id"=>"92278feb4ab04110b9b72833eb1bf548", "mac"=>["22:77:85:f2:da:96", "d2:ea:9f:33:51:5c", "aa:d2:1d:af:62:49", "ba:c5:e6:af:17:ce"], "hostname"=>"mbkapp57"}, "message"=>"{\"@timestamp\":\"2021-01-20T00:05:31+05:30\",\"host\":\"mbkapp57\",\"server_ip\":\"10.10.10.82\",\"client_ip\":\"45.119.57.164\",\"xff\":\"-\",\"domain\":\"appapi.mobikwik.com\",\"url\":\"/p/upgradewallet/v3/kycConsent\",\"referer\":\"-\",\"args\":\"-\",\"upstreamtime\":\"0.026\",\"responsetime\":\"0.026\",\"request_method\":\"GET\",\"status\":\"200\",\"size\":\"129\",\"request_body\":\"-\",\"request_length\":\"425\",\"protocol\":\"HTTP/1.1\",\"upstreamhost\":\"10.10.10.159:8080\",\"file_dir\":\"/usr/share/nginx/html/p/upgradewallet/v3/kycConsent\",\"http_user_agent\":\"Mobikwik/22 CFNetwork/1209 Darwin/20.2.0\"}", "@timestamp"=>"2021-01-19T18:35:31.410Z", "agent"=>{"version"=>"7.10.1", "ephemeral_id"=>"2d565807-b19c-40c2-a092-e66ee5dd4f9b", "name"=>"mbkapp57", "type"=>"filebeat", "hostname"=>"mbkapp57", "id"=>"c9d8d92d-31be-4996-99d8-aae6680c1570"}, "fileset"=>{"name"=>"access"}, "ecs"=>{"version"=>"1.5.0"}, "@metadata"=>{"beat"=>"filebeat", "version"=>"7.10.1", "pipeline"=>"filebeat-7.10.1-nginx-access-pipeline", "type"=>"_doc"}, "input"=>{"type"=>"log"}, "service"=>{"type"=>"nginx"}}, :exception=>java.lang.ClassCastException}


Solution

  • I changed my approach a bit by switching the Nginx log pattern to the format below:

    log_format custom  '$remote_addr [$time_local] '
                               '$request $status $body_bytes_sent '
                               '$http_referer $http_user_agent $host $upstream_addr $request_time';
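
    With this pattern, an access-log line comes out roughly like the following (values taken from the debug output earlier; spacing approximate):

        45.119.57.164 [20/Jan/2021:00:05:31 +0530] GET /p/upgradewallet/v3/kycConsent HTTP/1.1 200 129 - Mobikwik/22 CFNetwork/1209 Darwin/20.2.0 appapi.mobikwik.com 10.10.10.159:8080 0.026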
    

    Further, the Logstash configuration that parses the nginx logs from Kafka and converts them into JSON fields by applying a grok pattern. Logstash conf file:

    input {
      kafka {
        bootstrap_servers => "10.X.X.X:9092"
        topics => ["payments"]
      }
    }
    filter {
       grok {
          match => { "message" => "%{IPORHOST:remote_ip} \[%{HTTPDATE:time}\] %{WORD:http_method} %{DATA:url} HTTP/%{NUMBER:http_version} %{NUMBER:response_code} %{NUMBER:body_sent_bytes} %{DATA:referrer} %{DATA:agent} %{IPORHOST:domain} %{NOTSPACE:upstream_address} %{NUMBER:upstream_time}" }
            }
       date {
            match => [ "time", "dd/MMM/YYYY:H:m:s Z" ]
            target => "my_date_as_date"
          }
       geoip {
          source => "remote_ip"
          add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
          add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
        }
       mutate {
          add_field => { "host_nginx" => "X.X.X.82" }
          add_field => { "read_time" => "%{@timestamp}" }
          convert => ["body_sent_bytes", "integer"]
          convert => ["upstream_time", "float"]
          remove_field => [ "message", "[geoip][location][lat]", "[geoip][location][lon]", "[geoip][country_code3]" ]
       }
    }
    output {
       elasticsearch {
        hosts => ["localhost:9200"]
        manage_template => false
        index => "paymentapi-nginx-%{+YYYY.MM.dd}"
      }
    }
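
    Note that with manage_template => false nothing maps [geoip][coordinates] as a geo_point, so the Kibana map visualization may still need an index template along these lines (the template name is an assumption, and the coordinate values may also need a mutate convert to float before indexing):

        PUT _index_template/paymentapi-nginx
        {
          "index_patterns": ["paymentapi-nginx-*"],
          "template": {
            "mappings": {
              "properties": {
                "geoip": {
                  "properties": {
                    "coordinates": { "type": "geo_point" }
                  }
                }
              }
            }
          }
        }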