I have Filesbeat
configured and it is able to read new log (syslog
for now) from the path provided in the filebeat.yml
file and forward it to Logstash
which should then parse the data forward that to Elasticearch
.
I do not see the parsed grok fields such as syslog_timestamp, syslog_hostname, syslog_pid anywhere in the Kibana event and i dont know what could be the reason as to why the data is not parsed.
Filebeat input file
Grok Filter (in Logstash)
input{
beats{
port => "5044"
}
}
filter {
if[type] == "syslog"{
grok{
match => {"message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"}
}
date {
match => ["syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss"]
}
}
}
output{
elasticsearch{
hosts => ["10.107.50.205:9200"]
index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
}
}
Kibana (Elasticsearch Json)
{
"_index": "filebeat-2019.09.30",
"_type": "_doc",
"_id": "kss7g20B5aLjyCF-6L2B",
"_version": 1,
"_score": null,
"_source": {
"message": "Sep 30 18:33:20 ut012905 metricbeat[46882]: 2019-09-30T18:33:20.254+0100#011INFO#011[monitoring]#011log/log.go:145#011Non-zero metrics in the last 30s#011{\"monitoring\": {\"metrics\": {\"beat\":{\"cpu\":{\"system\":{\"ticks\":770020,\"time\":{\"ms\":80}},\"total\":{\"ticks\":2091400,\"time\":{\"ms\":172},\"value\":2091400},\"user\":{\"ticks\":1321380,\"time\":{\"ms\":92}}},\"handles\":{\"limit\":{\"hard\":4096,\"soft\":1024},\"open\":5},\"info\":{\"ephemeral_id\":\"63755af9-7bad-4b09-8909-52e7018409fe\",\"uptime\":{\"ms\":369450706}},\"memstats\":{\"gc_next\":23786560,\"memory_alloc\":12161776,\"memory_total\":453661591544,\"rss\":2052096},\"runtime\":{\"goroutines\":36}},\"libbeat\":{\"config\":{\"module\":{\"running\":0}},\"pipeline\":{\"clients\":3,\"events\":{\"active\":89,\"published\":47,\"total\":47}}},\"metricbeat\":{\"system\":{\"cpu\":{\"events\":3,\"success\":3},\"filesystem\":{\"events\":3,\"success\":3},\"fsstat\":{\"events\":1,\"success\":1},\"load\":{\"events\":3,\"success\":3},\"memory\":{\"events\":3,\"success\":3},\"network\":{\"events\":6,\"success\":6},\"process\":{\"events\":22,\"success\":22},\"process_summary\":{\"events\":3,\"success\":3},\"socket_summary\":{\"events\":3,\"success\":3}}},\"system\":{\"load\":{\"1\":0.04,\"15\":0.01,\"5\":0.04,\"norm\":{\"1\":0.04,\"15\":0.01,\"5\":0.04}}}}}}",
"host": {
"containerized": false,
"name": "ut012905",
"architecture": "x86_64",
"hostname": "ut012905",
"id": "74e969e835cbfe982aa3ed2f5d76fdd9",
"os": {
"platform": "ubuntu",
"name": "Ubuntu",
"version": "16.04.6 LTS (Xenial Xerus)",
"codename": "xenial",
"family": "debian",
"kernel": "4.4.0-161-generic"
}
},
"ecs": {
"version": "1.0.1"
},
"@version": "1",
"agent": {
"id": "afafb888-8d08-4a4b-8f4d-6c64291fb43d",
"version": "7.3.2",
"hostname": "ut012905",
"type": "filebeat",
"ephemeral_id": "57c8f630-00d5-4c88-bf2d-bb1102cd8530"
},
"log": {
"offset": 3218320,
"file": {
"path": "/var/log/syslog"
}
},
"tags": [
"myCluster1",
"beats_input_codec_plain_applied"
],
"input": {
"type": "log"
},
"fields": {
"env": "staging"
},
"@timestamp": "2019-09-30T17:33:23.354Z"
},
"fields": {
"@timestamp": [
"2019-09-30T17:33:23.354Z"
]
},
"sort": [
1569864803354
]
}
The document_type
setting was removed from Filebeat on version 6.0, since you are using Filebeat 7.3 this setting is ignored and your message does not have the type
field.
You need to use fields
to add a new field and change your pipeline to filter based on that field.
You need something like this in your filebeat configuration.
fields:
type: syslog
Then you need to change your conditional in Logstash.
if [fields][type] == "syslog"