I'm writing unit tests for a CoProcessFunction
. Is there a way to drive processing time forward manually in tests to trigger onTimer
calls?
Flink provides test harnesses to test functions with timers and state. The allow you to "control" the time and also validate properties of the state.
The following code block is copied from the official documentation to give you an idea how the harnesses are used.
public class StatefulFlatMapTest {
private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
private StatefulFlatMap statefulFlatMapFunction;
@Before
public void setupTestHarness() throws Exception {
//instantiate user-defined function
statefulFlatMapFunction = new StatefulFlatMapFunction();
// wrap user defined function into a the corresponding operator
testHarness = new OneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction));
// optionally configured the execution environment
testHarness.getExecutionConfig().setAutoWatermarkInterval(50);
// open the test harness (will also call open() on RichFunctions)
testHarness.open();
}
@Test
public void testingStatefulFlatMapFunction() throws Exception {
//push (timestamped) elements into the operator (and hence user defined function)
testHarness.processElement(2L, 100L);
//trigger event time timers by advancing the event time of the operator with a watermark
testHarness.processWatermark(100L);
//trigger processing time timers by advancing the processing time of the operator directly
testHarness.setProcessingTime(100L);
//retrieve list of emitted records for assertions
assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));
//retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
//assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0))
}
}
Please check the docs for details about which dependencies to include, which harnesses exist, and how to use them.