apache-flink, flink-streaming

ProcessFunction in Flink to output a result every 5 seconds


My input stream is Tuple2<String, Integer>. I want to group by the first field and sum the integers in the second field. This is my KeyedProcessFunction:

public static class MyKeyedProcessFunction
      extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>> {
    private ValueState<Integer> state;

    @Override
    public void open(Configuration parameters) throws Exception {
      state = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Integer.class));
    }

    @Override
    public void processElement(
        Tuple2<String, Integer> value,
        Context ctx,
        Collector<Tuple2<String, Integer>> out) throws Exception {
      Integer sum = state.value();
      if (sum == null) {
        sum = 0;
      }
      sum += value.f1;
      state.update(sum);
      // Registers a new timer, 5 seconds from now, for every incoming element
      ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);
    }

    @Override
    public void onTimer(
        long timestamp,
        OnTimerContext ctx,
        Collector<Tuple2<String, Integer>> out) throws Exception {
      out.collect(Tuple2.of(ctx.getCurrentKey(), state.value()));
      state.clear();
    }
  }

Now onTimer is called once for every element. My input is:

aaa,50
aaa,40
aaa,10

The output is:

(aaa,100)
(aaa,null)
(aaa,null)

How can I get only (aaa,100) as the output?
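Each of the three elements registers its own timer (presumably at different milliseconds, since Flink keeps only one timer per key and timestamp), so onTimer runs three times: the first call emits the sum and clears the state, and the other two read the cleared state as null. The plain-Java model below (not Flink code) reproduces this, assuming elements arrive 1 ms apart and all timers fire after the last element; PerElementTimerSim, Timer and run are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the question's KeyedProcessFunction; this is NOT Flink code.
// Assumptions: element i arrives at simulated processing time i ms, and all timers
// fire after the last element, in timestamp order.
class PerElementTimerSim {

    // A pending processing-time timer for one key (hypothetical helper type).
    static final class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    static List<String> run(String[] keys, int[] values) {
        Map<String, Integer> sumState = new HashMap<>(); // stands in for ValueState<Integer> "sum"
        List<Timer> timers = new ArrayList<>();          // appended in timestamp order here
        List<String> output = new ArrayList<>();

        // processElement: add to the sum and, as in the question, register a timer every time.
        for (int i = 0; i < keys.length; i++) {
            long now = i;
            sumState.merge(keys[i], values[i], Integer::sum);
            timers.add(new Timer(now + 5000, keys[i]));
        }

        // onTimer: emit (key, sum) and clear the state, once per registered timer.
        for (Timer t : timers) {
            Integer sum = sumState.get(t.key); // null once the state has been cleared
            output.add("(" + t.key + "," + sum + ")");
            sumState.remove(t.key);            // models state.clear()
        }
        return output;
    }

    public static void main(String[] args) {
        run(new String[] {"aaa", "aaa", "aaa"}, new int[] {50, 40, 10})
            .forEach(System.out::println);
    }
}
```

With the question's input it prints (aaa,100), (aaa,null) and (aaa,null), matching the observed output.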


Solution

  • You register a timer for every incoming event, so onTimer fires once per event. Instead, you can create a second ValueState<Boolean> (obtained in open() the same way as the sum state) that records whether a timer has already been registered for the current key.

    As soon as the first event for a key comes in, register a timer in processElement and set the flag:

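    
    // timerRegistered is the ValueState<Boolean>; value() returns null until the first update,
    // so comparing it with == false would throw a NullPointerException.
    if (!Boolean.TRUE.equals(timerRegistered.value())) {
      ctx.timerService().registerProcessingTimeTimer(
          ctx.timerService().currentProcessingTime() + 5000);
      timerRegistered.update(true);
    }
    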

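    After that, register each subsequent timer (5 seconds after the one that just fired) in onTimer instead of in processElement. Because state.clear() leaves the sum null until new elements arrive, check for null in onTimer before calling out.collect; otherwise the periodic timer emits (aaa,null) again.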

    I haven't tested the code, but it should give you the idea.

    Kind Regards Dominik
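The answer's approach can be modeled the same way. In this plain-Java sketch (again not Flink code; FlaggedTimerSim, Timer, run and the horizon parameter are hypothetical), a per-key flag ensures only the first element registers a timer, onTimer schedules the next timer 5 seconds later, and a null check skips emitting when nothing arrived since the last firing. It assumes elements arrive 1 ms apart before the first timer fires, and it stops the simulated clock at a fixed horizon because a self-re-registering timer would otherwise fire forever.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Plain-Java model of the answer's idea; this is NOT Flink code.
// Assumptions: element i arrives at simulated time i ms, all elements arrive before
// the first timer fires, and the clock stops at `horizon` ms.
class FlaggedTimerSim {
    static final long INTERVAL_MS = 5000;

    // A pending processing-time timer for one key (hypothetical helper type).
    static final class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    static List<String> run(String[] keys, int[] values, long horizon) {
        Map<String, Integer> sumState = new HashMap<>();        // ValueState<Integer> "sum"
        Map<String, Boolean> timerRegistered = new HashMap<>(); // ValueState<Boolean> flag
        PriorityQueue<Timer> timers =
            new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));
        List<String> output = new ArrayList<>();

        // processElement: accumulate; only the first element per key registers a timer.
        for (int i = 0; i < keys.length; i++) {
            long now = i;
            sumState.merge(keys[i], values[i], Integer::sum);
            if (!Boolean.TRUE.equals(timerRegistered.get(keys[i]))) {
                timers.add(new Timer(now + INTERVAL_MS, keys[i]));
                timerRegistered.put(keys[i], true);
            }
        }

        // onTimer: emit only if a sum exists, clear it, and schedule the next timer.
        while (!timers.isEmpty() && timers.peek().timestamp <= horizon) {
            Timer t = timers.poll();
            Integer sum = sumState.remove(t.key); // read and clear the state
            if (sum != null) {
                output.add("(" + t.key + "," + sum + ")");
            }
            timers.add(new Timer(t.timestamp + INTERVAL_MS, t.key));
        }
        return output;
    }

    public static void main(String[] args) {
        run(new String[] {"aaa", "aaa", "aaa"}, new int[] {50, 40, 10}, 20_000)
            .forEach(System.out::println);
    }
}
```

With the question's input and a 20-second horizon it prints only (aaa,100); the timers at 10, 15 and 20 seconds find no sum and emit nothing. One trade-off to keep in mind: the re-registered timer keeps firing every 5 seconds for every key ever seen. If one result per burst of input is enough, an alternative is to clear the flag in onTimer instead of registering a new timer, so the next element starts a fresh 5-second timer.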