
fluent-plugin-elasticsearch: "Could not push log to Elasticsearch" error with "error"=>{"type"=>"mapper_parsing_exception"}


When I inject data collected by Fluentd into Elasticsearch using fluent-plugin-elasticsearch, some data causes the following error:

2017-04-09 23:47:37 +0900 [error]: Could not push log to Elasticsearch: {"took"=>3, "errors"=>true, "items"=>[{"index"=>{"_index"=>"logstash-201704", "_type"=>"ruby", "_id"=>"AVtTLz_cUzkwT9CQCxrH", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse [message]", "caused_by"=>{"type"=>"illegal_state_exception", "reason"=>"Can't get text on a START_OBJECT at 1:27"}}}}, .....]}

It seems that Elasticsearch rejected the data because of the errors failed to parse [message] and Can't get text on a START_OBJECT at 1:27, but I cannot see what data was sent to Elasticsearch or what is wrong with it.

Any ideas?


Solution

  • fluent-plugin-elasticsearch uses the _bulk API to send data. I added request-dumping code to /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/elasticsearch-api-5.0.4/lib/elasticsearch/api/actions/bulk.rb as follows:

      def bulk(arguments={})
        ...
          payload = body
        end
        $log.info([method, path, params, payload].inspect)  # <=== here ($log is the global logger of fluentd)
        perform_request(method, path, params, payload).body
      end
    

    And I found that the request sent to Elasticsearch was as follows:

    POST /_bulk
    {"index":{"_index":"logstash-201704","_type":"ruby"}}
    {"level":"INFO","message":{"status":200,"time":{"total":46.26,"db":33.88,"view":12.38},"method":"PUT","path":"filtered","params":{"time":3815.904,"chapter_index":0},"response":[{}]},"node":"main","time":"2017-04-09T14:39:06UTC","tag":"filtered.console","@timestamp":"2017-04-09T23:39:06+09:00"}
    

    The problem is that the message field contains a JSON object, although this field is mapped as an analyzed string in Elasticsearch.
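
    One way to avoid the conflict, without touching the Elasticsearch mapping, is to serialize the nested object back into a string before it reaches the output plugin. The snippet below is only a minimal sketch of that idea in plain Ruby: the record contents are a simplified version of the bulk dump above, and in practice the transformation would have to be wired into the Fluentd pipeline (for example with a filter plugin) rather than run standalone.

      require 'json'

      # Simplified record; field names are taken from the bulk request dump above.
      record = {
        "level"   => "INFO",
        "message" => { "status" => 200, "method" => "PUT", "path" => "filtered" },
        "node"    => "main",
        "tag"     => "filtered.console"
      }

      # If "message" is a nested object, serialize it to a JSON string so it
      # matches the existing analyzed-string mapping instead of triggering
      # mapper_parsing_exception.
      record["message"] = record["message"].to_json if record["message"].is_a?(Hash)

      puts record["message"]
      # => {"status":200,"method":"PUT","path":"filtered"}

    Alternatively, the index template could map message as an object, but existing indices would have to be reindexed for a changed mapping to take effect.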