Search code examples
amazon-dynamodbapache-flinkflink-streamingreconcile

Flink Aggregation Reconciliation


Transactions table in DynamoDB

Transactions {transaction_id, customer_id, statment_id, transaction_date, transaction_amount}

Statements table in DynamoDB

Statements {statement_id, customer_id, start_time, end_time, statement_amount}

Millions of transactions are happening on any given day. I'm thinking about using Flink to aggregate transaction amounts into statement amounts using DynamoDB streams.

At any given point I need to know whether all transaction amounts belonging to a statement are aggregated or not. That is, display if the statement amount is stale or not. Essentially, I am talking about reconciliation. How do I achieve that in Flink?


Solution

  • It's easy enough to use something like a KeyedProcessFunction to continuously update some Flink state that is aggregating the statement_amount for each statement_id as new transactions are ingested. But the question, as I understand it, is how to know when that aggregation is complete, or in other words, when Flink has processed all of the transactions for a given statement_id.

    Stream processing applications always face this problem. Unlike batch processing, where one can simply process all the data and then produce a result, with stream processing we are processing one record at a time, with no knowledge of what may come in the future, or with how much delay.

    This leads us to a tradeoff between latency and completeness. In general, one can always wait a bit longer to see what additional data arrives, thereby increasing one's chances of having produced a result based on (more) complete information. Watermarks are the technical manifestation of this tradeoff. Any streaming application that uses event time must produce watermarks, each of which marks a point in the stream with a timestamp, and declares that the stream is, at that point, probably complete up to that timestamp.

    For some applications, quickly producing a result that is probably correct is fine, and in fact, may be better than waiting longer to produce a result that is somewhat more likely to be correct. But in other applications it's necessary to be completely accurate (whatever that may mean, precisely).

    Exactly what you should do is not a technical question, but rather a business process question. Ultimately this depends on precisely what a reconciled statement means to your business. Perhaps you should aim to reproduce the semantics of whatever process is currently in place.

    Having said that, Flink provides a set of tools you can combine to address this use case in a variety of ways, depending on the details of how you want this to work. Here's how the pieces can fit together:

    Each statement has an end_time. When the watermark for the transaction stream reaches that end_time, that's the first instant at which one might consider that the aggregation of the transactions for that statement is complete.

    This watermarking would (typically) be done on the basis of specifying a bound on the amount by which the transactions stream can be out-of-order. But you have to expect that no matter how pessimistic you are, a few anomalous transactions will violate this assumption, and be late relative to the watermarks.

    To accommodate this you can either increase the watermarking delay to try to cover all conceivable lateness (which one might argue is, in general, impossible), or decide that at some point you simply must go ahead and produce a statement that claims to be reconciled, but that might actually require an update or amendment in the future. Whether or not this issue of arbitrary lateness is a real problem (as it can be in banking, where some international transactions may experience very long delays), or merely theoretical, depends on your actual use case.

    Being able to accommodate late transactions will require that you either (1) retain the statement data in Flink's managed state in order to add in the late transaction(s), which can then be used to update the statement, or (2) handle late events in a special way, by reading the previously produced result from the DB and then updating that record in the DB (which would need to be done transactionally). Approach #2 could be implemented in a separate job that consumes a stream of late transactions produced by the first job.

    You might be able to define your way out of this problem by including a timestamp on the statement that specifies that the statement includes precisely those transactions that had been processed up to that point in time.