
Apache Flink - Send event if no data was received for x minutes


How can I implement an operator with Flink's DataStream API that sends an event when no data was received from a stream for a certain amount of time?


Solution

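  • Such an operator can be implemented using a ProcessFunction on a keyed stream. Each incoming element registers a timer, and a timer emits an alert only if it is still the most recently registered one when it fires.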

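    // example input; a real job would read from an unbounded source, since a
    // finite input may end the job before any processing-time timer fires
    DataStream<Long> input = env.fromElements(1L, 2L, 3L, 4L);
    
    input
      // key the stream so that keyed state and timers are available.
      // NullByteKeySelector assigns every element the same key, so all data goes
      // to one task. You can also key by a real attribute to get one timeout per key.
      .keyBy(new NullByteKeySelector<Long>())
      // use the process function with a 60-second timeout (given in milliseconds)
      .process(new TimeOutFunction(60 * 1000));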
    

    The TimeOutFunction is defined as follows. In this example it uses processing time, so the timeout is measured against the wall clock of the machine running the operator.

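    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    
    public static class TimeOutFunction extends ProcessFunction<Long, Boolean> {
    
      // delay in milliseconds after which an alert is emitted
      private final long timeOut;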
      // state to remember the last timer set
      private transient ValueState<Long> lastTimer;
    
      public TimeOutFunction(long timeOut) {
        this.timeOut = timeOut;
      }
    
      @Override
      public void open(Configuration conf) {
        // set up the state that holds the timestamp of the last registered timer
        ValueStateDescriptor<Long> lastTimerDesc = 
          new ValueStateDescriptor<>("lastTimer", Long.class);
        lastTimer = getRuntimeContext().getState(lastTimerDesc);
      }
    
      @Override
      public void processElement(Long value, Context ctx, Collector<Boolean> out) throws Exception {
        // get current time and compute timeout time
        long currentTime = ctx.timerService().currentProcessingTime();
        long timeoutTime = currentTime + timeOut;
        // register timer for timeout time
        ctx.timerService().registerProcessingTimeTimer(timeoutTime);
        // remember timeout time
        lastTimer.update(timeoutTime);
      }
    
      @Override
      public void onTimer(long timestamp, OnTimerContext ctx, Collector<Boolean> out) throws Exception {
        // check if this was the last timer we registered
        if (timestamp == lastTimer.value()) {
          // it was, so no data was received afterwards.
          // fire an alert.
          out.collect(true);
        }
      }
    }
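
The "only the latest timer counts" logic can be tried out without a Flink cluster. The following is a minimal plain-Java sketch, not Flink code: the class `TimeoutModel` and its methods `element` and `advanceTo` are hypothetical names made up for this illustration, and a `TreeSet` stands in for Flink's timer service under the assumption that timers fire in timestamp order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical plain-Java model of the timeout logic above; no Flink involved. */
class TimeoutModel {
    // delay in milliseconds after which an alert is emitted
    private final long timeOut;
    // stands in for the ValueState<Long> that remembers the last timer
    private Long lastTimer;
    // stands in for the timer service: pending timers, ordered by timestamp
    private final TreeSet<Long> timers = new TreeSet<>();

    TimeoutModel(long timeOut) {
        this.timeOut = timeOut;
    }

    /** Mirrors processElement: register a timer and remember its timestamp. */
    void element(long now) {
        long timeoutTime = now + timeOut;
        timers.add(timeoutTime);
        lastTimer = timeoutTime;
    }

    /** Moves the clock to 'now', fires all due timers, and returns the alert timestamps. */
    List<Long> advanceTo(long now) {
        List<Long> alerts = new ArrayList<>();
        while (!timers.isEmpty() && timers.first() <= now) {
            long ts = timers.pollFirst();
            // mirrors onTimer: a stale timer (superseded by a later element) is ignored
            if (lastTimer != null && ts == lastTimer) {
                alerts.add(ts);
            }
        }
        return alerts;
    }
}
```

With a 60,000 ms timeout and elements at 0 ms and 30,000 ms, advancing the clock to 60,000 ms fires the first timer without an alert, because the second element has already replaced it. Advancing to 90,000 ms fires the second timer and produces one alert.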