logstash, kibana, elastic-stack, cdn, elk

How to change “message” value in index


In a Logstash pipeline or an index pattern, how can I change the following part of a CDN log in the "message" field so that I can separate or extract some of the data and then aggregate it?

<40> 2022-01-17T08:31:22Z logserver-5 testcdn[1]: {"method":"GET","scheme":"https","domain":"www.123.com","uri":"/product/10809350","ip":"66.249.65.174","ua":"Mozilla/5.0 (Linux; Android 6.0.1; Nexus 5X Build/MMB29P) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Mobile Safari/537.36 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)","country":"US","asn":15169,"content_type":"text/html; charset=utf-8","status":200,"server_port":443,"bytes_sent":1892,"bytes_received":1371,"upstream_time":0.804,"cache":"MISS","request_id":"b017d78db4652036250148216b0a290c"}

Expected result:

{"method":"GET","scheme":"https","domain":"www.123.com","uri":"/product/10809350","ip":"66.249.65.174","ua":"Mozilla/5.0 (Linux; Android 6.0.1; Nexus 5X Build/MMB29P) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Mobile Safari/537.36 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)","country":"US","asn":15169,"content_type":"text/html; charset=utf-8","status":200,"server_port":443,"bytes_sent":1892,"bytes_received":1371,"upstream_time":0.804,"cache":"MISS","request_id":"b017d78db4652036250148216b0a290c"}

Because the prefix "<40> 2022-01-17T08:31:22Z logserver-5 testcdn[1]:" is not JSON, the message is not parsed as JSON, and I can't create visual dashboards based on fields such as country, asn, etc.

The original log, as indexed by Logstash, is:

{
  "_index": "logstash-2022.01.17-000001",
  "_type": "_doc",
  "_id": "Qx8pZ34BhloLEkDviGxe",
  "_version": 1,
  "_score": 1,
  "_source": {
    "message": "<40> 2022-01-17T08:31:22Z logserver-5 testcdn[1]: {\"method\":\"GET\",\"scheme\":\"https\",\"domain\":\"www.123.com\",\"uri\":\"/product/10809350\",\"ip\":\"66.249.65.174\",\"ua\":\"Mozilla/5.0 (Linux; Android 6.0.1; Nexus 5X Build/MMB29P) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Mobile Safari/537.36 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)\",\"country\":\"US\",\"asn\":15169,\"content_type\":\"text/html; charset=utf-8\",\"status\":200,\"server_port\":443,\"bytes_sent\":1892,\"bytes_received\":1371,\"upstream_time\":0.804,\"cache\":\"MISS\",\"request_id\":\"b017d78db4652036250148216b0a290c\"}",
    "port": 39278,
    "@timestamp": "2022-01-17T08:31:22.100Z",
    "@version": "1",
    "host": "93.115.150.121"
  },
  "fields": {
    "@timestamp": [
      "2022-01-17T08:31:22.100Z"
    ],
    "port": [
      39278
    ],
    "@version": [
      "1"
    ],
    "host": [
      "93.115.150.121"
    ],
    "message": [
      "<40> 2022-01-17T08:31:22Z logserver-5 testcdn[1]: {\"method\":\"GET\",\"scheme\":\"https\",\"domain\":\"www.123.com\",\"uri\":\"/product/10809350\",\"ip\":\"66.249.65.174\",\"ua\":\"Mozilla/5.0 (Linux; Android 6.0.1; Nexus 5X Build/MMB29P) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Mobile Safari/537.36 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)\",\"country\":\"US\",\"asn\":15169,\"content_type\":\"text/html; charset=utf-8\",\"status\":200,\"server_port\":443,\"bytes_sent\":1892,\"bytes_received\":1371,\"upstream_time\":0.804,\"cache\":\"MISS\",\"request_id\":\"b017d78db4652036250148216b0a290c\"}"
    ],
    "host.keyword": [
      "93.115.150.121"
    ]
  }
}

Thanks


Solution

  • Add these configurations to the filter section of your Logstash config:

    # Parse the message field: split the syslog-style prefix from the JSON payload
    grok {
        match => { "message" => "<%{NONNEGINT:syslog_pri}>\s+%{TIMESTAMP_ISO8601:syslog_timestamp}\s+%{DATA:sys_host}\s+%{NOTSPACE:sys_module}\s+%{GREEDYDATA:syslog_message}" }
    }
    # Replace the message field with the contents of syslog_message (the JSON payload)
    mutate {
        replace => { "message" => "%{syslog_message}" }
    }
    

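    Once the message field has been replaced with the contents of syslog_message, you can add the json filter below to parse the JSON into separate fields as well. The syslog_message field still holds the same JSON string, so it can be used as the source.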

    json {
        source => "syslog_message"
    }
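
    Put together, the whole filter section might look like the sketch below. It assumes every incoming event carries the syslog-style prefix shown in the question. The "_grokparsefailure" guard and the remove_field cleanup are optional additions that are not part of the snippets above; drop them if you want to keep the intermediate fields.

    filter {
        # Split the syslog-style prefix from the JSON payload
        grok {
            match => { "message" => "<%{NONNEGINT:syslog_pri}>\s+%{TIMESTAMP_ISO8601:syslog_timestamp}\s+%{DATA:sys_host}\s+%{NOTSPACE:sys_module}\s+%{GREEDYDATA:syslog_message}" }
        }

        # Assumption: only continue when grok matched, so events in another
        # format are left untouched instead of being overwritten
        if "_grokparsefailure" not in [tags] {
            # Keep only the JSON payload in the message field
            mutate {
                replace => { "message" => "%{syslog_message}" }
            }
            # Expand the JSON payload into top-level fields (country, asn, status, ...)
            json {
                source => "syslog_message"
                # Optional: drop the intermediate field once parsing succeeds
                remove_field => [ "syslog_message" ]
            }
        }
    }

    After restarting Logstash and refreshing the index pattern in Kibana, newly indexed events should expose fields such as country, asn and status for use in visualizations. Documents indexed before the change are not re-parsed.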