I have a stream with two columns, `correlation_id` and `event_type`. For example:

```
aud-103 trigger
aud-104 trigger
aud-109 mitigation
aud-103 mitigation
```
For events with the same `correlation_id`: if the latest `event_type` is `mitigation` and a `trigger` was already seen earlier, set the status to `mitigated`; otherwise set it to `unmitigated`. In other words, a `correlation_id` is considered mitigated only if it has both a `trigger` and a `mitigation` event.
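To pin the rule down, here is a plain-Python model of it (not ksqlDB code) applied to the four sample rows. It uses the "has both a trigger and a mitigation" phrasing; the names `EVENTS` and `mitigation_status` are illustrative only.

```python
from collections import defaultdict

# The four sample rows from the question, in stream order.
EVENTS = [
    ("aud-103", "trigger"),
    ("aud-104", "trigger"),
    ("aud-109", "mitigation"),
    ("aud-103", "mitigation"),
]


def mitigation_status(events):
    """Map each correlation_id to 'mitigated' if both a 'trigger' and a
    'mitigation' event were seen for it, else 'unmitigated'."""
    seen = defaultdict(set)  # correlation_id -> set of event types observed
    for correlation_id, event_type in events:
        seen[correlation_id].add(event_type)
    return {
        cid: "mitigated" if {"trigger", "mitigation"} <= types else "unmitigated"
        for cid, types in seen.items()
    }


print(mitigation_status(EVENTS))
# {'aud-103': 'mitigated', 'aud-104': 'unmitigated', 'aud-109': 'unmitigated'}
```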
I need to build a table that aggregates by the latest value of a column. I managed to achieve this with the following (very) dirty query:
```sql
CREATE TABLE SIEM_PARSE_EVENT_TBL
  WITH (KAFKA_TOPIC='SIEM_PARSE_EVENT_TBL', PARTITIONS=1, REPLICAS=1) AS
SELECT
  SIEM_PARSE_EVENT.CORRELATION_ID CORRELATION_ID,
  CASE
    WHEN
      (LATEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'mitigation' AND EARLIEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'trigger')
      OR
      (LATEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'trigger' AND EARLIEST_BY_OFFSET(SIEM_PARSE_EVENT.EVENT_TYPE) = 'mitigation')
    THEN 'mitigated'
    ELSE 'unmitigated'
  END AS MITIGATED_STATUS,
  COUNT(*) TOTAL
FROM SIEM_PARSE_EVENT SIEM_PARSE_EVENT
GROUP BY SIEM_PARSE_EVENT.CORRELATION_ID
EMIT CHANGES;
```
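A plain-Python model of what this query computes (not ksqlDB code) may help when comparing it with the alternatives. It assumes a single partition, as with `PARTITIONS=1`, so offset order is simply list order; the function name `earliest_latest_status` is hypothetical.

```python
SAMPLE = [
    ("aud-103", "trigger"),
    ("aud-104", "trigger"),
    ("aud-109", "mitigation"),
    ("aud-103", "mitigation"),
]


def earliest_latest_status(events):
    """Model of the CASE over EARLIEST_BY_OFFSET / LATEST_BY_OFFSET plus COUNT(*).

    Returns {correlation_id: (status, total)}."""
    earliest, latest, total = {}, {}, {}
    for cid, event_type in events:
        earliest.setdefault(cid, event_type)  # first value seen is kept
        latest[cid] = event_type              # last value seen overwrites
        total[cid] = total.get(cid, 0) + 1
    # The two (earliest, latest) combinations the CASE treats as mitigated.
    mitigated_pairs = {("trigger", "mitigation"), ("mitigation", "trigger")}
    return {
        cid: (
            "mitigated" if (earliest[cid], latest[cid]) in mitigated_pairs else "unmitigated",
            total[cid],
        )
        for cid in earliest
    }


print(earliest_latest_status(SAMPLE))
# {'aud-103': ('mitigated', 2), 'aud-104': ('unmitigated', 1), 'aud-109': ('unmitigated', 1)}

# Hypothetical sequence, not from the question: trigger, mitigation, trigger.
print(earliest_latest_status([("aud-200", "trigger"), ("aud-200", "mitigation"), ("aud-200", "trigger")]))
# {'aud-200': ('unmitigated', 3)}
```

On the sample rows the model agrees with the rule. On the hypothetical trigger, mitigation, trigger sequence it reports `unmitigated`, because the earliest and latest values are both `trigger`. That matches the "latest event is mitigation" phrasing of the rule but not the "has both event types" phrasing, so it is worth deciding which one is intended.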
Is there a cleaner way to do this?
UPDATE
I managed it with the following query:
```sql
SELECT CORRELATIONID, COLLECT_LIST(EVENTMITIGATIONTYPE)
FROM SIEM_PARSE_EVENT
GROUP BY CORRELATIONID
HAVING ARRAY_CONTAINS(COLLECT_LIST(EVENTMITIGATIONTYPE), 'trigger')
   AND ARRAY_CONTAINS(COLLECT_LIST(EVENTMITIGATIONTYPE), 'mitigation')
EMIT CHANGES;
```
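Modelled the same way in plain Python (single partition assumed; `collect_list_having` is a hypothetical name, not ksqlDB code), this query keeps only the groups whose collected list contains both values. Because of `HAVING`, ids that are not yet mitigated are filtered out of the result rather than reported as `unmitigated`.

```python
SAMPLE = [
    ("aud-103", "trigger"),
    ("aud-104", "trigger"),
    ("aud-109", "mitigation"),
    ("aud-103", "mitigation"),
]


def collect_list_having(events):
    """Model of GROUP BY + COLLECT_LIST with HAVING ARRAY_CONTAINS(...) AND ARRAY_CONTAINS(...)."""
    collected = {}
    for cid, event_type in events:
        collected.setdefault(cid, []).append(event_type)  # COLLECT_LIST per group
    return {
        cid: types
        for cid, types in collected.items()
        # HAVING: keep the group only if both event types are in its list.
        if "trigger" in types and "mitigation" in types
    }


print(collect_list_having(SAMPLE))
# {'aud-103': ['trigger', 'mitigation']}
```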
I kinda like your solution; it is readable.
As an alternative, you could probably combine functions such as `collect_list`, `array_intersect`, and `array_length` to gather the `event_type`s and check whether both `mitigation` and `trigger` are present.
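In ksqlDB that idea might look roughly like `CASE WHEN ARRAY_LENGTH(ARRAY_INTERSECT(COLLECT_LIST(EVENT_TYPE), ARRAY['trigger', 'mitigation'])) = 2 THEN 'mitigated' ELSE 'unmitigated' END` (an untested sketch). Below is a plain-Python model of that logic, assuming `ARRAY_INTERSECT` returns distinct elements; `REQUIRED`, `array_intersect` and `intersect_status` are hypothetical names. Because the intersection is deduplicated, repeated events of one type cannot push the length to 2 on their own, and unlike the `HAVING` version every id gets a status.

```python
REQUIRED = ["trigger", "mitigation"]


def array_intersect(left, right):
    """Distinct elements present in both lists, in order of first appearance in `left`."""
    result = []
    for item in left:
        if item in right and item not in result:
            result.append(item)
    return result


def intersect_status(events):
    """Label each correlation_id by comparing the intersection length with len(REQUIRED)."""
    collected = {}
    for cid, event_type in events:
        collected.setdefault(cid, []).append(event_type)  # like COLLECT_LIST
    return {
        cid: "mitigated" if len(array_intersect(types, REQUIRED)) == len(REQUIRED) else "unmitigated"
        for cid, types in collected.items()
    }


sample = [
    ("aud-103", "trigger"),
    ("aud-104", "trigger"),
    ("aud-109", "mitigation"),
    ("aud-103", "mitigation"),
]
print(intersect_status(sample))
# {'aud-103': 'mitigated', 'aud-104': 'unmitigated', 'aud-109': 'unmitigated'}
```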
If you manage your own ksqlDB deployment, writing a custom UDF or UDAF would be another option.
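A real ksqlDB UDAF would be written in Java; the sketch below only models, in plain Python, the accumulator such a UDAF might keep. The method names mirror the `initialize` / `aggregate` / `merge` / `map` shape of ksqlDB's Java `Udaf` interface, but the class and the `run` driver are hypothetical and not a ksqlDB API; check the ksqlDB UDAF documentation for the exact interface.

```python
class MitigationStatusModel:
    """Hypothetical accumulator model, not ksqlDB code.

    The aggregate state is a pair (saw_trigger, saw_mitigation)."""

    def initialize(self):
        return (False, False)

    def aggregate(self, event_type, agg):
        saw_trigger, saw_mitigation = agg
        return (saw_trigger or event_type == "trigger",
                saw_mitigation or event_type == "mitigation")

    def merge(self, agg_one, agg_two):
        # Combine two partial aggregates for the same key.
        return (agg_one[0] or agg_two[0], agg_one[1] or agg_two[1])

    def map(self, agg):
        saw_trigger, saw_mitigation = agg
        return "mitigated" if saw_trigger and saw_mitigation else "unmitigated"


def run(udaf, events):
    """Hypothetical driver: fold each row into its key's state, then map to the output."""
    state = {}
    for cid, event_type in events:
        state[cid] = udaf.aggregate(event_type, state.get(cid, udaf.initialize()))
    return {cid: udaf.map(agg) for cid, agg in state.items()}


SAMPLE = [
    ("aud-103", "trigger"),
    ("aud-104", "trigger"),
    ("aud-109", "mitigation"),
    ("aud-103", "mitigation"),
]
print(run(MitigationStatusModel(), SAMPLE))
# {'aud-103': 'mitigated', 'aud-104': 'unmitigated', 'aud-109': 'unmitigated'}
```

Keeping two booleans instead of a growing list keeps the state constant-size per key, which is one reason a UDAF can be preferable to `COLLECT_LIST` for long-lived ids.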