My input stream is Tuple2<String, String>. I want to group by the first field and sum the integer in the second field. This is my ProcessFunction:
public static class MyKeyedProcessFunction
        extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private ValueState<Integer> state;

    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Integer.class));
    }

    @Override
    public void processElement(
            Tuple2<String, Integer> value,
            Context ctx,
            Collector<Tuple2<String, Integer>> out) throws Exception {
        Integer sum = state.value();
        if (sum == null) {
            sum = 0;
        }
        sum += value.f1;
        state.update(sum);
        ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);
    }

    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<Tuple2<String, Integer>> out) throws Exception {
        out.collect(Tuple2.of(ctx.getCurrentKey(), state.value()));
        state.clear();
    }
}
Now onTimer is called for every element. I specified the input as:
aaa,50
aaa,40
aaa,10
The output I see is:
(aaa,100)
(aaa, null)
(aaa, null)
How can I get just (aaa,100) as the output?
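You register a timer for every incoming event. Each call uses currentProcessingTime() + 5000, so elements that arrive at different milliseconds create timers with different timestamps, and every one of them fires (Flink only deduplicates timers that share the same key and timestamp). The first firing emits (aaa,100) and clears the state; the later ones find the state empty and emit null. One option is to add a second ValueState of type Boolean that records whether a timer has already been registered for the key.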
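When the first event for a key comes in, you register the timer in the processElement method, for example:
    // value() returns null until the flag has been set, so comparing it with == false would throw a NullPointerException
    if (!Boolean.TRUE.equals(timerRegistered.value())) {
        ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);
        timerRegistered.update(true);
    }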
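To see why a single registration changes the output, here is a small plain-Java model of the timer bookkeeping for one key. It does not use Flink at all: TimerModel, run, and the oneTimerOnly switch are names made up for this illustration, and it assumes every element arrives before the first timer fires (that is, within the 5 seconds). The timers are kept in a set to mirror the fact that registering the same timestamp twice for a key results in one firing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java model (not Flink) of the timers registered for a single key.
class TimerModel {

    // arrivals: processing times in ms at which the elements arrive
    // values:   the integers carried by those elements
    // oneTimerOnly: false = register a timer per element, true = register only the first one
    static List<Integer> run(long[] arrivals, int[] values, boolean oneTimerOnly) {
        TreeSet<Long> timers = new TreeSet<>(); // a set: equal timestamps collapse into one timer
        Integer sum = null;                      // models the "sum" ValueState
        boolean registered = false;              // models the "timerRegistered" flag

        for (int i = 0; i < arrivals.length; i++) {
            sum = (sum == null ? 0 : sum) + values[i];
            if (!oneTimerOnly || !registered) {
                timers.add(arrivals[i] + 5000);
                registered = true;
            }
        }

        // Fire the timers in timestamp order; each firing emits the state and clears it.
        List<Integer> emitted = new ArrayList<>();
        for (long ignored : timers) {
            emitted.add(sum);
            sum = null;
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 1, 2};
        int[] values = {50, 40, 10};
        System.out.println(run(arrivals, values, false)); // one timer per element
        System.out.println(run(arrivals, values, true));  // only the first timer
    }
}
```

With one timer per element the model emits 100 followed by two nulls, which matches the output in the question; with a single timer it emits 100 only.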
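After that, you register each following timer for the 5-second interval in the onTimer method instead of in processElement. Because onTimer clears the sum, check it for null before emitting; otherwise an interval without input still produces (aaa,null).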
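Put together, the function could look roughly like the sketch below. It keeps the open(Configuration) signature from the question, which assumes a Flink version that still supports it. The class name SumWithSingleTimer and the state name "timerRegistered" are made up here. Note one deliberate variation: instead of re-registering a timer in onTimer, this version clears the flag there, so the next element for the key starts a fresh 5-second interval.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Sketch only: sums the second field per key and emits once per 5-second interval.
public class SumWithSingleTimer
        extends KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private transient ValueState<Integer> sum;
    private transient ValueState<Boolean> timerRegistered; // hypothetical flag state

    @Override
    public void open(Configuration parameters) throws Exception {
        sum = getRuntimeContext().getState(
                new ValueStateDescriptor<>("sum", Integer.class));
        timerRegistered = getRuntimeContext().getState(
                new ValueStateDescriptor<>("timerRegistered", Boolean.class));
    }

    @Override
    public void processElement(
            Tuple2<String, Integer> value,
            Context ctx,
            Collector<Tuple2<String, Integer>> out) throws Exception {
        Integer current = sum.value();
        sum.update((current == null ? 0 : current) + value.f1);

        // Register a timer only if none is pending for this key.
        if (!Boolean.TRUE.equals(timerRegistered.value())) {
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 5000);
            timerRegistered.update(true);
        }
    }

    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<Tuple2<String, Integer>> out) throws Exception {
        Integer current = sum.value();
        if (current != null) {
            out.collect(Tuple2.of(ctx.getCurrentKey(), current));
        }
        // Reset both states so the next element opens a new interval.
        sum.clear();
        timerRegistered.clear();
    }
}
```

For a fixed periodic timer, replace timerRegistered.clear() with ctx.timerService().registerProcessingTimeTimer(timestamp + 5000); the trade-off is that the timer then keeps firing for the key even when no new elements arrive.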
I didn't test the code, but it should give you the idea.
Kind Regards Dominik