Best practice for Apache Flink for user-defined alerts


Let's say my Flink job receives a stream of stock prices (as an example) and issues an alert if, say, a stock drops below a certain price. Users can add or remove these alert criteria. For example, a user creates a rule to be alerted if the price of GME drops below $100. How can my Flink job dynamically keep track of all these alert criteria in a scalable manner?

I could create an API which my Flink job could call to get all of the updated alert criteria, but that would mean calling the API numerous times to keep everything up to date.

Or I could create a permanent table with the Flink Table API, which another Flink job updates as soon as a user creates a new alert criterion.

What would be the best practice for this use case?

Notes:

  1. Alerts should be issued with minimal latency.
  2. Alert criteria should be updated as soon as a user creates them.

Solution

  • Here's a design sketch for a purely streaming approach:

    alertUpdates = alerts
      .keyBy(user)
      .process(managePreviousAlerts)  // uses MapState<Stock, Price>
      .keyBy(stock, price)
    
    priceUpdates = prices
      .keyBy(stock)
      .process(managePriceHistory)
      .keyBy(stock, price)
    
    alertUpdates
      .connect(priceUpdates)
      .process(manageAlertsAndPrices) // uses MapState<User, Boolean>
    
    • managePreviousAlerts maintains a per-user MapState from stocks to alert prices. When a new alert arrives, look up the existing alert for this stock (for this user), if any, and store the new alert price in its place. Then emit up to two AlertUpdates: a RemoveAlert event for (user, stock, oldAlertPrice) if an old alert existed, and an AddAlert event for (user, stock, newAlertPrice).
    • managePriceHistory keeps some per-stock pricing history in state, and uses some business logic to decide if the incoming price is a change that merits triggering alerts. (E.g., maybe you only alert if the price went down.)
    • manageAlertsAndPrices maintains a per-stock, per-price MapState, keyed by user.
      • The keys of this MapState are all of the users with alerts for this stock at this price. Upon receiving a PriceUpdate, alert all of these users by iterating over the keys of the MapState.
      • Upon receiving a RemoveAlert, remove the user from the MapState.
      • Upon receiving an AddAlert, add the user to the MapState.
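
The bookkeeping in managePreviousAlerts can be modelled in plain Java to make the RemoveAlert/AddAlert behaviour concrete. This is only a sketch under stated assumptions: the class and field names are hypothetical, a HashMap stands in for Flink's keyed MapState, one instance represents the state for a single user key, and prices are held as integer cents so they can be used safely as part of a key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of managePreviousAlerts for ONE user key.
// In a real job this logic would sit in a keyed process function and the
// HashMap below would be keyed MapState. All names here are hypothetical.
class PreviousAlerts {

    // Event sent downstream: add == true is an AddAlert, false is a RemoveAlert.
    static final class AlertUpdate {
        final boolean add;
        final String user;
        final String stock;
        final long priceCents;

        AlertUpdate(boolean add, String user, String stock, long priceCents) {
            this.add = add;
            this.user = user;
            this.stock = stock;
            this.priceCents = priceCents;
        }
    }

    private final String user;
    // Stand-in for MapState<Stock, Price>: this user's current alert price per stock.
    private final Map<String, Long> alertPriceByStock = new HashMap<>();

    PreviousAlerts(String user) {
        this.user = user;
    }

    // Handles a new alert rule and returns the updates to emit downstream.
    List<AlertUpdate> onNewAlert(String stock, long newPriceCents) {
        List<AlertUpdate> out = new ArrayList<>();
        // put() returns the previous value, or null if there was no alert yet.
        Long oldPriceCents = alertPriceByStock.put(stock, newPriceCents);
        if (oldPriceCents != null) {
            // Retract the old rule so the downstream (stock, oldPrice) key forgets this user.
            out.add(new AlertUpdate(false, user, stock, oldPriceCents));
        }
        out.add(new AlertUpdate(true, user, stock, newPriceCents));
        return out;
    }
}
```

Emitting the RemoveAlert before the AddAlert matters because the two events are routed to different (stock, price) keys downstream, and the old key has no other way to learn that the rule has been replaced.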

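    This should scale well, because all of the state is keyed and is therefore partitioned across the parallel instances of each operator. The latency will be governed by the two network shuffles that each event passes through, one for each keyBy on its path.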
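
To illustrate the final step, here is a plain-Java model of what manageAlertsAndPrices does with its state. It is a sketch rather than Flink code: the class and method names are hypothetical, the outer HashMap emulates the way Flink scopes keyed state to the current (stock, price) key, the inner map stands in for MapState<User, Boolean>, and prices are assumed to be integer cents. As in the design above, a price update only reaches users whose alert price equals the incoming price exactly.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of manageAlertsAndPrices. All names are hypothetical.
class AlertsAndPrices {

    // Outer map: one entry per (stock, price) key, emulating keyed state scoping.
    // Inner map: stand-in for MapState<User, Boolean>; only its keys matter.
    private final Map<String, Map<String, Boolean>> usersByKey = new HashMap<>();

    private static String key(String stock, long priceCents) {
        return stock + "@" + priceCents;
    }

    // AddAlert: register the user under this (stock, price) key.
    void onAddAlert(String user, String stock, long priceCents) {
        usersByKey
            .computeIfAbsent(key(stock, priceCents), k -> new TreeMap<>())
            .put(user, Boolean.TRUE);
    }

    // RemoveAlert: forget the user under this (stock, price) key, if present.
    void onRemoveAlert(String user, String stock, long priceCents) {
        Map<String, Boolean> users = usersByKey.get(key(stock, priceCents));
        if (users != null) {
            users.remove(user);
        }
    }

    // PriceUpdate: return every user registered under this (stock, price) key.
    // A TreeMap is used above only so that the order is deterministic here.
    List<String> onPriceUpdate(String stock, long priceCents) {
        Map<String, Boolean> users =
            usersByKey.getOrDefault(key(stock, priceCents), Collections.emptyMap());
        return new ArrayList<>(users.keySet());
    }
}
```

Because each call touches only the entry for a single (stock, price) key, the work per event is independent of the total number of rules, which is what allows the operator to be spread across parallel instances.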