Search code examples
logstashlogstash-grokfilebeat

LogStash concat Filebeat input


I am trying to merge Filebeat messages in Logstash. I have the following log file:

----------- SCAN SUMMARY -----------
Known viruses: 8520944
Engine version: 0.102.4
Scanned directories: 408
Scanned files: 1688
Infected files: 0
Total errors: 50
Data scanned: 8.93 MB
Data read: 4.42 MB (ratio 2.02:1)
Time: 22.052 sec (0 m 22 s)

I read it from Filebeat and send to LogStash.

The problem is that Logstash receives each line as a separate message. I want to merge them all into a single event, and also add a new field, "received_at", taken from the Filebeat input.

I would like to have the following output from Logstash:

{
  Known_viruses: 8520944
  Engine_version: 0.102.4
  Scanned_directories: 408
  Scanned_files: 1688
  Infected_files: 0
  Total_errors: 50
  Data_scanned: 8.93MB
  Data_read: 4.42MB
  Time: 22.052sec (0 m 22 s)
  Received_at: <timestamp taken from the JSON message received from Filebeat>
}

The input that Logstash receives from Filebeat for each line looks like this:

{
    "@timestamp": "2021-04-20T08:03:33.843Z",
    "@version": "1",
    "tags": ["beats_input_codec_plain_applied"],
    "host": {
        "name": "PRLN302",
        "architecture": "x86_64",
        "ip": ["10.126.40.18", "fe80::7dbf:4941:cd39:c0f9", "172.17.0.1", "fe80::42:82ff:fe2b:895", "fe80::24b2:8cff:feeb:20b4", "172.18.0.1", "fe80::42:53ff:fe31:a025", "fe80::b420:e1ff:fe97:c152", "fe80::9862:21ff:fe3a:c33e", "fe80::48a6:70ff:fec2:60d6", "192.168.126.117", "fe80::2831:c644:33d5:321"],
        "id": "a74e193a551f4d379f9488b80a463581",
        "os": {
            "platform": "ubuntu",
            "version": "20.04.2 LTS (Focal Fossa)",
            "family": "debian",
            "name": "Ubuntu",
            "type": "linux",
            "codename": "focal",
            "kernel": "5.8.0-49-generic"
        },
        "mac": ["e8:6a:64:32:fe:4d", "dc:8b:28:4a:c8:88", "02:42:82:2b:08:95", "26:b2:8c:eb:20:b4", "02:42:53:31:a0:25", "b6:20:e1:97:c1:52", "9a:62:21:3a:c3:3e", "4a:a6:70:c2:60:d6", "00:50:b6:b9:19:d7"],
        "containerized": false,
        "hostname": "PRLN302"
    },
    "message": "----------- SCAN SUMMARY -----------",
    "agent": {
        "ephemeral_id": "ad402f64-ab73-480c-b6de-4af6184f012c",
        "type": "filebeat",
        "version": "7.12.0",
        "id": "f681d775-d452-490a-9b8b-036466a87d35",
        "name": "PRLN302",
        "hostname": "PRLN302"
    },
    "input": {
        "type": "log"
    },
    "ecs": {
        "version": "1.8.0"
    },
    "log": {
        "offset": 0,
        "file": {
            "path": "/var/log/clamav-test.log"
        }
    }
}

Is this possible?


Solution

  • It is possible. You'll need to configure multiline message handling in the Filebeat input (see https://www.elastic.co/guide/en/beats/filebeat/current/multiline-examples.html). Something like the configuration below should do it, I think:

    multiline.type: pattern
    multiline.pattern: '^----------- SCAN SUMMARY -----------'
    multiline.negate: true
    multiline.match: after