
Logstash Grok filter pattern for Oracle RDS XML Audit Logs


I would like to create a Logstash grok pattern to parse the Oracle audit log below and extract only the content from <AuditRecord> to </AuditRecord>.

{"messageType":"DATA_MESSAGE","owner":"656565656566","logGroup":"/aws/rds/instance/stg/audit","logStream":"STG_ora_20067_20210906120520144010741320.xml","subscriptionFilters":["All logs"],"logEvents":[{"id":"36370952585791240628335082776414249187626811417307774976","timestamp":1630929920144,"message":<AuditRecord><Audit_Type>8</Audit_Type><EntryId>1</EntryId><Extended_Timestamp>2021-08-31T13:25:20.140969Z</Extended_Timestamp><DB_User>/</DB_User><OS_User>rdsdb</OS_User><Userhost>ip-172-27-1-72</Userhost><OS_Process>6773</OS_Process><Instance_Number>0</Instance_Number><Returncode>0</Returncode><OSPrivilege>SYSDBA</OSPrivilege><DBID>918393906</DBID> <Sql_Text>CONNECT</Sql_Text> </AuditRecord>"}]}

These logs are stored in S3 in gzip format. I am using the config below for Logstash, but it's not working.

input {
    s3 {
        bucket => "s3bucket"
        type => "oracle-audit-log-xml"
        region => "eu-west-1"
    }
}

filter {
    ## For Oracle audit log
    if [type] == "oracle-audit-log-xml" {

        mutate { gsub => [ "message", "[\n]", "" ] }

        grok {
            match => [ "message", "<AuditRecord>%{DATA:temp_audit_message}</AuditRecord>" ]
        }
        mutate {
            add_field => { "audit_message" => "<AuditRecord>%{temp_audit_message}</AuditRecord>" }
        }
        xml {
            store_xml => true
            source => "audit_message"
            target => "audit"
        }
        mutate {
            add_field => { "timestamp" => "%{[audit][Extended_Timestamp]}" }
        }
        date {
            match => [ "timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'", "ISO8601" ]
            target => "@timestamp"
        }
        # remove temporary fields
        mutate { remove_field => ["message", "audit_message", "temp_audit_message"] }

        if "_grokparsefailure" in [tags] {
            drop {}
        }
    }
}

output {
    amazon_es {
        hosts => ["elasticsearch url"]
        index => "rdslogs-%{+YYYY.MM.dd}"
        region => "eu-west-1"
        aws_access_key_id => ''
        aws_secret_access_key => ''
    }
}

It seems to be an issue with the line below:

{"messageType":"DATA_MESSAGE","owner":"656565656566","logGroup":"/aws/rds/instance/stg/audit","logStream":"STG_ora_20067_20210906120520144010741320.xml","subscriptionFilters":["All logs"],"logEvents":[{"id":"36370952585791240628335082776414249187626811417307774976","timestamp":1630929920144,"message":

Is there any way we can modify the config to drop the line above?

Thanks


Solution

  • You don't need a grok pattern here, since your logs are in JSON format. Install the Logstash JSON filter plugin:

    $ logstash-plugin install logstash-filter-json
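
    If you are unsure whether the plugin is already present (recent Logstash releases bundle it by default), a quick check, assuming a standard Logstash install:

    $ logstash-plugin list | grep json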
    

    Then add a filter like the one below to parse your logs (a fuller pipeline sketch for the nested logEvents array follows at the end of this answer):

    filter{
           json {
                    source => "message" 
                }
    }
    

    You can check the attached screenshot from my local ELK setup, where I parsed the log line you provided.

    [Screenshots: the sample log line parsed in a local ELK setup]
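
    Note that the json filter only unwraps the outer CloudWatch Logs envelope; the XML audit record still sits inside the logEvents array. Below is a minimal sketch of a fuller filter chain, assuming the stock split and xml filter plugins and assuming each logEvents entry carries a single <AuditRecord> document in its message field:

    filter {
        if [type] == "oracle-audit-log-xml" {
            # Parse the outer CloudWatch Logs JSON envelope
            json {
                source => "message"
            }
            # Emit one event per entry in the logEvents array
            split {
                field => "[logEvents]"
            }
            # Parse the XML audit record carried in each entry's message field
            xml {
                store_xml => true
                source => "[logEvents][message]"
                target => "audit"
            }
            # Promote the audit timestamp, mirroring the approach in the question
            mutate {
                add_field => { "timestamp" => "%{[audit][Extended_Timestamp]}" }
            }
            date {
                match => [ "timestamp", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'", "ISO8601" ]
                target => "@timestamp"
            }
            # Drop the bulky envelope fields once parsed
            mutate { remove_field => ["message", "logEvents", "timestamp"] }
        }
    }

    The split step matters when a single S3 object batches several logEvents entries: each entry becomes its own event, so no audit record is silently dropped.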