Tags: elasticsearch, logstash, filebeat, aws-elasticsearch

How to decode JSON in an Elasticsearch load pipeline


I set up Elasticsearch on AWS and I am trying to load application logs into it. The twist is that each log entry is in JSON format, like

{"EventType":"MVC:GET:example:6741/Common/GetIdleTimeOut","StartDate":"2021-03-01T20:46:06.1207053Z","EndDate":"2021-03-01","Duration":5,"Action":{"TraceId":"80001266-0000-ac00-b63f-84710c7967bb","HttpMethod":"GET","FormVariables":null,"UserName":"ZZZTHMXXN"} ...}

So, I am trying to unwrap it. The Filebeat docs suggest the decode_json_fields processor for this; however, the message field still shows up in Kibana as a single JSON string; nothing is unwrapped.

I am new to Elasticsearch, but I am not using that as an excuse to skip the analysis; I mention it only to explain why I am not sure which information is helpful for answering the question.

Here is filebeat.yml:

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/opt/logs/**/*.json
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
  - decode_json_fields:
      fields: ["message"]

output.logstash:
    hosts: ["localhost:5044"]

And here is the Logstash configuration file:

input {
    beats {
        port => "5044"
    }
}
output {
    elasticsearch {
        hosts => ["https://search-blah-blah.us-west-2.es.amazonaws.com:443"]
        ssl => true
        user => "user"
        password => "password"
        index => "my-logs"
        ilm_enabled => false
    }
}

I am still trying to understand the filter and grok parts of Logstash, but it seems that it should work the way it is. Also, I am not sure where the message field actually comes from (probably from Logstash or Filebeat), but that seems irrelevant as well.

UPDATE: The AWS documentation doesn't give an example of loading through Filebeat alone, without Logstash. If I don't use Logstash (just Filebeat) and have the following section in filebeat.yml:

output.elasticsearch:
    hosts: ["https://search-bla-bla.us-west-2.es.amazonaws.com:443"]

    protocol: "https"
    #index: "mylogs"

    # Authentication credentials - either API key or username/password.
    #api_key: "id:api_key"
    username: "username"
    password: "password"

I am getting the following errors. If I use index: "mylogs" -

setup.template.name and setup.template.pattern have to be set if index name is modified
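
That error means Filebeat wants a matching index template definition whenever the index name is customized. A minimal sketch of the extra filebeat.yml settings, assuming the index stays mylogs (the template name and pattern values are illustrative):

setup.ilm.enabled: false            # ILM would otherwise override the custom index name
setup.template.name: "mylogs"       # illustrative template name
setup.template.pattern: "mylogs*"   # pattern must cover the custom index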

And if I don't use index at all (where would the data go in ES then?) -

Failed to connect to backoff(elasticsearch(https://search-bla-bla.us-west-2.es.amazonaws.com:443)): Connection marked as failed because the onConnect callback failed: cannot retrieve the elasticsearch license from the /_license endpoint, Filebeat requires the default distribution of Elasticsearch. Please make the endpoint accessible to Filebeat so it can verify the license.: unauthorized access, could not connect to the xpack endpoint, verify your credentials
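
(From what I can tell, this last error is Filebeat's license check: the default distribution verifies the cluster via the X-Pack /_license endpoint, which the AWS-managed Elasticsearch service doesn't expose to it.)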


Solution

  • If transmitting via Logstash works in general, add a filter block as Val proposed in the comments and use the json plugin/filter: elastic.co/guide/en/logstash/current/plugins-filters-json.html - it automatically parses the JSON into Elasticsearch fields; a minimal sketch follows.
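
A minimal sketch of that filter block, assuming the raw JSON arrives in the message field (remove_field is optional and just drops the raw string once it has been parsed):

filter {
    json {
        source => "message"
        # drop the raw JSON string once its fields have been extracted
        remove_field => ["message"]
    }
}

With this placed between the input and output blocks, the pipeline should index EventType, StartDate, Duration, etc. as separate fields instead of one message string.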