Trying to create a Kinesis Analytics query to alert when a step in a process has taken too long (or died and not moved on).
I have a stream of data containing status updates as a multi-step process moves from step to step. I am trying to write a query that identifies when the next step hasn't happened within a specific amount of time (i.e. it timed out). Specifically, I want to know when a single ProcessID doesn't move from "Started" to "Running" within 5 minutes.
I know how to do this in a database, but it gets confusing when the time window is constantly moving. Any help you can provide is much appreciated!
My events have three attributes:
ProcessID - Integer
Status - String ("Started", "Running", or "Complete")
HappenedOn - Datetime (e.g. 2017-10-02 15:17:00)
How I would do this in a database (non-Kinesis)
In SQL I would join the event table to itself with a LEFT OUTER JOIN, but I can't figure out how to do this in a real-time query.
-- Shows the Started events that don't have a corresponding Running event
SELECT * FROM events AS F
LEFT OUTER JOIN events AS S ON F.PROCESSID = S.PROCESSID AND S.STATUS = 'Running'
WHERE F.STATUS = 'Started' AND S.STATUS IS NULL;
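In the database version, the 5-minute limit can go into the join condition, so a Running event only counts if it falls between the start and start + 5 minutes. A minimal Python sketch of that time-bounded anti-join follows. It assumes the whole table is available and every Started event is at least 5 minutes old (otherwise a process that is still waiting would be reported too early). `Event` and `timed_out_starts` are hypothetical names used for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

TIMEOUT = timedelta(minutes=5)


@dataclass(frozen=True)
class Event:
    process_id: int
    status: str  # "Started", "Running", or "Complete"
    happened_on: datetime


def timed_out_starts(events, timeout=TIMEOUT):
    """Started events with no Running event for the same process within
    `timeout` after the start: a left anti-join with a time bound."""
    running = {}  # process_id -> list of Running timestamps
    for e in events:
        if e.status == "Running":
            running.setdefault(e.process_id, []).append(e.happened_on)

    result = []
    for e in events:
        if e.status != "Started":
            continue
        deadline = e.happened_on + timeout
        on_time = any(e.happened_on <= t <= deadline
                      for t in running.get(e.process_id, []))
        if not on_time:
            result.append(e)
    return result


t0 = datetime(2017, 10, 2, 15, 17, 0)
events = [
    Event(1, "Started", t0),
    Event(1, "Running", t0 + timedelta(minutes=2)),
    Event(2, "Started", t0),
    Event(2, "Running", t0 + timedelta(minutes=7)),
    Event(3, "Started", t0),
]
print([e.process_id for e in timed_out_starts(events)])  # [2, 3]
```

Process 2 is reported because its Running event arrives 7 minutes after the start. Process 3 is reported because it never reaches Running.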
Solution so far in Kinesis
This query saves and runs, but doesn't give me what I am looking for.
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (E1PROCESSID integer,
E1STATUS varchar(7), E1HAPPENED varchar(32), E2PROCESSID integer,
E2STATUS varchar(7), E2HAPPENED varchar(32) );
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT F.PROCESSID, F.STATUS, F.HAPPENED, S.PROCESSID, S.STATUS, S.HAPPENED
FROM "SOURCE_SQL_STREAM_001" OVER (RANGE INTERVAL '5' MINUTE PRECEDING) AS F
LEFT OUTER JOIN "SOURCE_SQL_STREAM_001" AS S
ON F.PROCESSID = S.PROCESSID AND S.STATUS = 'Running'
WHERE F.STATUS = 'Started' AND S.STATUS IS NULL;
Even if I could get the above query to work, I need Kinesis to check for the corresponding event (or its absence) only once 5 minutes have passed since the HAPPENED value (i.e. I need something like a DATEDIFF between the current time and HAPPENED). Any advice on how to add this would be appreciated.
Also, I feel like I need FOLLOWING rather than PRECEDING, but the SQL parser won't accept it (and I can see why). I am also unsure which side of the join the OVER window belongs on: the left stream, the right stream, or both?
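A window can only look backwards. One way to get the effect of FOLLOWING is therefore to delay each Started event by the timeout and then look back. A check scheduled at HappenedOn + 5 minutes searches the preceding 5 minutes for a Running event with the same ProcessID. The Python sketch below models that idea outside Kinesis. It assumes events arrive in HappenedOn order, and `detect_with_delayed_check` and its `now` parameter are hypothetical. It does not show how the delay would be expressed in Kinesis Analytics SQL.

```python
import heapq
from datetime import datetime, timedelta

TIMEOUT = timedelta(minutes=5)


def detect_with_delayed_check(events, now, timeout=TIMEOUT):
    """events: (process_id, status, happened_on) tuples sorted by happened_on.

    Each Started event schedules a CHECK at happened_on + timeout. When the
    CHECK runs, it looks back over the preceding `timeout` (the same shape as
    RANGE INTERVAL '5' MINUTE PRECEDING) for a Running event of that process.
    """
    running_times = {}  # process_id -> timestamps of Running events seen so far
    checks = []         # min-heap of (check_time, start_time, process_id)
    alerts = []         # (process_id, start_time) pairs that timed out

    def fire_due(limit, inclusive):
        # Run every CHECK scheduled before `limit` (or at it, if inclusive).
        while checks and (checks[0][0] <= limit if inclusive else checks[0][0] < limit):
            check_time, start_time, pid = heapq.heappop(checks)
            window_start = check_time - timeout  # start of the look-back window
            seen = any(window_start <= t <= check_time
                       for t in running_times.get(pid, []))
            if not seen:
                alerts.append((pid, start_time))

    for pid, status, ts in events:
        # CHECKs due strictly before this event run first, so a Running event
        # at exactly start + timeout still counts as on time.
        fire_due(ts, inclusive=False)
        if status == "Running":
            running_times.setdefault(pid, []).append(ts)
        elif status == "Started":
            heapq.heappush(checks, (ts + timeout, ts, pid))
    fire_due(now, inclusive=True)  # CHECKs that are due by the current time
    return alerts


t0 = datetime(2017, 10, 2, 15, 17, 0)


def at(minutes):
    return t0 + timedelta(minutes=minutes)


stream = [
    (1, "Started", at(0)),
    (2, "Started", at(1)),
    (1, "Running", at(3)),
    (2, "Running", at(7)),  # too late: process 2's window closed at minute 6
]
print([pid for pid, _ in detect_with_delayed_check(stream, now=at(10))])  # [2]
```

In this sketch, time advances only when events (or an explicit `now`) arrive. An alert for a stalled process is therefore produced when later data or a clock tick comes in. A longer-running version would also need to expire old Running timestamps.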
Many thanks in advance.
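You can do this using Drools (with the KIE base in STREAM event-processing mode, so the negative temporal pattern is evaluated once the 5-minute window has passed) by creating the following rules: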
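declare EventA
    @role( event )
    id : int
end

declare EventB
    @role( event )
    id : int
end

declare TimeoutA
    id : int
end

rule "Timeout EventA"
when
    $a : EventA()
    // no EventB for the same id within 0 to 5 minutes after $a
    not( EventB( id == $a.id, this after[0s,5m] $a ) )
then
    insertLogical( new TimeoutA( $a.id ) );
end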
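The rule above behaves roughly like a set of per-event timers. Each EventA opens a 5-minute window, a matching EventB closes it, and the rule fires once the engine's clock passes the end of an open window. The Python class below is a hypothetical model of that behaviour, not Drools code or its API. It assumes at most one outstanding start per ProcessID, with EventA standing for "Started" and EventB for "Running".

```python
from datetime import datetime, timedelta


class TimeoutTracker:
    """Hypothetical Python analogue (not the Drools API) of the rule
    $a : EventA()  not( EventB( id == $a.id, this after[0s,5m] $a ) )."""

    def __init__(self, timeout=timedelta(minutes=5)):
        self.timeout = timeout
        self.pending = {}   # id -> time of an EventA still waiting for its EventB
        self.timeouts = []  # ids for which a TimeoutA would have been inserted

    def on_event_a(self, pid, ts):
        self.pending[pid] = ts

    def on_event_b(self, pid, ts):
        start = self.pending.get(pid)
        # after[0s,5m]: the EventB must occur 0 to 5 minutes after the EventA.
        if start is not None and start <= ts <= start + self.timeout:
            del self.pending[pid]

    def advance_to(self, now):
        # Like advancing a clock: any EventA whose 5-minute window has fully
        # elapsed without a matching EventB times out.
        for pid, start in list(self.pending.items()):
            if now > start + self.timeout:
                del self.pending[pid]
                self.timeouts.append(pid)


t0 = datetime(2017, 10, 2, 15, 17, 0)
tracker = TimeoutTracker()
tracker.on_event_a(1, t0)
tracker.on_event_a(2, t0)
tracker.on_event_b(1, t0 + timedelta(minutes=4))
tracker.advance_to(t0 + timedelta(minutes=6))
print(tracker.timeouts)  # [2]
```

The clock is advanced explicitly rather than only by incoming events. As a result, a process that dies and never emits another event is still reported, as long as something keeps advancing the clock.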
You can author Drools Kinesis Analytics with this service