json splunk syslog

How do I configure SysFlow to send JSON events to Splunk?


I'm new to SysFlow, and I want to send the events to Splunk.

The problem is that the events arrive at Splunk with a metadata prefix in front of the JSON payload, so Splunk cannot automatically parse them as JSON.

Here is the raw event:

Mar  9 21:57:26 10.0.0.158 1 2023-03-09T23:57:26+02:00 RHEL-Server /usr/bin/sfprocessor 60594 sysflow - {"version":5,"endts":0,"opflags":["EXEC"],"ret":0,"ts":1678399046064475870,"type":"PE","meta":{"schema":4,"tracename":"."},"node":{"id":"RHEL-Server","ip":"10.0.0.158"},"pproc":{"args":"--switched-root --system --deserialize 30","cmdline":"/usr/lib/systemd/systemd --switched-root --system --deserialize 30","createts":1678395155396715155,"entry":true,"exe":"/usr/lib/systemd/systemd","gid":0,"group":"root","name":"systemd","oid":"d2e72642fc1d9df2","pid":1,"tty":false,"uid":0,"user":"root"},"proc":{"acmdline":["/usr/lib/systemd/systemd --switched-root --system --deserialize 30","/usr/lib/systemd/systemd --switched-root --system --deserialize 30"],"aexe":["/usr/lib/systemd/systemd","/usr/lib/systemd/systemd"],"aname":["systemd","systemd"],"apid":[73232,1],"args":"/usr/bin/dnf makecache --timer","cmdline":"/usr/bin/dnf /usr/bin/dnf makecache --timer","createts":1678399046062638027,"entry":false,"exe":"/usr/bin/dnf","gid":0,"group":"root","name":"dnf","oid":"6a55582e3bbddc8a","pid":73232,"tid":73232,"tty":false,"uid":0,"user":"root"},"policies":[{"id":"Unauthorized installer detected","desc":"Use of package installer detected in container","priority":1},{"id":"Unauthorized installer detected","desc":"Use of package installer detected in container","priority":1}],"tags":["actionable-offense","suspicious-process","mitre:T1072"]}
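To make the prefix easier to reason about, here is a minimal Python sketch that splits a shortened copy of the event above into header fields and JSON payload. It assumes the prefix is a receiver-added timestamp and source IP followed by RFC 5424 header fields (version, timestamp, hostname, app name, process ID, message ID, and "-" for structured data). That layout is inferred from the sample only, and the names HEADER and split_event are made up for illustration.

```python
import json
import re

# Shortened copy of the raw event from the question (payload trimmed for brevity).
RAW = (
    "Mar  9 21:57:26 10.0.0.158 1 2023-03-09T23:57:26+02:00 RHEL-Server "
    '/usr/bin/sfprocessor 60594 sysflow - {"version":5,"type":"PE",'
    '"node":{"id":"RHEL-Server","ip":"10.0.0.158"}}'
)

# Assumed layout: receiver timestamp and source IP, then the RFC 5424 fields
# VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID STRUCTURED-DATA MSG.
HEADER = re.compile(
    r"^(?P<recv_time>\w{3}\s+\d+ [\d:]{8}) (?P<recv_host>\S+) "
    r"(?P<version>\d+) (?P<timestamp>\S+) (?P<hostname>\S+) "
    r"(?P<app_name>\S+) (?P<procid>\S+) (?P<msgid>\S+) "
    r"(?P<structured_data>-) (?P<msg>\{.*\})$"
)


def split_event(raw: str) -> dict:
    """Return the header fields plus the decoded JSON payload under 'event'."""
    match = HEADER.match(raw)
    if match is None:
        raise ValueError("line does not match the assumed header layout")
    fields = match.groupdict()
    # Replace the raw message text with the parsed JSON object.
    fields["event"] = json.loads(fields.pop("msg"))
    return fields
```

Calling split_event(RAW) yields the header values (for example app_name is "/usr/bin/sfprocessor" and msgid is "sysflow", the configured syslog.tag) alongside the decoded event, which shows that the JSON only starts after the "-" field.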

Here is my configuration (/etc/sysflow/pipelines/pipeline.local.json):

{
  "pipeline":[
    {
     "processor": "sysflowreader",
     "handler": "flattener",
     "in": "sysflow sysflowchan",
     "out": "flat flattenerchan"
    },
    {
     "processor": "policyengine",
     "in": "flat flattenerchan",
     "out": "evt eventchan",
     "policies": "/etc/sysflow/policies/runtimeintegrity",
     "mode": "alert"
    },
    {
     "processor": "exporter",
     "in": "evt eventchan",
     "export": "syslog",
     "format": "json",
     "syslog.proto": "udp",
     "syslog.tag": "sysflow",
     "syslog.host": "splunk-server",
     "syslog.port": "514"
    }
  ]
}

I have already read the configuration section of the SysFlow documentation.

I know how to parse the event with SPL; I just need to build an add-on that will do that for me, because I need the data in data models. Here is the SPL:

index="sysflow" sourcetype="sysflow:syslog"
| rex field=_raw "^(?:[^ \n]* ){7}(?P<json>.+)"
| spath input=json
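The rex pattern can be checked outside Splunk, since Python's re module accepts the same syntax. The sketch below runs it against a shortened copy of the raw event exactly as pasted above. In that copy the date contains a double space, so skipping seven space-delimited tokens appears to stop before the hostname rather than at the opening brace. Whether spath tolerates the leftover text is not something this sketch can show. FIRST_BRACE is a hypothetical alternative pattern, not part of the original question.

```python
import json
import re

# Shortened copy of the raw event from the question (payload trimmed for brevity).
RAW = (
    "Mar  9 21:57:26 10.0.0.158 1 2023-03-09T23:57:26+02:00 RHEL-Server "
    '/usr/bin/sfprocessor 60594 sysflow - {"version":5,"type":"PE",'
    '"node":{"id":"RHEL-Server","ip":"10.0.0.158"}}'
)

# Same pattern as the rex command above.
SEVEN_TOKENS = re.compile(r"^(?:[^ \n]* ){7}(?P<json>.+)")

# Hypothetical alternative: skip everything before the first opening brace.
FIRST_BRACE = re.compile(r"^[^{]*(?P<json>\{.+\})$")


def capture(pattern: re.Pattern, raw: str):
    """Return the text of the 'json' group, or None if the pattern fails."""
    match = pattern.match(raw)
    return match.group("json") if match else None


seven = capture(SEVEN_TOKENS, RAW)    # still carries the rest of the header
brace = capture(FIRST_BRACE, RAW)     # begins at the opening brace
event = json.loads(brace)             # decodes cleanly as JSON
```

If the same behaviour holds in Splunk, anchoring on the brace would make the extraction independent of how many header tokens the receiver adds, at the cost of assuming the header itself never contains a "{".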

Solution

  • Not the most elegant solution... but it works

    I created a Field Extraction in props.conf:

    EXTRACT-json = ^(?:[^ \n]* ){7}(?P<json>.+)
    

    I then extracted each field individually using the spath() function in EVAL statements, for example:

    EVAL-file_path = spath(json, "file.path")
    EVAL-parent_process = spath(json, "pproc.cmdline")
    EVAL-process = spath(json, "proc.cmdline")
    EVAL-user = spath(json, "proc.user")
    EVAL-type = spath(json, "type")
    EVAL-file_name = spath(json, "file.name")
    EVAL-process_name = spath(json, "proc.name")
    EVAL-parent_process_name = spath(json, "pproc.name")
    EVAL-parent_process_path = spath(json, "pproc.exe")
    EVAL-dest = spath(json, "node.id")
    EVAL-signature = spath(json, "policies{}.id")
    EVAL-file_acl = spath(json, "file.openflags{}")
    EVAL-action = spath(json, "opflags{}")
    

    I'm now working on a CIM-compliant add-on for Splunkbase.
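
The path syntax used in the EVAL statements above (dots for nested objects, "{}" for array elements) can be illustrated with a rough Python emulation. This is only a sketch of the lookup idea, not Splunk's implementation. The lookup and extract_fields helpers are hypothetical, FIELD_PATHS copies a few of the mappings from the answer, and EVENT is a trimmed version of the sample event.

```python
import json

# Trimmed version of the sample event from the question.
EVENT = json.loads(
    '{"type":"PE","opflags":["EXEC"],'
    '"node":{"id":"RHEL-Server"},'
    '"proc":{"name":"dnf","user":"root"},'
    '"policies":[{"id":"Unauthorized installer detected","priority":1}]}'
)

# A few of the field-to-path mappings from the EVAL statements.
FIELD_PATHS = {
    "file_path": "file.path",
    "process_name": "proc.name",
    "user": "proc.user",
    "dest": "node.id",
    "signature": "policies{}.id",
    "action": "opflags{}",
}


def lookup(node, path):
    """Resolve a dotted path; a '{}' suffix on a segment fans out over a list."""
    values = [node]
    for segment in path.split("."):
        fan_out = segment.endswith("{}")
        key = segment[:-2] if fan_out else segment
        next_values = []
        for value in values:
            if not isinstance(value, dict) or key not in value:
                continue  # missing key: this branch contributes nothing
            child = value[key]
            if fan_out and isinstance(child, list):
                next_values.extend(child)
            else:
                next_values.append(child)
        values = next_values
    if not values:
        return None
    # A single hit is returned as a scalar, several hits as a list.
    return values[0] if len(values) == 1 else values


def extract_fields(event):
    """Apply every mapping in FIELD_PATHS to one decoded event."""
    return {name: lookup(event, path) for name, path in FIELD_PATHS.items()}
```

For the trimmed process event, extract_fields(EVENT) fills user, dest, signature and action, while file_path comes back as None because the event has no "file" object. Fields that only exist for some event types stay empty in the same way.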