Tags: analytics, amazon-kinesis, complex-event-processing, event-stream-processing

Using Kinesis Analytics to analyze events and related missing events, separated in time?


I have a stream of events for various devices that can either be "connected" or "disconnected".

Each event has the following structure:

  • timestamp
  • device_id
  • event ("connected" or "disconnected")

I want to trigger an action as soon as a device has been disconnected and has not reconnected within a configurable, device-specific time period (e.g. 1 hour). I only want to trigger once per "disconnected" event.

Can this be done with AWS Kinesis Analytics, and if so, what would the query look like? If not, can it be done with some other tool, or do I have to build it myself?


Solution

  • This is possible with Drools Kinesis Analytics (a managed service on Amazon):

    Types:

    package com.test;
    
    import java.util.Set;
    
    import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBAttribute;
    import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBHashKey;
    import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBTable;
    
    declare DeviceConfig
        @DynamoDBTable(tableName="DeviceConfig")
    
        deviceId: int @DynamoDBHashKey(attributeName="device_id");
        timeoutMillis: int @DynamoDBAttribute(attributeName="timeout_millis");
    end
    
    declare DeviceEvent
        @role( event )

        deviceId: int;
        timestamp: java.util.Date;
        event: String;
    end
    
    declare DisconnectAlert
        deviceId: int;
    end
    

    Rules:

    package com.test;
    
    // dynamic per-device timer: fires after timeoutMillis unless the "disconnected" event is deleted first
    rule "disconnect timer"
        timer( expr: $timeout )
    when
        $event : DeviceEvent( $id : deviceId, event == "disconnected" ) from entry-point events
        DeviceConfig(deviceId == $event.deviceId, $timeout : timeoutMillis) from entry-point configs
    then
        insertLogical(new DisconnectAlert($event.getDeviceId()));
    end
    
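    // drop a repeated event (same device, same state) that follows one still in working memory,
    // so a duplicate "disconnected" event does not start a second timer
    rule "remove dups"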
    when
        $event : DeviceEvent( $id : deviceId, $state : event ) from entry-point events
        $dup : DeviceEvent(this != $event, deviceId == $event.deviceId, event == $state, this after $event) from entry-point events
    then
        delete($dup);
    end
    
    // on connect event remove "disconnected" state
    rule "connect device"
    when
        $disconnected : DeviceEvent( $id : deviceId, event == "disconnected" ) from entry-point events
        DeviceEvent(deviceId == $disconnected.deviceId, event == "connected", this after $disconnected) from entry-point events
    then
        delete($disconnected);
    end
    
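    // clean up "connected" events to free memory (they are not needed any more);
    // the lower salience makes sure "connect device" fires first and can still match this event
    rule "delete connected state"
        salience -1
    when
        $connected : DeviceEvent(event == "connected") from entry-point events
    then
        delete($connected);
    end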
    

    Note that there are two types of input:

    • DeviceConfig: mostly static per-device configuration (including the timeout), stored in DynamoDB.
    • DeviceEvent: the Kinesis stream of device events.
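    If you end up building this yourself (the last part of the question), the same logic can be sketched without a rules engine. The following is a minimal plain-Java sketch, not part of the Drools answer, under these assumptions: timestamps are epoch milliseconds, events arrive in timestamp order, and something calls advanceTo() as time passes (for example on every record or from a periodic tick). The names DisconnectMonitor, onEvent, advanceTo and defaultTimeoutMillis are hypothetical, and the in-memory timeout map only stands in for the DeviceConfig table. Unlike the Drools timer, it is driven by event time rather than a wall clock.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch mirroring the rules above; not an API of any AWS or Drools product.
class DisconnectMonitor {
    // Mirrors the DeviceEvent fact; event is "connected" or "disconnected".
    record DeviceEvent(long timestamp, int deviceId, String event) {}

    private final Map<Integer, Long> timeoutMillis;   // stands in for the DeviceConfig table
    private final long defaultTimeoutMillis;          // assumed fallback for unconfigured devices
    // deviceId -> time of the first unanswered "disconnected" event (insertion-ordered)
    private final Map<Integer, Long> pendingSince = new LinkedHashMap<>();
    private final List<Integer> alerts = new ArrayList<>();

    DisconnectMonitor(Map<Integer, Long> timeoutMillis, long defaultTimeoutMillis) {
        this.timeoutMillis = Map.copyOf(timeoutMillis);
        this.defaultTimeoutMillis = defaultTimeoutMillis;
    }

    void onEvent(DeviceEvent e) {
        advanceTo(e.timestamp()); // first fire any timers that expired before this event
        if ("disconnected".equals(e.event())) {
            // like "remove dups": keep the first disconnect, ignore repeats
            pendingSince.putIfAbsent(e.deviceId(), e.timestamp());
        } else if ("connected".equals(e.event())) {
            // like "connect device": a reconnect cancels the pending timer;
            // the connected event itself is never stored ("delete connected state")
            pendingSince.remove(e.deviceId());
        }
        // any other event value is ignored
    }

    void advanceTo(long now) {
        // like "disconnect timer": alert once per disconnect, then forget it
        var it = pendingSince.entrySet().iterator();
        while (it.hasNext()) {
            var entry = it.next();
            long timeout = timeoutMillis.getOrDefault(entry.getKey(), defaultTimeoutMillis);
            if (now - entry.getValue() >= timeout) {
                alerts.add(entry.getKey());
                it.remove();
            }
        }
    }

    List<Integer> alerts() {
        return List.copyOf(alerts);
    }
}
```

    Removing a device from the pending map as soon as its alert fires is what guarantees a single alert per "disconnected" event; a later disconnect of the same device starts a fresh timer.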