I'm trying to implement a tumbling window based calculation on event time characteristics with flink.
Therefore I have a KeyedBroadcastProcessFunction
doing all the work. With ProcessTimers everything works as expected in a unit test. Now I changed the code to work with Event Timers but the times do not trigger. (I do register the timers with registerEventTimeTimer)
Basically the test is like this
@Test
public void evaluateFormular_ShouldSumOnTimer() throws Exception {
long minutesToWait = 1;
var definition = createTestCondition("Test", String.format("%s(_var)", method), "_var", "Test",
"1", minutesToWait);
var message = new CalculationControlMessage();
message.setAction(ControlMessageAction.Create);
message.setCalculationDefinition(definition);
harness.processBroadcastElement(message, 100l);
//harness.processBroadcastWatermark(5l);
this.processValues(harness, values);
// advance the watermark so that the timer can fire
harness.processWatermark(Time.minutes(minutesToWait).toMilliseconds() + 1);
assertEquals(harness.numEventTimeTimers(), 1);
assertEquals("there should be a formular evaluated", 1, harness.extractOutputValues().size());
harness.extractOutputValues().forEach(datapoint -> {
assertEquals(datapoint.getValue(), expected, 0d);
});
}
For my understanding the watermark is manually advanced so that the timer can trigger. The watermark if definitely higher than the watermark from the event.
The harness is setup like this
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.util.KeyedBroadcastOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
public class FlinkHarness {
public static KeyedBroadcastOperatorTestHarness<String, DataPointEvent, CalculationControlMessage, DataPointEvent> createForCalculations()
throws Exception {
var dynamicCalculationFunction = new DynamicCalculationFunction();
var harness = ProcessFunctionTestHarnesses
.forKeyedBroadcastProcessFunction(dynamicCalculationFunction, new KeySelector<>() {
private static final long serialVersionUID = 1337L;
@Override
public String getKey(DataPointEvent value) throws Exception {
return value.getDataPointKey();
}
}, Types.STRING, CalculationDescriptors.calculation);
harness.setTimeCharacteristic(TimeCharacteristic.EventTime);
harness.getExecutionConfig().setAutoWatermarkInterval(50l);
harness.open();
return harness;
}
If I debug the function I can see that the timerService used by the test the currentWatermark
is still at -9223372036854775808
I don't understand why the timer does not trigger. Am I missing something?
For me it was unclear that on the testharness there are two methods to advance the watermark on an operator with multiple input streams.
Yes, that makes sense. The overall watermark for an operator with multiple input channels is always the minimum of all incoming watermarks. So you must advance both watermarks
In that case with a KeyedBroadcastProcessFunction
within the test you always need to advance all incoming watermarks
So when calling harness.processBroadcastElement(message, 0);
or harness.processElement(new StreamRecord<DataPointEvent>(event, event.getTimestamp()));
both watermarks need to advanced with
harness.processBroadcastWatermark(timestamp);
harness.processWatermark(timestamp);
Then the desired timer can fire