Tags: algorithm, amazon-web-services, aggregate-functions, amazon-kinesis

AWS Kinesis Stream Aggregating Based on Time Spans


I currently have a Kinesis stream that is populated with JSON messages that are in the form of:

{"datetime": "2017-09-29T20:12:01.755z", "payload":"4"}
{"datetime": "2017-09-29T20:12:07.755z", "payload":"5"}
{"datetime": "2017-09-29T20:12:09.755z", "payload":"12"}
etc...

What I'm trying to accomplish here is to aggregate the data in time chunks. In this case, I'd like to compute averages over 10-minute spans. For example, for 12:00 to 12:10, I want to average the payload values and save the result as the 12:10 value.

For example, the above data would produce:

Datetime: 2017-09-29T20:12:10.00z
Average: 7

The method I'm thinking of is to cache values at the service level and keep track of the current timespan. Once messages move into the next 10-minute timespan, I average the cached data, store the result in the DB, and then delete that cached value.

Currently, my service sees 20,000 messages every minute with higher volume to be expected in the future. I'm a little stuck on how to implement this to guarantee I get all the values for that 10 minute time period from Kinesis. Those of you that are more familiar with Kinesis and AWS, is there a simple way to go about this?

The reason for doing this is to shorten the query times for data from large timespans, such as for 1 year. I wouldn't want to grab millions of values but rather, a few aggregated values.

Edit:

I have to keep track of many different averages at the same time. For example, the above JSON may pertain to just one 'set', such as the average temperature per city in 10-minute timespans. This requires me to keep track of each city's average for every timespan.

Toronto (12:01 - 12:10): average_temp
New York (12:01 - 12:10): average_temp
Toronto (12:11 - 12:20): average_temp
New York (12:11 - 12:20): average_temp
etc...

This could pertain to any city worldwide. If new temperatures arrive for, say, Toronto and they pertain to the 12:01 - 12:10 timespan, I have to recalculate and store that average.
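
The grouping described above amounts to keying each reading by city and by the end of its 10-minute span. A minimal sketch of that key, assuming timestamps in the format of the sample messages and that a span is labelled by its end time (the names `window_end` and `group_key` are hypothetical):

```python
import math
from datetime import datetime, timezone

WINDOW_SECONDS = 600  # 10-minute spans


def window_end(event_time: str, window: int = WINDOW_SECONDS) -> datetime:
    """Return the end (UTC) of the window that contains event_time."""
    # The sample messages end in a lowercase "z"; strip it and treat the time as UTC.
    parsed = datetime.strptime(event_time.rstrip("zZ"), "%Y-%m-%dT%H:%M:%S.%f")
    epoch = parsed.replace(tzinfo=timezone.utc).timestamp()
    # Round up to the next window boundary; a time exactly on a boundary stays there.
    end_epoch = math.ceil(epoch / window) * window
    return datetime.fromtimestamp(end_epoch, tz=timezone.utc)


def group_key(city: str, event_time: str) -> tuple:
    """Key under which a reading is aggregated: (city, end of its window)."""
    return city, window_end(event_time).isoformat()


print(group_key("Toronto", "2017-09-29T20:12:01.755z"))
```

Late readings for an earlier span map to the same key as the readings that arrived on time, so they can be folded into the stored aggregate for that span.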


Solution

  • This is how I would do it. Thanks for the interesting question.

    Kinesis Streams --> Lambda (Event Inserter) --> DynamoDB (Streams) --> Lambda (Count and Value Incrementer) --> DynamoDB (Streams) --> Lambda (Average Updater)
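
As a rough illustration of the first stage, a Lambda function triggered by Kinesis receives each record's data base64-encoded under `Records[].kinesis.data`. The sketch below only decodes the JSON messages; the function name `decode_kinesis_records` is hypothetical, and writing the events to DynamoDB is left out:

```python
import base64
import json


def decode_kinesis_records(event: dict) -> list:
    """Decode the JSON messages carried by a Kinesis-triggered Lambda event."""
    messages = []
    for record in event.get("Records", []):
        # Kinesis delivers the record payload as a base64-encoded string.
        raw = base64.b64decode(record["kinesis"]["data"])
        messages.append(json.loads(raw))
    return messages


def handler(event, context):
    # Hypothetical entry point: a real Event Inserter would write each
    # decoded message to the DynamoDB table instead of returning it.
    return decode_kinesis_records(event)
```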

    DynamoDB Table Structure:

    { 
    Timestamp: 1506794597
    Count: 3
    TotalValue: 21
    Average: 7
    Event{timestamp}-{guid}: { event }
    }
    
    timestamp -- timestamp of the actual event
    guid -- avoids collisions between events that share the same timestamp
    Event{timestamp}-{guid} -- the raw event; it should be removed by the Count and Value Incrementer Lambda
    

    If a fourth record arrives for that timestamp:

    Round the event time to its 10-minute timespan, then increment Count and add the payload to TotalValue. Never read the value and write back the incremented result: concurrent Lambda invocations can overwrite each other's updates, and strongly consistent reads (which cost more) do not make a read-then-write atomic. Instead, perform the increment with an atomic counter update.

    http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithItems.html#WorkingWithItems.AtomicCounters
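
A minimal sketch of such an atomic update, assuming the table is keyed on a numeric `Timestamp` attribute holding the timespan and that the table is called `Aggregates` (both are assumptions, as is the helper name `build_increment_params`). It only builds the request parameters; they would then be passed to the `update_item` call of a boto3 DynamoDB client:

```python
def build_increment_params(table_name: str, window_epoch: int, value: int) -> dict:
    """Build update_item parameters that bump Count by 1 and TotalValue by value."""
    return {
        "TableName": table_name,
        # Assumption: the timespan (epoch seconds) is the partition key.
        "Key": {"Timestamp": {"N": str(window_epoch)}},
        # ADD is applied atomically on the server and creates the
        # attributes if they do not exist yet.
        "UpdateExpression": "ADD #count :one, #total :value",
        # Placeholders avoid clashes with DynamoDB reserved words such as COUNT.
        "ExpressionAttributeNames": {"#count": "Count", "#total": "TotalValue"},
        "ExpressionAttributeValues": {
            ":one": {"N": "1"},
            ":value": {"N": str(value)},
        },
        "ReturnValues": "UPDATED_NEW",
    }


params = build_increment_params("Aggregates", 1506716400, 4)
# Usage (requires boto3 and AWS credentials):
#   boto3.client("dynamodb").update_item(**params)
```

Because no value is read first, concurrent invocations cannot lose each other's increments.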

    Enable DynamoDB Streams on the above table and listen to the stream with another Lambda, which calculates the average and updates the stored value.

    When you calculate the average, don't read from the table. The data is already available in the stream record, so you only need to calculate the average and write it back, overwriting the previous average.
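
A sketch of the averaging step, assuming the stream is configured to include the new item image and that the attributes are named as in the table structure above (the function name `averages_from_stream` is hypothetical). It computes the averages from the stream event alone; writing them back is left out:

```python
from decimal import Decimal


def averages_from_stream(event: dict) -> list:
    """Return (timestamp, average) pairs computed from DynamoDB stream records."""
    results = []
    for record in event.get("Records", []):
        if record.get("eventName") not in ("INSERT", "MODIFY"):
            continue  # nothing to average for deletions
        image = record["dynamodb"].get("NewImage", {})
        if "Count" not in image or "TotalValue" not in image:
            continue
        count = Decimal(image["Count"]["N"])
        total = Decimal(image["TotalValue"]["N"])
        if count == 0:
            continue
        average = total / count
        # Skip records whose stored Average is already current, so the
        # updater's own write does not cause another update.
        if "Average" in image and Decimal(image["Average"]["N"]) == average:
            continue
        results.append((image["Timestamp"]["N"], average))
    return results
```

With the example item above (Count 3, TotalValue 21), the computed average is 7.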

    This will work on any scale and with high availability.

    Hope it helps.

    EDIT1:

    Since the OP is not familiar with AWS services:

    Lambda Documentation:

    https://aws.amazon.com/lambda/

    DynamoDB Documentation:

    https://aws.amazon.com/dynamodb/

    AWS cloud services used for the solution.