
Flink event time timer does not fire in testharness


I'm trying to implement a tumbling-window calculation based on event time with Flink.

To do that, I have a KeyedBroadcastProcessFunction doing all the work. With processing-time timers everything works as expected in a unit test. Now I have changed the code to use event-time timers, but the timers do not fire. (I do register the timers with registerEventTimeTimer.)

Basically, the test looks like this:

  @Test
  public void evaluateFormular_ShouldSumOnTimer() throws Exception {
    long minutesToWait = 1;
    var definition = createTestCondition("Test", String.format("%s(_var)", method), "_var", "Test",
        "1", minutesToWait);

    var message = new CalculationControlMessage();
    message.setAction(ControlMessageAction.Create);
    message.setCalculationDefinition(definition);

    harness.processBroadcastElement(message, 100L);
    //harness.processBroadcastWatermark(5L);
    this.processValues(harness, values);
    // advance the watermark so that the timer can fire
    harness.processWatermark(Time.minutes(minutesToWait).toMilliseconds() + 1);

    // JUnit convention: expected value first, actual value second
    assertEquals(1, harness.numEventTimeTimers());
    assertEquals("there should be a formula evaluated", 1, harness.extractOutputValues().size());
    harness.extractOutputValues().forEach(datapoint -> {
      assertEquals(expected, datapoint.getValue(), 0d);
    });
  }

As I understand it, the watermark is advanced manually so that the timer can fire. The watermark is definitely higher than the timestamp of the event.

The harness is set up like this:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.util.KeyedBroadcastOperatorTestHarness;
import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;

public class FlinkHarness {
  public static KeyedBroadcastOperatorTestHarness<String, DataPointEvent, CalculationControlMessage, DataPointEvent> createForCalculations()
      throws Exception {
    var dynamicCalculationFunction = new DynamicCalculationFunction();
    
    var harness = ProcessFunctionTestHarnesses
        .forKeyedBroadcastProcessFunction(dynamicCalculationFunction, new KeySelector<>() {
          private static final long serialVersionUID = 1337L;

          @Override
          public String getKey(DataPointEvent value) throws Exception {
            return value.getDataPointKey();
          }

        }, Types.STRING, CalculationDescriptors.calculation);

    harness.setTimeCharacteristic(TimeCharacteristic.EventTime);
    harness.getExecutionConfig().setAutoWatermarkInterval(50L);
    harness.open();
    return harness;
  }
}

If I debug the function, I can see that the currentWatermark of the timerService used by the test is still at -9223372036854775808 (Long.MIN_VALUE).

I don't understand why the timer does not fire. Am I missing something?


Solution

  • It was not clear to me that the test harness has two methods for advancing the watermark on an operator with multiple input streams.

    Yes, that makes sense. The overall watermark of an operator with multiple inputs is always the minimum of all incoming watermarks, so you must advance both watermarks.

    With a KeyedBroadcastProcessFunction, the test therefore always needs to advance the watermarks of all inputs.

    So after calling harness.processBroadcastElement(message, 0); or harness.processElement(new StreamRecord<DataPointEvent>(event, event.getTimestamp())); both watermarks need to be advanced with

    harness.processBroadcastWatermark(timestamp);
    harness.processWatermark(timestamp);
    

    Then the desired timer can fire.
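
To illustrate why advancing only one input leaves currentWatermark at Long.MIN_VALUE, here is a minimal plain-Java sketch of the "minimum of all incoming watermarks" rule. It is a simplified model and not Flink's actual implementation. The class CombinedWatermarkSketch and its fields are hypothetical. Its method names merely mirror the harness calls used above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical, simplified model of a two-input operator (not Flink code).
class CombinedWatermarkSketch {
    // Both inputs start at Long.MIN_VALUE, the value seen in the debugger.
    private long keyedInputWatermark = Long.MIN_VALUE;
    private long broadcastInputWatermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    private final TreeSet<Long> timers = new TreeSet<>();
    private final List<Long> fired = new ArrayList<>();

    public void registerEventTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    // Watermark arriving on the keyed (non-broadcast) input.
    public void processWatermark(long watermark) {
        keyedInputWatermark = watermark;
        advance();
    }

    // Watermark arriving on the broadcast input.
    public void processBroadcastWatermark(long watermark) {
        broadcastInputWatermark = watermark;
        advance();
    }

    // The operator's watermark is the minimum over both inputs;
    // timers fire only when that minimum moves past their timestamp.
    private void advance() {
        long newMin = Math.min(keyedInputWatermark, broadcastInputWatermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            while (!timers.isEmpty() && timers.first() <= combinedWatermark) {
                fired.add(timers.pollFirst());
            }
        }
    }

    public long currentWatermark() {
        return combinedWatermark;
    }

    public List<Long> firedTimers() {
        return fired;
    }

    public static void main(String[] args) {
        CombinedWatermarkSketch op = new CombinedWatermarkSketch();
        op.registerEventTimeTimer(60_000L);

        op.processWatermark(60_001L);           // only the keyed input advances
        System.out.println(op.firedTimers());   // prints []

        op.processBroadcastWatermark(60_001L);  // now both inputs have advanced
        System.out.println(op.firedTimers());   // prints [60000]
    }
}
```

In the original test this corresponds to calling harness.processBroadcastWatermark(...) in addition to harness.processWatermark(...) before asserting on the output.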