apache-kafka ksqldb

KSQL query to check value completeness


I have a stream with two columns, correlation_id and event_type.

Example
aud-103 trigger
aud-104 trigger
aud-109 mitigation
aud-103 mitigation

If a correlation_id has an event whose latest event_type is mitigation, and it already has an earlier trigger event, its status should be mitigated; otherwise it should be unmitigated.
In other words, an event is considered mitigated only if it has both trigger and mitigation as event types.
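
To reproduce the sample above, a minimal setup sketch could look like the following. The topic name, value format, and column types are assumptions (they are not stated in the question); only the stream name and the four sample rows come from the post. Under the rule above, aud-103 should end up mitigated, while aud-104 (trigger only) and aud-109 (mitigation only) should stay unmitigated.

```sql
-- Assumed schema: both columns as VARCHAR; topic name and JSON format are guesses.
CREATE STREAM SIEM_PARSE_EVENT (
  CORRELATION_ID VARCHAR,
  EVENT_TYPE VARCHAR
) WITH (
  KAFKA_TOPIC = 'SIEM_PARSE_EVENT',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 1
);

-- The four sample events, inserted in the order listed in the example.
INSERT INTO SIEM_PARSE_EVENT (CORRELATION_ID, EVENT_TYPE) VALUES ('aud-103', 'trigger');
INSERT INTO SIEM_PARSE_EVENT (CORRELATION_ID, EVENT_TYPE) VALUES ('aud-104', 'trigger');
INSERT INTO SIEM_PARSE_EVENT (CORRELATION_ID, EVENT_TYPE) VALUES ('aud-109', 'mitigation');
INSERT INTO SIEM_PARSE_EVENT (CORRELATION_ID, EVENT_TYPE) VALUES ('aud-103', 'mitigation');
```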

I need to build a table that aggregates by the latest value of a column.

I managed to achieve this with the (very) dirty query below:

CREATE TABLE SIEM_PARSE_EVENT_TBL WITH (KAFKA_TOPIC='SIEM_PARSE_EVENT_TBL', PARTITIONS=1, REPLICAS=1) AS SELECT
  SIEM_PARSE_EVENT.CORRELATION_ID CORRELATION_ID,
  CASE
   WHEN 
   (LATEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'mitigation' AND EARLIEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'trigger') 
   OR
   (LATEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'trigger' AND EARLIEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'mitigation')
   THEN 'mitigated'
   ELSE 'unmitigated'
  END AS MITIGATED_STATUS,
  COUNT(*) TOTAL
FROM SIEM_PARSE_EVENT SIEM_PARSE_EVENT
GROUP BY SIEM_PARSE_EVENT.CORRELATION_ID
EMIT CHANGES;

Is there a cleaner way to do this?

UPDATE
I managed it with the query below:

SELECT CORRELATIONID, COLLECT_LIST(EVENTMITIGATIONTYPE)
FROM SIEM_PARSE_EVENT
GROUP BY CORRELATIONID
HAVING ARRAY_CONTAINS(COLLECT_LIST(EVENTMITIGATIONTYPE), 'trigger')
   AND ARRAY_CONTAINS(COLLECT_LIST(EVENTMITIGATIONTYPE), 'mitigation')
EMIT CHANGES;

Solution

  • I kinda like your solution; it is readable.

    As an alternative, you could probably use a series of functions such as collect_list, array_intersect, and array_length to gather the event_types and check whether both mitigation and trigger are present.

    If you are managing your own ksqlDB, writing custom UDFs or a UDAF to help would be another option.
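
A rough sketch of the collect_list / array_intersect / array_length idea, assuming the column names from the first query in the question (CORRELATION_ID, EVENT_TYPE). It has not been run against a real cluster, so treat it as a starting point. ARRAY_INTERSECT returns the distinct elements common to both arrays, so a length of 2 means both trigger and mitigation were seen for that id, regardless of order or repeats.

```sql
CREATE TABLE SIEM_PARSE_EVENT_TBL AS
SELECT
  CORRELATION_ID,
  CASE
    -- Both required event types are present when the intersection has 2 elements.
    WHEN ARRAY_LENGTH(
           ARRAY_INTERSECT(
             COLLECT_LIST(EVENT_TYPE),
             ARRAY['trigger', 'mitigation'])) = 2
    THEN 'mitigated'
    ELSE 'unmitigated'
  END AS MITIGATED_STATUS,
  COUNT(*) AS TOTAL
FROM SIEM_PARSE_EVENT
GROUP BY CORRELATION_ID
EMIT CHANGES;
```

Unlike the EARLIEST_BY_OFFSET / LATEST_BY_OFFSET version, this does not depend on which event arrived first or last, so it also covers ids with more than two events.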