Tags: amazon-web-services, amazon-redshift, analytics, amazon-emr, amazon-athena

How to efficiently aggregate data in billions of individual records in AWS?


At a high / theoretical level I know exactly the type of architecture I want to build and how it would work. However, I'm attempting to construct this as cheaply as possible using AWS services, and my lack of familiarity with AWS's offerings has me running in circles.

The Data

We run a video streaming platform. On busy nights we have about 100 simultaneous live streams going with upwards of 30,000 viewers. We expect this number to rise to 100,000 in the next few years. A live stream lasts, on average, 2 hours.

We send a heartbeat from our player every 10 seconds with information about the viewer -- how much data they've viewed, how much data they've buffered, what quality they're streaming, etc.

These heartbeats are sent directly to an AWS Kinesis endpoint.
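For a sense of scale and shape, a heartbeat might look roughly like the following; the field names, stream name, and values here are hypothetical, but this is the general pattern of a player pushing a small JSON record to Kinesis with boto3:

```python
import json
import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")

# Hypothetical heartbeat payload -- field names are illustrative only.
heartbeat = {
    "viewer_id": "viewer-123",
    "stream_id": "stream-456",
    "sent_at": "2021-06-01T20:15:30Z",
    "os": "Android",
    "quality": "HD",
    "bytes_downloaded": 1_250_000,
    "bytes_buffered": 40_000,
}

kinesis.put_record(
    StreamName="player-heartbeats",           # assumed stream name
    Data=json.dumps(heartbeat).encode("utf-8"),
    PartitionKey=heartbeat["viewer_id"],      # keeps a viewer's messages on one shard
)
```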

Finally, we want to retain all past messages for at least 5 years (hopefully longer) so that we can look at historic analytics.

Some back-of-the-envelope calculations suggest we will have 0.1 * 60 * 60 * 2 * 100,000 * 365 * 5 = 131 billion heartbeat messages five years from now (one message every 10 seconds, for 2 hours a night, from 100,000 viewers, every night for 5 years).

Our Old Pipeline

Our old system had a single Kinesis consumer. Aggregate data was stored in DynamoDB. Whenever a message arrived we would read the record from DynamoDB, update the record, then write the new record back. This read-update-write loop limited the speed at which we could process messages and made each incoming message dependent on the ones before it, so messages could not be processed in parallel.
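A rough sketch of that read-update-write loop (the table, key, and attribute names are made up) shows why processing serializes per viewer:

```python
import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("viewer_aggregates")  # hypothetical table name

def process_heartbeat(msg):
    # 1. Read the current aggregate for this viewer/stream.
    resp = table.get_item(Key={"viewer_id": msg["viewer_id"],
                               "stream_id": msg["stream_id"]})
    agg = resp.get("Item", {"viewer_id": msg["viewer_id"],
                            "stream_id": msg["stream_id"],
                            "seconds_watched": 0,
                            "last_heartbeat_at": None})

    # 2. Update it -- watch time is derived from the previous timestamp,
    #    so message N cannot be processed until message N-1 has been written.
    if agg["last_heartbeat_at"] is not None:
        agg["seconds_watched"] += msg["sent_at_epoch"] - agg["last_heartbeat_at"]
    agg["last_heartbeat_at"] = msg["sent_at_epoch"]

    # 3. Write it back.
    table.put_item(Item=agg)
```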

Part of the reason for this setup is that our message schema was not well designed from the outset. We send the timestamp at which the message was sent, but we do not send "amount of video watched since last heartbeat". As a result, in order to compute total viewer time we need to look up the last heartbeat message sent by this player, subtract the timestamps, and add the difference. Similar issues exist with many other metrics.

Our New Pipeline

We've begun to run into scaling issues. During our peak hours analytics can be delayed by as much as four hours while waiting for a backlog of messages to be processed. If this backlog reaches 24 hours Kinesis will start deleting data. So we need to fix our pipeline to remove this dependency on past messages so we can process them in parallel.

The first part of this was updating the messages sent by our players. Our new specification includes only metrics that can be trivially summed, with no subtraction. So we can just keep adding to the "time viewed" metric, for instance, without any regard to past messages.
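In other words, where the old message carried only an absolute timestamp, the new message carries per-interval deltas, so the aggregate for a viewer is just an order-independent sum (field names here are hypothetical):

```python
# Old-style heartbeat: only an absolute timestamp, so computing watch time
# requires looking up the previous message for this viewer.
old_msg = {"viewer_id": "viewer-123", "sent_at_epoch": 1622577330}

# New-style heartbeat (hypothetical fields): per-interval deltas.
heartbeats = [
    {"viewer_id": "viewer-123", "seconds_watched": 10, "bytes_downloaded": 1_250_000},
    {"viewer_id": "viewer-123", "seconds_watched": 10, "bytes_downloaded": 1_310_000},
]

# Aggregation is now a plain sum, so messages can be processed in any order, in parallel.
total_watched = sum(m["seconds_watched"] for m in heartbeats)   # 20
total_bytes = sum(m["bytes_downloaded"] for m in heartbeats)    # 2_560_000
```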

The second part of this was ensuring that Kinesis never backs up. We dump the raw messages to S3 as quickly as they arrive with no processing (Kinesis Data Firehose) so that we can crunch analytics on them at our leisure.
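As a sketch (stream names, ARNs, and buffer settings are all assumptions), a Firehose delivery stream reading from the Kinesis stream and buffering records into larger S3 objects might be created like this; Firehose's buffering also means S3 receives batched objects rather than one file per record:

```python
import boto3

firehose = boto3.client("firehose", region_name="us-east-1")

firehose.create_delivery_stream(
    DeliveryStreamName="heartbeats-to-s3",          # assumed name
    DeliveryStreamType="KinesisStreamAsSource",
    KinesisStreamSourceConfiguration={
        "KinesisStreamARN": "arn:aws:kinesis:us-east-1:123456789012:stream/player-heartbeats",
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-read-kinesis",
    },
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-write-s3",
        "BucketARN": "arn:aws:s3:::example-heartbeat-bucket",
        "Prefix": "raw/dt=!{timestamp:yyyy-MM-dd}/",
        "ErrorOutputPrefix": "errors/",
        # Buffer up to 128 MB or 60 seconds before writing an object,
        # so S3 gets batched files instead of one object per record.
        "BufferingHints": {"SizeInMBs": 128, "IntervalInSeconds": 60},
    },
)
```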

Finally, we now want to actually extract information from these analytics as quickly as possible. This is where I've hit a snag.

The Questions We Want to Answer

As this is an analytics pipeline, our questions mostly revolve around filtering these messages and then aggregating fields for the remaining messages (possibly, in fact likely, with grouping). For instance:

How many Android users watched last night's stream in HD? (FILTER by stream and OS)

What's the average bandwidth usage among all users? (SUM and COUNT, with later division of the final aggregates which could be done on the dashboard side)

What percent of users last year were on any Apple device (iOS, tvOS, etc)? (COUNT, grouped by OS)

What's the average time spent buffering among Android users for streams in the past year? (a mix of all of the above)
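For instance, once the data is queryable through Athena, that last question could be expressed roughly as below (the database, table, and column names are assumptions about how the S3 data would be catalogued); boto3 can submit the query and the dashboard can poll for the result:

```python
import boto3

athena = boto3.client("athena", region_name="us-east-1")

# Hypothetical table/column names; assumes a `heartbeats` table over the S3 data,
# partitioned by a `dt` string in yyyy-MM-dd form.
query = """
SELECT AVG(seconds_buffering) AS avg_buffering_seconds
FROM heartbeats
WHERE os = 'Android'
  AND dt >= CAST(date_add('year', -1, current_date) AS varchar)
"""

resp = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": "analytics"},                  # assumed database
    ResultConfiguration={"OutputLocation": "s3://example-athena-results/"},
)
print(resp["QueryExecutionId"])  # poll get_query_execution / get_query_results with this id
```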

Options

  • AWS Athena would allow us to query the data in S3 directly as if it were an ANSI SQL table. However reading up on Athena, unless the data is properly formatted it can be incredibly slow. Some benchmarks I've seen show that processing 1.1 billion rows of CSV data can take up to 2 minutes. I'm looking at processing 100x that much data
  • AWS EMR and AWS Redshift sound like they are built for this purpose, but are complicated to set up and have a high base cost to run (requiring an EC2 cluster to remain active at all times). AWS Redshift also requires data be loaded into it, which sounds like it might be a very slow process, delaying our access to analytics
  • AWS Glue sounds like it may be able to take the raw messages as they arrive in S3 and convert them to Parquet files for more rapid querying via Athena
  • We could run a job to regularly batch messages to reduce the total number that must be processed. While a stream is live we'll receive one message every 10 seconds, but we really only care about the totals for a given viewer. This means that when a 2-hour stream concludes we can combine the 720 messages we've received from that player into a single "summary" message about the viewer's experience during the whole stream. This would massively reduce the amount of data we need to process, but exactly how and when to trigger this process isn't clear to me (a rough sketch of such a job follows this list)
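For the last option, a Glue-style PySpark job that rolls the per-heartbeat records up into one summary row per viewer per stream could look roughly like this (the S3 paths and column names are assumptions, and a real job would parameterize the date):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("heartbeat-rollup").getOrCreate()

# Read the raw JSON heartbeats Firehose dropped into S3 (path is hypothetical).
raw = spark.read.json("s3://example-heartbeat-bucket/raw/dt=2021-06-01/")

# One summary row per viewer per stream: ~720 heartbeats collapse into 1 record.
summary = (
    raw.groupBy("stream_id", "viewer_id", "os", "quality")
       .agg(
           F.sum("seconds_watched").alias("seconds_watched"),
           F.sum("seconds_buffering").alias("seconds_buffering"),
           F.sum("bytes_downloaded").alias("bytes_downloaded"),
           F.count("*").alias("heartbeat_count"),
       )
)

# Write columnar Parquet, partitioned by date, for fast Athena scans.
summary.write.mode("overwrite").parquet("s3://example-heartbeat-bucket/summary/dt=2021-06-01/")
```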

The Ideal Architecture

This is a Big Data problem. The generic solution to Big Data problems is "don't take your data to your query, take your query to your data". If these messages were spread across 100 small storage nodes then each node could filter, sum, and count the subset of data they hold and pass these aggregates back to a central node which sums the sums and sums the counts. If each node is only operating on 1/100th of the data set then this kind of processing could theoretically be incredibly fast.
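A toy illustration of that "sum the sums and sum the counts" idea, where each node returns only its partial aggregates and the central node never sees raw rows:

```python
# Each "node" holds a shard of the data (made-up bandwidth samples).
shards = [
    [12.5, 8.0, 30.1],      # node 1
    [5.2, 44.0],            # node 2
    [9.9, 17.3, 2.0, 6.6],  # node 3
]

# Map: each node computes (sum, count) over its own shard.
partials = [(sum(shard), len(shard)) for shard in shards]

# Reduce: the central node sums the sums and sums the counts.
total_sum = sum(s for s, _ in partials)
total_count = sum(c for _, c in partials)
print(total_sum / total_count)  # global average without shipping raw rows anywhere
```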

My Confusion

While I have a theoretical understanding of the "ideal" architecture, it's not clear to me if AWS works this way or how to construct a system that will function well like this.

  • S3 is a black box. It's not clear if Athena queries are run on individual nodes and aggregates are further reduced elsewhere, or if there's a system reading all of the data and aggregating it in a central location
  • Redshift requires the data to be copied into a Redshift database. That doesn't sound fast or distributed
  • It's unclear to me how EMR works or if it will suit my purpose. Still researching
  • AWS Glue seems like it may need to be triggered by some event? (One option appears to be a scheduled trigger; see the sketch after this list)
  • Parquet files seem to be like CSVs, where multiple records reside in a single file. Meanwhile I'm dumping one record per file. But perhaps there's a way to fix that? e.g. batching files every minute or every 5 minutes?
  • RDS or a similar service might be really good for this (indexing and whatnot) but would require a guaranteed schema (or necessitate migrating if our message schema changed) which is a concern. Migrating terabytes of data if we change our message schema sounds out of the question
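On the Glue question, one way it can run without an external event is on a schedule; a minimal sketch, assuming a Glue job named "heartbeat-rollup" (e.g. the PySpark job above) already exists:

```python
import boto3

glue = boto3.client("glue", region_name="us-east-1")

# Run the (assumed) "heartbeat-rollup" Glue job at the top of every hour.
glue.create_trigger(
    Name="hourly-heartbeat-rollup",
    Type="SCHEDULED",
    Schedule="cron(0 * * * ? *)",
    Actions=[{"JobName": "heartbeat-rollup"}],
    StartOnCreation=True,
)
```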

Finally, along with wanting to get analytics results in as "real time" as possible (ideally we want to know within 1 minute when someone joins or leaves a stream), we want the dashboards themselves to load quickly. Waiting 30 seconds to see the count of live viewers is horrendous; dashboards should load in 2 seconds or less (ideally).

The plan is to use QuickSight to create dashboards (our old system had a hacky Django app that read from our DynamoDB aggregates table, but I'd like to avoid creating more code for people to maintain).


Solution

  • I would look into Druid. Not an AWS offering, but easily runs on AWS, with good integration with S3 and Kinesis.

    1. Capable of reading from Kinesis, at high speeds, and making the data available for querying right away. It can also flatten and transform the data as it reads it.
    2. Capable of doing rollups/aggregation/compaction during ingestion (and further reducing data asynchronously). From what you wrote, it seems to me that it could easily reduce the number of rows in the DB by a very large factor.
    3. Capable of fast queries, using standard SQL.
    4. Smart partitioning of the data to scan only the relevant dates.

    The downside is that you will need to keep a cluster up and running for ingestion and for querying. It is pretty scalable, so you can start small. On the upside, you're not using 10 different technologies (Athena/Glue/EMR/etc.)

    You might want to consider contacting Imply, which can ease the deployment.
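For a feel of what querying Druid looks like once Kinesis ingestion is set up, its SQL endpoint accepts plain HTTP POSTs; the hostname, datasource, and column names below are assumptions, not part of the original setup:

```python
import requests

# Druid router/broker SQL endpoint -- hostname and datasource are hypothetical.
DRUID_SQL_URL = "http://druid-router.internal:8888/druid/v2/sql"

query = """
SELECT os, COUNT(DISTINCT viewer_id) AS viewers
FROM heartbeats
WHERE __time >= CURRENT_TIMESTAMP - INTERVAL '1' DAY
GROUP BY os
"""

resp = requests.post(DRUID_SQL_URL, json={"query": query})
resp.raise_for_status()
print(resp.json())  # one row per OS with its distinct-viewer count over the last day
```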