Tags: linux, elasticsearch, logstash, filebeat, elk

How to configure Filebeat to collect specific lines from a file


I have an Oracle audit (.aud) log file and I need to collect one block from each record instead of the whole file. The block runs from the ACTION field through the ACTION NUMBER field.

The log file looks like this:

Thu Nov  9 10:20:24 2023 +01:00
LENGTH : '373'
ACTION :[122] 'select 'export nls_nchar_characterset="'||value||'"' from nls_database_parameters where parameter='NLS_NCHAR_CHARACTERSET''
DATABASE USER:[1] '/'
PRIVILEGE :[6] 'SYSDBA'
CLIENT USER:[6] 'oracle'
CLIENT TERMINAL:[0] ''
STATUS:[1] '0'
DBID:[10] '3256053857'
SESSIONID:[10] '4294967295'
USERHOST:[12] 's01vl9926909'
CLIENT ADDRESS:[0] ''
ACTION NUMBER:[1] '3'

Thu Nov  9 10:20:24 2023 +01:00
LENGTH : '1575'
ACTION :[1323] 'select 'export db_patchset_new="'||max_ver||'"' from (
select length(regexp_replace(substr(description,0,decode(instr(description,'('),0,length(description),instr(description,'('))),'[^0.0-9]', '')) max_length,
       max(regexp_replace(substr(description,0,decode(instr(description,'('),0,length(description),instr(description,'('))),'[^0.0-9]', '')) max_ver
from   (select description from ( select da.description, da.action_time apply, nvl(dr.action_time, da.action_time-1) rollback
                                  from (select description, max(action_time) action_time from dba_registry_sqlpatch where action='APPLY'    group by description) da,
                                       (select description, max(action_time) action_time from dba_registry_sqlpatch where action='ROLLBACK' group by description) dr
                                  where da.description=dr.description(+)
                                )
        where apply > rollback
       )
where  (upper(description) like '%DATABASE%PATCH%' or upper(description) like '%DATABASE%RELEASE%') and upper(description) not like '%JAVA%' and upper(description) not like '%JVM%'
group by length(regexp_replace(substr(description,0,decode(instr(description,'('),0,length(description),instr(description,'('))),'[^0.0-9]', ''))
order by 1 desc
)
where rownum=1'
DATABASE USER:[1] '/'
PRIVILEGE :[6] 'SYSDBA'
CLIENT USER:[6] 'oracle'
CLIENT TERMINAL:[0] ''
STATUS:[1] '0'
DBID:[10] '3256053857'
SESSIONID:[10] '4294967295'
USERHOST:[12] 's01vl9926909'
CLIENT ADDRESS:[0] ''
ACTION NUMBER:[1] '3'

I used include_lines patterns to collect these specific lines, but my problem is that the ACTION field sometimes contains long queries that span several lines, and my configuration collects only the first line instead of the whole query.

For example, the event below does not contain the whole query; the message stops after the first "from (":

{"@timestamp":"2023-11-09T13:34:21.491Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.17.0"},"message":"ACTION:'select 'export db_patchset_new="'||max_ver||'"' from ('","fields":{"env":"staging"},"host":{"name":"s01vl9926909"},"event":{"timezone":"+01:00"},"log":{"offset":811,"file":{"path":"/apps/oracle/diag/rdbms/q08775kp1/Q08775KP10/audit/Q08775KP10_ora_29807_20231109143420100545459814.aud"}},"input":{"type":"log"},"ecs":{"version":"1.12.0"}

At first, my input file input_Oracle.yml looked like the one below. It worked fine, except for the ACTION field:

  enabled: true
  tags: ["linux-Oracle"]
  #ignore_older: 4h
  close_inactive: 30s

  paths:
    - /apps/oracle/diag/rdbms/*/*/audit/*.aud

  include_lines: ['^LENGTH :', '^ACTION :', '^DATABASE USER:', '^PRIVILEGE :', '^CLIENT USER:', '^CLIENT TERMINAL:', '^STATUS:', '^DBID:', '^SESSIONID:', '^USERHOST:', '^CLIENT ADDRESS:', '^ACTION NUMBER:']

Then I tried the "Multiline messages" settings, but that did not work for me:

- type: log
  enabled: true
  tags: ["linux-Oracle"]
  #ignore_older: 4h
  close_inactive: 30s

  paths:
    - /apps/oracle/diag/rdbms/*/*/audit/*.aud

  multiline:
    pattern: '^ACTION :'
    negate: true
    match: after

  include_lines: ['^LENGTH :', '^DATABASE USER:', '^PRIVILEGE :', '^CLIENT USER:', '^CLIENT TERMINAL:', '^STATUS:', '^DBID:', '^SESSIONID:', '^USERHOST:', '^CLIENT ADDRESS:', '^ACTION NUMBER:']

Is there a way to do this by combining lines?
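
A likely reason the multiline attempt returned nothing: the Filebeat documentation states that when multiline settings are present, each multiline message is combined before include_lines filters it. With the pattern '^ACTION :', the combined events begin either with the timestamp line or with "ACTION :", and neither prefix is in the include_lines list. The Python sketch below only imitates that behaviour; it is not Filebeat code. The helper names group_multiline and include_lines_filter are made up for illustration, the sample is abbreviated from the second record above, and the sketch assumes that '^' anchors at the start of the combined event.

```python
import re

def group_multiline(lines, pattern):
    """Imitate multiline with negate: true, match: after.

    A line matching `pattern` starts a new event; every other line is
    appended to the event that precedes it.
    """
    start = re.compile(pattern)
    events, current = [], []
    for line in lines:
        if start.search(line) and current:
            events.append("\n".join(current))
            current = []
        current.append(line)
    if current:
        events.append("\n".join(current))
    return events

def include_lines_filter(events, patterns):
    """Keep an event only if at least one pattern matches it.

    Assumption: '^' matches only at the start of the combined event.
    """
    compiled = [re.compile(p) for p in patterns]
    return [e for e in events if any(c.search(e) for c in compiled)]

# Abbreviated copy of the second audit record shown in the question.
sample = """\
Thu Nov  9 10:20:24 2023 +01:00
LENGTH : '1575'
ACTION :[1323] 'select 'export db_patchset_new="'||max_ver||'"' from (
order by 1 desc
)
where rownum=1'
DATABASE USER:[1] '/'
ACTION NUMBER:[1] '3'
""".splitlines()

# include_lines list from the multiline attempt ('^ACTION :' is absent).
patterns = ['^LENGTH :', '^DATABASE USER:', '^PRIVILEGE :', '^CLIENT USER:',
            '^CLIENT TERMINAL:', '^STATUS:', '^DBID:', '^SESSIONID:',
            '^USERHOST:', '^CLIENT ADDRESS:', '^ACTION NUMBER:']

events = group_multiline(sample, r"^ACTION :")
kept = include_lines_filter(events, patterns)

print(len(events), len(kept))  # 2 0
```

Under these assumptions the first event starts with the timestamp and the second with "ACTION :", so the filter drops both. In a real file the second event would also absorb the timestamp and LENGTH lines of the next record, because they do not match '^ACTION :' either.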


Solution

  • My solution to this issue was the following:

     - type: log
       enabled: true
       tags: ["oracle_audit"]
       ignore_older: 4h
       close_inactive: 30s
       multiline.type: pattern
       # [ \d]\d accepts a space-padded single-digit day ("Nov  9") as well as a two-digit day
       multiline.pattern: '^........[ \d]\d \d\d:\d\d:\d\d \d{4}\s.\d\d:\d\d'
       multiline.negate: true
       multiline.match: after
       multiline.flush_pattern: '\r'
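
To see what grouping on the timestamp line produces, here is a minimal Python sketch, not Filebeat itself. It starts a new event at every header line and appends all other lines, then cuts out the span from ACTION to ACTION NUMBER. The names HEADER, DIGIT_ONLY, split_records and action_block are hypothetical, and the sample is abbreviated from the two records in the question. The header expression used here is written to tolerate a space-padded day such as "Nov  9"; a variant that expects a digit immediately after the first eight characters is included for comparison and does not match that header.

```python
import re

# Hypothetical header pattern for this sketch: 8 leading characters
# ("Thu Nov "), a day padded to two characters, time, year, UTC offset.
HEADER = re.compile(r"^.{8}[ \d]\d \d\d:\d\d:\d\d \d{4}\s.\d\d:\d\d")
# Variant that expects a digit immediately after the first 8 characters.
DIGIT_ONLY = re.compile(r"^.{8}\d{1,2} \d\d:\d\d:\d\d \d{4}\s.\d\d:\d\d")

def split_records(lines):
    """Start a new event at each header line; append all other lines."""
    events, current = [], []
    for line in lines:
        if HEADER.search(line) and current:
            events.append("\n".join(current))
            current = []
        current.append(line)
    if current:
        events.append("\n".join(current))
    return events

def action_block(event):
    """Return the text from the ACTION line through the ACTION NUMBER line."""
    m = re.search(r"^ACTION :.*?^ACTION NUMBER:[^\n]*", event, re.S | re.M)
    return m.group(0) if m else None

# Abbreviated copy of the two audit records shown in the question.
sample = """\
Thu Nov  9 10:20:24 2023 +01:00
LENGTH : '373'
ACTION :[122] 'select 'export nls_nchar_characterset="'||value||'"' from nls_database_parameters where parameter='NLS_NCHAR_CHARACTERSET''
DATABASE USER:[1] '/'
ACTION NUMBER:[1] '3'

Thu Nov  9 10:20:24 2023 +01:00
LENGTH : '1575'
ACTION :[1323] 'select 'export db_patchset_new="'||max_ver||'"' from (
order by 1 desc
)
where rownum=1'
DATABASE USER:[1] '/'
ACTION NUMBER:[1] '3'
""".splitlines()

events = split_records(sample)
block = action_block(events[1])

print(len(events))                            # 2
print(DIGIT_ONLY.search(sample[0]) is None)   # True
print(block)
```

Each event now holds one complete audit record, so the multi-line query stays together. Trimming the event down to the ACTION..ACTION NUMBER span would still have to happen downstream of the multiline step (for example in Logstash); the action_block helper only illustrates that idea.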