Search code examples
apache-flinkflink-streaming

How to start a timer when an event occurs in Apache Flink?


I want to start a timer when an event is received in Flink, send the timer value and stop the timer when another event is received. Let me explain. An event consists of an event name, a source id and other fields. So I have something like this:

E1("A",1,...) -> E2("B",1,...) -> E3("C",1,...).

When I receive event "A" I want to start a timer (keyed by the source id) and update a sink with the timer value periodically. When I receive event "C" I want to stop the timer and update the sink with the final timer value. Is there a way to accomplish that in Apache Flink?


Solution

  • You'd do a .keyBy(r -> r.getSourceId()), and follow that with a custom KeyedProcessWindow. This gives you access to the Flink TimerService, where you can create timers. When a timer fires, the onTimer() method in your custom function will be called, which is where you can use the last value you saved in state to update the "sink" (actually a remote service of some sort). You can start a new timer in the onTimer() method, so that it's a periodic update.