java, apache-flink, flink-streaming

Apache Flink weighted average based on two keys


I'm streaming data from a websocket via a Flink job and need to output a rolling weighted average based on the following logic:

Each message has the attributes "parent", "name", "amount" and "value". I need to take the latest message for each "name" and combine it with the latest messages of the other names under the same "parent" to get a weighted average based on "amount" and "value".

  1. parent = "a"; name = "m"; amount=100; value=12.45
  2. parent = "a"; name = "n"; amount=40; value=14.55
  3. parent = "a"; name = "m"; amount=100; value=17.45
  4. parent = "a"; name = "o"; amount=24; value=13.25
  5. parent = "a"; name = "n"; amount=40; value=12.55

Messages 3, 4 and 5 are the latest messages for their respective parent:name pairs, so these are the messages that would be used to compute the current weighted average for "a". The number of children a parent has is not known in advance. The weighted-average logic itself is fine; my question is more about how to key, get the latest, aggregate, average, keep state, etc. in Flink.
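For concreteness, assuming "amount" is the weight and "value" is the quantity being averaged, the expected result for "a" after message 5 would work out as follows (messages 3, 4 and 5 only):

```latex
\bar{v}_a
  = \frac{\sum_i \mathrm{amount}_i \cdot \mathrm{value}_i}{\sum_i \mathrm{amount}_i}
  = \frac{100 \cdot 17.45 + 24 \cdot 13.25 + 40 \cdot 12.55}{100 + 24 + 40}
  = \frac{2565}{164}
  \approx 15.64
```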

I've looked at RichFlatMapFunction and AggregateFunction, but it is proving difficult to piece them together.

Any help or ideas appreciated.


Solution

  • Using low-level building blocks, you could build a solution with a KeyedProcessFunction. You would key the event stream by parent, and then use MapState<String, Event> to keep track of the latest event for each name. As events are processed, you can emit updated results. See the Flink docs for an example of a KeyedProcessFunction that uses MapState.

    If you want to use event time processing, you'll have to decide how to handle out-of-order events. Maybe you can ignore events that are out-of-order, or maybe you need to sort the stream first by timestamp.

    Working at a higher level, you could use Flink SQL instead. You could use an OVER window partitioned by the combination of parent and name to keep track of the latest event for each parent/name combination, and then group by parent and compute the weighted average (perhaps with a user-defined aggregate function). See the Immerok Cookbook for an example of how to use OVER windows to get a stream of the latest events for a given key.

    Disclaimer: I work for Immerok (and I wrote that section of the Flink docs).
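The KeyedProcessFunction approach described above comes down to keeping, per parent, a map from name to the latest event and recomputing the average each time an event arrives. Here is a minimal plain-Java sketch of that bookkeeping. It has no Flink dependencies, so it is not a Flink job and not the answerer's code. `Event` and `RollingWeightedAverage` are hypothetical names, the outer `HashMap` stands in for keying the stream by parent, the inner map plays the role of `MapState<String, Event>`, and "amount" is assumed to be the weight.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java stand-in for the keyed-state logic (hypothetical class name).
// Outer map key = parent (what keyBy(parent) would give you in Flink),
// inner map key = name (what MapState<String, Event> would hold).
final class RollingWeightedAverage {
    private final Map<String, Map<String, Event>> latestByParent = new HashMap<>();

    // Records the event as the latest one for its parent/name pair and
    // returns the updated weighted average for that parent.
    double process(Event e) {
        Map<String, Event> latest =
                latestByParent.computeIfAbsent(e.parent, p -> new HashMap<>());
        latest.put(e.name, e); // overwrite any earlier event with the same name

        double weightedSum = 0.0;
        double totalAmount = 0.0;
        for (Event latestEvent : latest.values()) {
            weightedSum += latestEvent.amount * latestEvent.value;
            totalAmount += latestEvent.amount;
        }
        // Assumption: with zero total weight there is no meaningful average.
        return totalAmount == 0.0 ? Double.NaN : weightedSum / totalAmount;
    }

    public static void main(String[] args) {
        RollingWeightedAverage avg = new RollingWeightedAverage();
        Event[] events = {
            new Event("a", "m", 100, 12.45),
            new Event("a", "n", 40, 14.55),
            new Event("a", "m", 100, 17.45),
            new Event("a", "o", 24, 13.25),
            new Event("a", "n", 40, 12.55),
        };
        for (Event e : events) {
            System.out.printf("%s:%s -> %.4f%n", e.parent, e.name, avg.process(e));
        }
    }
}

// Hypothetical event type mirroring the four attributes in the question.
final class Event {
    final String parent;
    final String name;
    final double amount;
    final double value;

    Event(String parent, String name, double amount, double value) {
        this.parent = parent;
        this.name = name;
        this.amount = amount;
        this.value = value;
    }
}
```

In an actual KeyedProcessFunction, the body of `process` would sit in `processElement`, with the inner map replaced by a MapState obtained from the runtime context and the result emitted through the `Collector` rather than returned. To ignore out-of-order events, one option would be to carry a timestamp on the event and skip the update when the stored event for that name is newer.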