Search code examples
splunksplunk-query

How to filter out events before joining datasets with stats


I have some events (2 different sourcetype—process_events and socket_events) that look something like this:

{
  "action": "added",
  "columns": {
    "time": "1527895541",
    "success": "1",
    "action": "connect",
    "auid": "1000",
    "family": "2",
    "local_address": "",
    "local_port": "0",
    "path": "/usr/bin/curl",
    "pid": "30220",
    "remote_address": "127.0.0.2",
    "remote_port": "80"
  },
  "unixTime": 1527895545,
  "hostIdentifier": "HOST_ONE",
  "name": "socket_events",
  "numerics": false
}
{
  "action": "added",
  "columns": {
    "time": "1527895541",
    "success": "1",
    "action": "connect",
    "auid": "1000",
    "family": "2",
    "local_address": "",
    "local_port": "0",
    "path": "/usr/bin/curl",
    "pid": "30220",
    "remote_address": "10.10.10.10",
    "remote_port": "12345"
  },
  "unixTime": 1527895545,
  "hostIdentifier": "HOST_ONE",
  "name": "socket_events",
  "numerics": false
}
{
  "action": "added",
  "columns": {
    "uid": "0",
    "time": "1527895541",
    "pid": "30220",
    "path": "/usr/bin/curl",
    "auid": "1000",
    "cmdline": "curl google.com",
    "ctime": "1503452096",
    "cwd": "",
    "egid": "0",
    "euid": "0",
    "gid": "0",
    "parent": ""
  },
  "unixTime": 1527895550,
  "hostIdentifier": "HOST_ONE",
  "name": "process_events",
  "numerics": false
}

Current query:

(name=socket_events OR name=process_Events) columns.path=*bin*
| stats values(*) as * by hostIdentifier, columns.path, columns.pid

Result

+-------------------------------------------------------------------------------------------+
| hostIdentifier | columns.path  | columns.pid | cmdline         | columns.remote_addr | columns.remote_p
+-------------------------------------------------------------------------------------------+
| HOST_ONE       | /usr/bin/curl | 30220       | curl google.com | 127.0.0.2           | 80
|                |               |             |                 | 10.10.10.10         | 12345
+-------------------------------------------------------------------------------------------+

Is there a way for me to apply some filter logics like these

If columns.remote is multivalue AND one of the remote_address!=127.0.0.0/8 AND > remote_port>5000, then pipe it to stats

If columns.remote is not multivalue AND remote_address!=127.0.0.0/8 AND remote_port>5000, then pipe it to stats()

Else, ignore

I feel like I need to apply the filter before the | stats ... because I need to exclude all the socket_events events that don't satisfy the condition before the JOIN with process_events.

Any help would be awesome!

Also, sample data taken from https://osquery.readthedocs.io/en/stable/deployment/process-auditing/


Solution

  • One can't filter out multi-value fields before stats because it's stats that makes them multi-value. Try filtering out the undesired IP addresses before joining the events.

    (name=socket_events OR name=process_Events) columns.path=*bin*
    | where (isnull(columns.remote_addr) OR NOT cidrmatch("127.0.0.0/8", columns.remote_addr))
    | stats values(*) as * by hostIdentifier, columns.path, columns.pid
    

    The isnull function retains rows that don't have a remote_addr field.