
Aggregating Movie Rental information in a DynamoDB table using Flink


Happy Holidays everyone!

tl;dr: I need to aggregate movie rental information that is stored in one DynamoDB table and store a running total of the aggregation in another table. How do I ensure exactly-once aggregation?

I currently store movie rental information in a DynamoDB table named MovieRentals: {movie_title, rental_period_in_days, order_date, rent_amount}

We have millions of movie rentals happening on any given day. Our web application needs to display the aggregated rental amount for any given movie title.

I am planning to use Flink to aggregate rental amounts by movie_title on the MovieRentals DynamoDB stream and store the aggregated rental amounts in another DynamoDB table named RentalAmountsByMovie: {movie_title, total_rental_amount}.

How do I ensure that the amounts in RentalAmountsByMovie are always accurate? That is, how do I prevent the results from any checkpoint from updating the RentalAmountsByMovie table records more than once?

  1. Approach 1: I store the checkpoint ids in the RentalAmountsByMovie table and do conditional updates to handle the scenario described above.
  2. Approach 2: I can possibly implement a TwoPhaseCommitSinkFunction that uses DynamoDB Transactions. However, according to the Flink documentation the commit function can be called more than once and hence needs to be idempotent. So even this solution requires checkpoint ids to be stored in the target data store.
  3. Approach 3: Another pattern seems to be just storing the time-window aggregation results in the RentalAmountsByMovie table: {movie_title, rental_amount_for_checkpoint, checkpoint_id}. This way the writes from Flink to DynamoDB will be idempotent (Flink is not doing any updates; it is only doing inserts to the target DDB table). However, the webapp will have to compute the running total on the fly by aggregating results from the RentalAmountsByMovie table. I don't like this solution because of its latency implications for the webapp.

  4. Approach 4: Maybe I can use Flink's Queryable State feature. However, that feature seems to be in beta: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/queryable_state.html
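
To make Approach 1 concrete, here is a minimal sketch of what a checkpoint-guarded conditional update could look like. It only builds the request parameters for a DynamoDB UpdateItem call; the attribute name last_checkpoint_id and both function names are hypothetical. It also assumes that each movie gets at most one delta per checkpoint and that a retried commit carries the same checkpoint id as the original attempt. The second function is a purely local stand-in that mimics the condition in memory, so the behaviour can be seen without a table.

```python
from decimal import Decimal


def build_guarded_update(movie_title, delta, checkpoint_id):
    """Build UpdateItem parameters that apply `delta` only once per checkpoint.

    `last_checkpoint_id` is a hypothetical attribute added to each
    RentalAmountsByMovie item; it is not part of the original schema.
    """
    return {
        "TableName": "RentalAmountsByMovie",
        "Key": {"movie_title": {"S": movie_title}},
        # Record the checkpoint and add the delta in one atomic update.
        "UpdateExpression": (
            "SET last_checkpoint_id = :cp ADD total_rental_amount :delta"
        ),
        # Reject the write if this checkpoint (or a later one) was already applied.
        "ConditionExpression": (
            "attribute_not_exists(last_checkpoint_id) OR last_checkpoint_id < :cp"
        ),
        "ExpressionAttributeValues": {
            ":delta": {"N": str(Decimal(delta))},
            ":cp": {"N": str(checkpoint_id)},
        },
    }


def apply_guarded_update(item, delta, checkpoint_id):
    """Local stand-in for the conditional update above, for illustration only."""
    last = item.get("last_checkpoint_id")
    if last is not None and last >= checkpoint_id:
        return False  # DynamoDB would reject this with a failed condition check
    item["total_rental_amount"] = (
        item.get("total_rental_amount", Decimal(0)) + Decimal(delta)
    )
    item["last_checkpoint_id"] = checkpoint_id
    return True
```

With boto3, the dictionary could be passed as `client.update_item(**request)`, and a rejected duplicate would surface as a `ConditionalCheckFailedException` that the sink can safely ignore.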

I imagine this is a very common aggregation use case. How do folks usually handle updating aggregated results in Flink external sinks?

I appreciate any pointers. Happy to provide more details if needed.

Thanks!


Solution

  • Typically the issue you are concerned about is a non-issue, because people use idempotent writes to capture aggregated results in external sinks.

    You can rely on Flink to always have accurate information for RentalAmountsByMovie in Flink's internal state. After that it's just a matter of mirroring that information out to DynamoDB.

    In general, if your sinks are idempotent, that makes things pretty straightforward. The state held in Flink will consist of some sort of pointer into the input (e.g., offsets or timestamps) combined with the aggregates that result from having consumed the input up to that point. You will need to bootstrap the state; this can be done by processing all of the historic data, or by using the state processor API to create a savepoint that establishes a starting point.
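
As a rough illustration of the idempotent-write idea, the sketch below keeps the running total per movie_title in a small class that stands in for Flink's keyed state, and mirrors the absolute total into a plain dict that stands in for the RentalAmountsByMovie table. All class and function names are hypothetical, and nothing here uses the Flink or DynamoDB APIs. Because each write overwrites the item with the full total rather than adding a delta, replaying events after restoring an earlier snapshot leaves the table with the same values.

```python
from collections import defaultdict
from decimal import Decimal


class RunningTotals:
    """Stand-in for Flink keyed state: one running total per movie_title."""

    def __init__(self, snapshot=None):
        self.totals = defaultdict(Decimal, snapshot or {})

    def add(self, movie_title, rent_amount):
        self.totals[movie_title] += Decimal(rent_amount)
        return self.totals[movie_title]

    def snapshot(self):
        """Copy of the state, playing the role of a checkpoint."""
        return dict(self.totals)


def mirror_total(table, movie_title, total):
    """Idempotent write: overwrite the item with the absolute total."""
    table[movie_title] = {
        "movie_title": movie_title,
        "total_rental_amount": total,
    }


def run_with_replay():
    """Process two rentals, then 'fail', restore, and replay the second one."""
    table = {}
    state = RunningTotals()
    mirror_total(table, "Alien", state.add("Alien", "3.99"))
    checkpoint = state.snapshot()  # checkpoint taken after the first rental
    mirror_total(table, "Alien", state.add("Alien", "2.50"))

    # Simulated failure: restore from the checkpoint and replay the second rental.
    state = RunningTotals(checkpoint)
    mirror_total(table, "Alien", state.add("Alien", "2.50"))
    return table
```

Against a real table, the overwrite would correspond to a put of the whole item, so the web application reads a single precomputed total per movie.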