Let's say my Flink job receives a stream of stock prices (as an example) and issues an alert if, say, a stock drops below a certain price. Users can add or remove these alert criteria. For example, user abc@somemail.com creates a rule to be alerted if the price of GME drops below $100. How can my Flink job dynamically keep track of all these alert criteria in a scalable manner?
I could create an API which my Flink job calls to fetch the updated alert criteria, but that would mean polling the API repeatedly to keep everything up to date.
Or I could create a permanent table with the Flink Table API, which another Flink job updates as soon as a user creates a new alert criterion.
What would be the best practice for this use case?
Here's a design sketch for a purely streaming approach:
    alertUpdates = alerts
        .keyBy(user)
        .process(managePreviousAlerts) // uses MapState<Stock, Price>
        .keyBy(stock, price)

    priceUpdates = prices
        .keyBy(stock)
        .process(managePriceHistory)
        .keyBy(stock, price)

    alertUpdates
        .connect(priceUpdates)
        .process(manageAlertsAndPrices) // uses MapState<User, Boolean>
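To make that concrete, here is one way the sketch could be wired up with the Java DataStream API. This is only a sketch under assumptions: the event types (AlertRule, StockPrice, AlertUpdate, PriceUpdate), the inline test data, and the composite "stock@price" string key are all invented for illustration, and the three process function classes correspond to the lowercase names in the pseudocode; they are sketched after the notes below.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class StockAlertJob {

        // Hypothetical event types, shaped as Flink POJOs (public fields + no-arg constructor).
        public static class AlertRule {
            public String user, stock; public double price;
            public AlertRule() {}
            public AlertRule(String u, String s, double p) { user = u; stock = s; price = p; }
        }
        public static class StockPrice {
            public String stock; public double price;
            public StockPrice() {}
            public StockPrice(String s, double p) { stock = s; price = p; }
        }
        public static class AlertUpdate {
            public boolean isAdd; public String user, stock; public double price;
            public AlertUpdate() {}
            public AlertUpdate(boolean a, String u, String s, double p) { isAdd = a; user = u; stock = s; price = p; }
        }
        public static class PriceUpdate {
            public String stock; public double price;
            public PriceUpdate() {}
            public PriceUpdate(String s, double p) { stock = s; price = p; }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // In a real job these would come from Kafka or similar; inlined so the sketch runs.
            DataStream<AlertRule> alerts = env.fromElements(new AlertRule("abc@somemail.com", "GME", 100.0));
            DataStream<StockPrice> prices = env.fromElements(new StockPrice("GME", 100.0));

            // Both streams end up keyed by the same composite (stock, price) key, so an alert
            // and the price events that can trigger it land on the same parallel instance.
            DataStream<String> notifications = alerts
                    .keyBy(rule -> rule.user)
                    .process(new ManagePreviousAlerts())       // emits RemoveAlert/AddAlert events
                    .keyBy(u -> u.stock + "@" + u.price)
                    .connect(prices
                            .keyBy(q -> q.stock)
                            .process(new ManagePriceHistory()) // emits only alert-worthy changes
                            .keyBy(p -> p.stock + "@" + p.price))
                    .process(new ManageAlertsAndPrices());

            notifications.print();
            env.execute("stock alert sketch");
        }
    }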
- managePreviousAlerts maintains a per-user MapState from stocks to alert prices. When a new alert arrives, find the existing alert for this stock (for this user), if any. Then emit two AlertUpdates: a RemoveAlert event for this (user, stock, oldAlertPrice) and an AddAlert event for this (user, stock, newAlertPrice). (A sketch of this function follows the list.)
- managePriceHistory keeps some per-stock pricing history in state, and uses some business logic to decide if the incoming price is a change that merits triggering alerts. (E.g., maybe you only alert if the price went down.) (Also sketched below.)
- manageAlertsAndPrices maintains a per-stock, per-price MapState, keyed by user. The keys of this MapState are all of the users with alerts for this stock at this price. Upon receiving a PriceUpdate, alert all of these users by iterating over the keys of the MapState. Upon receiving a RemoveAlert, remove the user from the MapState. Upon receiving an AddAlert, add the user to the MapState. (Sketched below as well.)

This should scale well. The latency will be governed by the two network shuffles caused by the keyBys.
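A possible shape for managePreviousAlerts, assuming the hypothetical event types from the wiring sketch above are visible (same file or imported):

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Keyed by user. Keeps the user's current alerts as MapState<stock, alertPrice> and turns
    // each new rule into a RemoveAlert for the old price (if any) plus an AddAlert for the new one.
    public class ManagePreviousAlerts extends KeyedProcessFunction<String, AlertRule, AlertUpdate> {

        private transient MapState<String, Double> alertPriceByStock;

        @Override
        public void open(Configuration parameters) {
            alertPriceByStock = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("alertPriceByStock", String.class, Double.class));
        }

        @Override
        public void processElement(AlertRule rule, Context ctx, Collector<AlertUpdate> out) throws Exception {
            Double oldPrice = alertPriceByStock.get(rule.stock);
            if (oldPrice != null) {
                // Retract the previous alert so downstream (stock, price) state stays consistent.
                out.collect(new AlertUpdate(false, rule.user, rule.stock, oldPrice));
            }
            alertPriceByStock.put(rule.stock, rule.price);
            out.collect(new AlertUpdate(true, rule.user, rule.stock, rule.price));
        }
    }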
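managePriceHistory might look like the following; the "only forward prices that went down" rule is just the example business logic mentioned above, and treating the first observation as alert-worthy is another assumption made so the demo produces output:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Keyed by stock. Remembers the last observed price and applies the business rule
    // for what counts as an alert-worthy change.
    public class ManagePriceHistory extends KeyedProcessFunction<String, StockPrice, PriceUpdate> {

        private transient ValueState<Double> lastPrice;

        @Override
        public void open(Configuration parameters) {
            lastPrice = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastPrice", Double.class));
        }

        @Override
        public void processElement(StockPrice quote, Context ctx, Collector<PriceUpdate> out) throws Exception {
            Double previous = lastPrice.value();
            lastPrice.update(quote.price);
            // Forward the first observation, and afterwards only drops in price.
            if (previous == null || quote.price < previous) {
                out.collect(new PriceUpdate(quote.stock, quote.price));
            }
        }
    }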
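And a sketch of manageAlertsAndPrices as a KeyedCoProcessFunction. Note that keying by the exact (stock, price) pair means a price event only matches alerts registered at exactly that price, so this assumes prices are discretized (or that managePriceHistory emits an update per crossed price level):

    import org.apache.flink.api.common.state.MapState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
    import org.apache.flink.util.Collector;

    // Keyed by the composite "stock@price" string. The MapState's keys are all users with an
    // alert registered for this stock at this price; the Boolean value is just a placeholder.
    public class ManageAlertsAndPrices extends KeyedCoProcessFunction<String, AlertUpdate, PriceUpdate, String> {

        private transient MapState<String, Boolean> usersWithAlerts;

        @Override
        public void open(Configuration parameters) {
            usersWithAlerts = getRuntimeContext().getMapState(
                    new MapStateDescriptor<>("usersWithAlerts", String.class, Boolean.class));
        }

        @Override
        public void processElement1(AlertUpdate update, Context ctx, Collector<String> out) throws Exception {
            if (update.isAdd) {
                usersWithAlerts.put(update.user, true);  // AddAlert: register the user
            } else {
                usersWithAlerts.remove(update.user);     // RemoveAlert: retract the old alert
            }
        }

        @Override
        public void processElement2(PriceUpdate price, Context ctx, Collector<String> out) throws Exception {
            // A matching price arrived: notify every user registered at this (stock, price).
            for (String user : usersWithAlerts.keys()) {
                out.collect("alert " + user + ": " + price.stock + " at " + price.price);
            }
        }
    }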