
Process Function Event Time Behaviour


I am referring to the ProcessFunction example in the Flink documentation: https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/operators/process_function/

/**
 * The data type stored in the state
 */
public class CountWithTimestamp {

    public String key;
    public long count;
    public long lastModified;
}

/**
 * The implementation of the ProcessFunction that maintains the count and timeouts
 */
public class CountWithTimeoutFunction 
        extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> {

    /** The state that is maintained by this process function */
    private ValueState<CountWithTimestamp> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    @Override
    public void processElement(
            Tuple2<String, String> value, 
            Context ctx, 
            Collector<Tuple2<String, Long>> out) throws Exception {

        // retrieve the current count
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // update the state's count
        current.count++;

        // set the state's timestamp to the record's assigned event time timestamp
        current.lastModified = ctx.timestamp();

        // write the state back
        state.update(current);

        // schedule the next timer 60 seconds from the current event time
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
    }

    @Override
    public void onTimer(
            long timestamp, 
            OnTimerContext ctx, 
            Collector<Tuple2<String, Long>> out) throws Exception {

        // get the state for the key that scheduled the timer
        CountWithTimestamp result = state.value();

        // check if this is an outdated timer or the latest timer
        if (timestamp == result.lastModified + 60000) {
            // emit the state on timeout
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}

In my scenario, the data stream is produced by a KafkaSource with no idleness configured:

DataStream<Tuple2<Integer, Integer>> inputStream = env.fromSource(inputSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(1)), "Input Kafka Source");

Now consider a scenario where the source emits events for only one key, say key1. At time T1, when the first event arrives, processElement is called and the CountWithTimestamp object is set accordingly, i.e. count = 1 and lastModified = T1.

No more events arrive for, say, 70 seconds. Then, at time T2, another event arrives for the same key key1. Here are my questions:

  1. When the second event arrives, I see while debugging that processElement is always called first and onTimer afterwards. I assume this is because the watermark is generated only after the event has been processed. Is my understanding correct?
  2. Because processElement is called first, lastModified is updated to T2 (it was T1). So when the timer registered for T1 + 60000 fires, the check timestamp == result.lastModified + 60000 fails and nothing is emitted. Nothing will ever be emitted if this pattern keeps repeating. Is that right?

Thanks.


Solution

  • I believe you've got that right.

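    1. Yes. A watermark is derived from the timestamps of events the source has already emitted, so it always follows the events that justify its creation. processElement therefore sees the event at T2 before the watermark that fires the timer registered for T1 + 60000.
    2. Yes, that example is flawed. It makes the (unstated) assumption that events for other keys keep arriving and advance the watermark, so that the timer fires before the next event for the same key.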
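
To make the ordering concrete, here is a minimal plain-Java sketch of the single-key scenario. It does not use Flink at all: the class TimerOrderSketch and its methods are hypothetical names invented for illustration, and it simplifies by computing a watermark right after every event (Flink emits watermarks periodically, but still only from events it has already seen). The watermark formula, max timestamp minus the out-of-orderness bound minus 1 ms, mirrors what forBoundedOutOfOrderness does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Plain-Java sketch (no Flink dependency) of the single-key scenario. */
public class TimerOrderSketch {
    static final long TIMEOUT_MS = 60_000L;         // timeout used in the docs example
    static final long OUT_OF_ORDERNESS_MS = 1_000L; // bound from the question's WatermarkStrategy

    // "Keyed state" for the single key, plus the registered event-time timers.
    private long count = 0;
    private long lastModified = Long.MIN_VALUE;
    private long maxTimestamp = Long.MIN_VALUE;
    private final TreeSet<Long> timers = new TreeSet<>();
    private final List<String> log = new ArrayList<>();

    /** Handles one event: the element is processed first, the watermark follows. */
    void onEvent(long ts) {
        // Step 1: what processElement does in the docs example.
        count++;
        lastModified = ts;
        timers.add(ts + TIMEOUT_MS);
        log.add("processElement ts=" + ts + " count=" + count);

        // Step 2: the watermark justified by this event is produced afterwards.
        maxTimestamp = Math.max(maxTimestamp, ts);
        long watermark = maxTimestamp - OUT_OF_ORDERNESS_MS - 1;

        // Step 3: every timer at or below the watermark fires, oldest first.
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long timerTs = timers.pollFirst();
            // Same check as onTimer in the docs example.
            boolean emits = (timerTs == lastModified + TIMEOUT_MS);
            log.add("onTimer ts=" + timerTs + " emits=" + emits);
        }
    }

    /** Feeds the given event timestamps (ms) for one key and returns the call log. */
    static List<String> run(long... eventTimestamps) {
        TimerOrderSketch sketch = new TimerOrderSketch();
        for (long ts : eventTimestamps) {
            sketch.onEvent(ts);
        }
        return sketch.log;
    }

    public static void main(String[] args) {
        // T1 = 0 ms, T2 = 70 000 ms, as in the question.
        run(0L, 70_000L).forEach(System.out::println);
    }
}
```

With T1 = 0 and T2 = 70 000, the log shows processElement for T2 before onTimer for 60 000, and that timer emits nothing because lastModified has already moved to T2. The timer for 130 000 stays pending until some later event pushes the watermark past it, at which point the same thing happens again.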