apache-flink flink-streaming flink-statefun

Flink event correlation and lookback


I am new to Flink and am looking for advice on building a real-time event correlation system. I have two main use cases:

  1. The event correlation logic consists of static rules based on the event types arriving on the input stream. Over the last X minutes, correlate events of different types and output the data of the events that have business value according to those rules. For example: over the last 1 minute, if the price of event type A in marketplace A1 is less than 20,000 and the price of event type B in marketplace M2 is less than 30,000, add the data of event A to the output stream; otherwise add the data of event B.
  2. For the events of interest (those with business value), calculate the price difference over the last X minutes. For example, if after applying all the rules we decide that event A is of interest for the last 1-minute window, then before adding its data to the output stream we also want to calculate the change in event A's price over the last 10 minutes.

To achieve these use cases, I am evaluating keying the input stream by the product type ID in the input data. This would give me the events of all event types and marketplaces for a given product. I would then use a sliding event-time window with a size of 10 minutes (the lookback period) and a slide of 1 minute, and apply a ProcessWindowFunction that runs the correlation logic on the data from the last 1 minute and uses the other 9 minutes of data as lookback for calculating the price difference of the events of interest.
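To make the idea concrete, here is a minimal sketch of the per-window logic that such a ProcessWindowFunction could delegate to. It is plain Java with no Flink dependency, so it can be tried on its own. The names `PriceEvent`, `Correlated`, and `WindowCorrelator` are hypothetical, as are the field names. It also assumes that the rule only fires when both event types appear in the last minute, that the latest event of each type is the one compared, and that "price difference" means the chosen price minus the earliest price of the same type and marketplace in the 10-minute window. The window is treated as the half-open range from start (inclusive) to end (exclusive).

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical event shape; the field names are assumptions, not taken from the question. */
final class PriceEvent {
    final String type;        // e.g. "A" or "B"
    final String marketplace; // e.g. "A1" or "M2"
    final double price;
    final long timestampMs;   // event time in epoch milliseconds

    PriceEvent(String type, String marketplace, double price, long timestampMs) {
        this.type = type;
        this.marketplace = marketplace;
        this.price = price;
        this.timestampMs = timestampMs;
    }
}

/** The selected event plus its price change over the lookback period. */
final class Correlated {
    final PriceEvent event;
    final double priceDiff;

    Correlated(PriceEvent event, double priceDiff) {
        this.event = event;
        this.priceDiff = priceDiff;
    }
}

final class WindowCorrelator {
    static final long RECENT_MS = 60_000L; // the "last 1 minute" part of the window

    /** Latest (or earliest) event of the given type and marketplace with a timestamp in [from, to). */
    static Optional<PriceEvent> pick(List<PriceEvent> events, String type, String marketplace,
                                     long from, long to, boolean wantLatest) {
        PriceEvent best = null;
        for (PriceEvent e : events) {
            boolean matches = e.type.equals(type) && e.marketplace.equals(marketplace)
                    && e.timestampMs >= from && e.timestampMs < to;
            if (!matches) {
                continue;
            }
            boolean better = best == null
                    || (wantLatest ? e.timestampMs > best.timestampMs
                                   : e.timestampMs < best.timestampMs);
            if (better) {
                best = e;
            }
        }
        return Optional.ofNullable(best);
    }

    /** Applies the example rule to one window's contents for a single key. */
    static Optional<Correlated> correlate(List<PriceEvent> windowEvents,
                                          long windowStart, long windowEnd) {
        long recentStart = windowEnd - RECENT_MS;
        Optional<PriceEvent> a = pick(windowEvents, "A", "A1", recentStart, windowEnd, true);
        Optional<PriceEvent> b = pick(windowEvents, "B", "M2", recentStart, windowEnd, true);
        if (!a.isPresent() || !b.isPresent()) {
            return Optional.empty(); // assumption: emit nothing unless both types were seen
        }
        boolean ruleHolds = a.get().price < 20_000 && b.get().price < 30_000;
        PriceEvent chosen = ruleHolds ? a.get() : b.get();
        // Lookback baseline: earliest matching event anywhere in the window (may be the chosen event itself).
        PriceEvent baseline = pick(windowEvents, chosen.type, chosen.marketplace,
                windowStart, windowEnd, false).orElse(chosen);
        return Optional.of(new Correlated(chosen, chosen.price - baseline.price));
    }
}
```

Inside a ProcessWindowFunction, the window bounds would come from the window available on the context passed to `process`, and the result would be emitted through the collector. One cost of this design is that every event is assigned to 10 overlapping windows, so each event is buffered and iterated 10 times.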

I am not fully sure if this is the best way of implementing these. Any tips/recommendations would be much appreciated!


Solution

  • Overall I'd say your options are:

    • Use sliding windows, as you've proposed.
    • Use a KeyedProcessFunction. This lower-level API offers more control and might lead to a better optimized solution. Sometimes this is also simpler, so if you find the window API is getting in your way, consider this.
    • Use Flink SQL and/or the Table API. You might find it easier to express and maintain the rules if they are written in SQL. Perhaps MATCH_RECOGNIZE is relevant.
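To illustrate the second option, here is a rough sketch of the bookkeeping a KeyedProcessFunction might do: keep a per-key buffer of recent prices, evict anything older than the lookback period, and compute the difference against the oldest retained price. It is plain Java without Flink on the classpath, and `LookbackBuffer` and its methods are hypothetical names. In a real job the buffer would live in Flink keyed state rather than a `HashMap`, and cleanup would more likely be driven by event-time timers. The sketch also assumes events arrive in timestamp order per key.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-key lookback buffer; a stand-in for keyed state in a KeyedProcessFunction. */
final class LookbackBuffer {
    private static final class Sample {
        final long timestampMs;
        final double price;

        Sample(long timestampMs, double price) {
            this.timestampMs = timestampMs;
            this.price = price;
        }
    }

    private final long retentionMs;
    // key -> samples ordered oldest first (assumes in-order arrival per key)
    private final Map<String, Deque<Sample>> samplesByKey = new HashMap<>();

    LookbackBuffer(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /**
     * Stores the new sample, evicts samples older than the retention period,
     * and returns the new price minus the oldest retained price for that key.
     */
    double addAndDiff(String key, long timestampMs, double price) {
        Deque<Sample> samples = samplesByKey.computeIfAbsent(key, k -> new ArrayDeque<>());
        samples.addLast(new Sample(timestampMs, price));
        long cutoff = timestampMs - retentionMs;
        // The sample just added is never older than the cutoff, so the deque cannot become empty here.
        while (samples.peekFirst().timestampMs < cutoff) {
            samples.removeFirst();
        }
        return price - samples.peekFirst().price;
    }

    /** Number of samples currently retained for the key. */
    int retained(String key) {
        Deque<Sample> samples = samplesByKey.get(key);
        return samples == null ? 0 : samples.size();
    }
}
```

Compared with sliding windows, each event is stored once per key instead of once per overlapping window, which is where the extra control of the lower-level API can pay off.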