apache-flink flink-streaming

Why is the timer state null in the onTimer method?


I have a simple application like the one below (inside a keyed process function).

As you can see in the code below, I always get the timerObject from state first; if it does not exist, I create a new one and update the state. Thus, the state should never be empty/null.

Basically, this state just keeps track of when the object was last seen, for example:

  • If an object was seen at 10:15, the timer is registered for 10:30.
  • However, if the object is seen again at 10:25, the timer is updated to 10:40.
  • If onTimer fires at 10:40, that means no object was seen during the 15-minute interval, so I just clear the state.

The problem is that the logger sometimes prints null for the state object. That should not happen, right?

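import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessRule extends KeyedProcessFunction<Tuple, LogEntity, Result> {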
    private static final Logger LOGGER = LoggerFactory.getLogger(ProcessRule.class);

    private transient ValueState<TimerObject> timerState;

    @Override
    public void open(Configuration parameters) throws Exception{
        ValueStateDescriptor<TimerObject> timerValueStateDescriptor = new ValueStateDescriptor<TimerObject>(
                "timerStateForProcessRule",
                TypeInformation.of(TimerObject.class)
        );
        timerState = getRuntimeContext().getState(timerValueStateDescriptor);
    }

    @Override
    public void processElement(LogEntity value, Context ctx, Collector<Result> out) throws Exception{
        registerTimer(value, ctx);
        if (conditionTrue) { // placeholder for the actual rule check
            // convert the element to a Result and add it to the collector
        }
    }

    private void registerTimer(LogEntity element, Context ctx) throws Exception{
        TimerObject stateTimer = timerState.value();

        if (stateTimer == null){
            stateTimer = new TimerObject();
            long timeInterval = 15 * 60 * 1000;
            stateTimer.setTimeInterval(timeInterval);
        }

        stateTimer.setCurrentTimeInMilliseconds(element.getTimestampMs());
        timerState.update(stateTimer);
    
        ctx.timerService().registerProcessingTimeTimer(stateTimer.getNextTimer());
        // getNextTimer => currentTime + timeInterval
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Result> out) throws Exception{
        TimerObject stateTimer = timerState.value();
        LOGGER.info("Timer fired at timestamp: {} for: {}", timestamp, stateTimer);
        timerState.clear();
    }
}


Solution

  • The issue most probably comes from the fact that you register a new timer for every element but never delete the previously registered one, so several timers can be pending for the same key. When the first timer fires, timerState is cleared. A later timer (for example, one registered to fire 3 seconds after the first) may then also fire, and at that point timerState may already be null.
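
To make the effect concrete, here is a minimal plain-Java sketch of a single key's timers and state. It is not Flink code: the class TimerSketch, its fields, and its methods are hypothetical stand-ins, and the comments name the Flink calls they are meant to mirror (timerService().registerProcessingTimeTimer and timerService().deleteProcessingTimeTimer). It replays the 10:15 / 10:25 example from the question, once without deleting the old timer and once with deletion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model of one key's timers and state; a sketch, not Flink code.
public class TimerSketch {
    static final long INTERVAL_MS = 15 * 60 * 1000L;

    // Stand-in for the per-key timer set (one entry per distinct timestamp).
    final TreeSet<Long> timers = new TreeSet<>();
    // Stand-in for ValueState<TimerObject>: the latest timer timestamp, or null.
    Long nextTimer = null;
    // What onTimer found in state at each firing (null = state already cleared).
    final List<Long> seenInOnTimer = new ArrayList<>();

    final boolean deleteOldTimer;

    TimerSketch(boolean deleteOldTimer) {
        this.deleteOldTimer = deleteOldTimer;
    }

    // Mirrors registerTimer(): push the timeout forward on every element.
    void processElement(long timestampMs) {
        if (deleteOldTimer && nextTimer != null) {
            // In Flink: ctx.timerService().deleteProcessingTimeTimer(oldTimestamp)
            timers.remove(nextTimer);
        }
        nextTimer = timestampMs + INTERVAL_MS;
        // In Flink: ctx.timerService().registerProcessingTimeTimer(nextTimer)
        timers.add(nextTimer);
    }

    // Fires every timer due at or before 'now', in timestamp order.
    void advanceTo(long now) {
        while (!timers.isEmpty() && timers.first() <= now) {
            timers.pollFirst();
            seenInOnTimer.add(nextTimer); // the value the logger would print
            nextTimer = null;             // corresponds to timerState.clear()
        }
    }

    static List<Long> run(boolean deleteOldTimer) {
        TimerSketch s = new TimerSketch(deleteOldTimer);
        long t0 = 0L;                            // stands for 10:15
        s.processElement(t0);                    // timer at 10:30
        s.processElement(t0 + 10 * 60 * 1000L);  // seen at 10:25, timer at 10:40
        s.advanceTo(t0 + 60 * 60 * 1000L);       // let an hour pass
        return s.seenInOnTimer;
    }

    public static void main(String[] args) {
        System.out.println("without delete: " + run(false));
        System.out.println("with delete:    " + run(true));
    }
}
```

Without deletion, two timers fire: the stale 10:30 timer clears the state too early, and the 10:40 timer then finds null. With deletion, only the 10:40 timer fires and it still sees the state. An alternative that avoids deleting timers is to compare the timestamp passed to onTimer with the value stored in state and ignore the firing when they differ or when the state is null.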