amazon-kinesis, amazon-kinesis-kpl

Amazon Kinesis - Identify Step Timeout


Trying to create a Kinesis Analytics query to alert when a step in a process has taken too long (or died and not moved on).

I have a stream of data that contains status updates as a multi-step process moves from step to step. I am trying to write a query that identifies when the next step hasn't happened within a specific amount of time (i.e. it has timed out). Specifically, I would like to know when a single ProcessID doesn't move from "Started" to "Running" within 5 minutes.

I know how to do this in a database, but it gets confusing when the time window is constantly moving. Any help you can provide is much appreciated!

My events have three attributes:
ProcessID - Integer
Status - String ("Started", "Running", or "Complete")
HappenedOn - Datetime (e.g. 2017-10-02 15:17:00)

How I would do this in a database (non-Kinesis)

In SQL I would join the events table to itself using a LEFT OUTER JOIN, but I can't figure out how to do this in a real-time query.

-- Shows the 'Started' events that have no corresponding 'Running' event
-- (status literals match the values listed above)

SELECT * FROM events AS F
LEFT OUTER JOIN events AS S ON F.PROCESSID = S.PROCESSID AND S.STATUS = 'Running'
WHERE F.STATUS = 'Started' AND S.STATUS IS NULL;
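
For reference, here is a minimal runnable sketch of that database approach with the 5-minute rule added. It uses Python's standard-library sqlite3 module, so the `datetime(..., '+5 minutes')` call is SQLite syntax and other databases will spell it differently. The sample rows, the `timed_out` helper, and the explicit `now` parameter are assumptions made for illustration; they are not part of my real data.

```python
import sqlite3

# Hypothetical sample data (not from the real stream):
#   1 -> reached Running in time, 2 -> never reached Running,
#   3 -> started too recently to judge, 4 -> reached Running too late.
rows = [
    (1, "Started", "2017-10-02 15:17:00"),
    (1, "Running", "2017-10-02 15:19:00"),
    (2, "Started", "2017-10-02 15:18:00"),
    (3, "Started", "2017-10-02 15:24:00"),
    (4, "Started", "2017-10-02 15:00:00"),
    (4, "Running", "2017-10-02 15:07:00"),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (ProcessID INTEGER, Status TEXT, HappenedOn TEXT)")
conn.executemany("INSERT INTO events VALUES (?, ?, ?)", rows)

# Anti-join: keep 'Started' rows that have no 'Running' row within
# 5 minutes of the start, once at least 5 minutes have passed by :now.
QUERY = """
SELECT F.ProcessID, F.HappenedOn
FROM events AS F
LEFT OUTER JOIN events AS S
  ON F.ProcessID = S.ProcessID
 AND S.Status = 'Running'
 AND S.HappenedOn <= datetime(F.HappenedOn, '+5 minutes')
WHERE F.Status = 'Started'
  AND S.ProcessID IS NULL
  AND datetime(F.HappenedOn, '+5 minutes') <= datetime(:now)
ORDER BY F.ProcessID
"""

def timed_out(now):
    """Return (ProcessID, HappenedOn) for starts that have timed out as of `now`."""
    return conn.execute(QUERY, {"now": now}).fetchall()

print(timed_out("2017-10-02 15:25:00"))
```

Passing `now` explicitly instead of reading the system clock keeps the result repeatable. It also shows the problem I have with streaming: the answer depends on when the query is evaluated.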

Solution so far in Kinesis
This query saves and runs, but doesn't give me what I am looking for.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (E1PROCESSID integer, 
E1STATUS varchar(7), E1HAPPENED varchar(32), E2PROCESSID integer, 
E2STATUS varchar(7), E2HAPPENED varchar(32) );

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"

SELECT F.PROCESSID, F.STATUS, F.HAPPENED, S.PROCESSID, S.STATUS, S.HAPPENED
FROM "SOURCE_SQL_STREAM_001" OVER (RANGE INTERVAL '5' MINUTE PRECEDING) AS F 
LEFT OUTER JOIN "SOURCE_SQL_STREAM_001"  AS S
ON F.PROCESSID = S.PROCESSID AND S.STATUS = 'Running'
WHERE F.STATUS = 'Started' AND S.STATUS IS NULL;

Even if I could get the above query to work, I need Kinesis to look for the corresponding event (or the lack of one) only once 5 minutes have passed since the HAPPENED value (i.e. I need to do a DATEDIFF between the current datetime and HAPPENED). Any advice on how to add this would be appreciated.

Also, I feel like I need to use FOLLOWING rather than PRECEDING, but the SQL parser won't let me (and I can see why). I am also unsure which side of the stream join should get the OVER window: left, right, or both?
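
To make the FOLLOWING versus PRECEDING point concrete, here is the behavior I am after, sketched in plain Python rather than Kinesis SQL. Instead of looking forward from each start, it looks backward whenever stream time advances and reports any start that is now more than 5 minutes old without a "Running" event. The `detect_timeouts` function and the sample events are hypothetical. The sketch assumes events arrive in time order, and a timeout only surfaces when a later event arrives to move time forward.

```python
from datetime import datetime, timedelta

TIMEOUT = timedelta(minutes=5)

def detect_timeouts(events, timeout=TIMEOUT):
    """Yield (process_id, started_at) for each 'Started' event that is not
    followed by 'Running' within `timeout`. `events` must be time-ordered."""
    pending = {}  # process_id -> datetime of its 'Started' event
    for process_id, status, happened_on in events:
        now = datetime.strptime(happened_on, "%Y-%m-%d %H:%M:%S")
        # Stream time has advanced to `now`: look back for expired starts.
        expired = [pid for pid, started in pending.items() if now - started > timeout]
        for pid in expired:
            yield pid, pending.pop(pid)
        if status == "Started":
            pending[process_id] = now
        elif status == "Running":
            # Arrived in time (otherwise it was already reported above).
            pending.pop(process_id, None)

# Hypothetical sample stream: process 2 never reaches 'Running'.
events = [
    (1, "Started", "2017-10-02 15:17:00"),
    (2, "Started", "2017-10-02 15:18:00"),
    (1, "Running", "2017-10-02 15:19:00"),
    (3, "Started", "2017-10-02 15:24:00"),
]

timeouts = list(detect_timeouts(events))
print(timeouts)
```

Process 2 is only reported once the 15:24 event arrives. That is the gap a real streaming solution would have to close, for example with a timer or heartbeat.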

Many thanks in advance.


Solution

  • You can do this using Drools by creating the following rules:

    declare EventA
      @role( event )
    end
    
    declare EventB
      @role( event ) 
    end
    
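    rule "Timeout EventA"
    when
      $a : EventA()
      // Fires when no EventB occurs within 5 minutes after $a.
      // 'not' already means "does not exist", so 'exists' is not needed.
      // To track each process separately, also constrain EventB to the
      // same id as $a (assuming EventB carries an id field).
      not( EventB( this after[0s,5m] $a ) )
    then
      insertLogical(new TimeoutA($a.id));
    end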
    

    You can author Drools Kinesis Analytics with this service