Tags: java, junit, apache-flink, flink-streaming

How to unit test a Flink ProcessFunction?


I have a simple ProcessFunction that takes a String as input and emits a String as output. How do I unit test it with JUnit, given that processElement is a void method and returns no value?

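import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class SampleProcessFunction extends ProcessFunction<String, String> {
    @Override
    public void processElement(String content, Context context, Collector<String> collector) throws Exception {
        // Append a fixed suffix and emit the result downstream.
        String output = content + "output";
        collector.collect(output);
    }
}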

Solution

  • To unit test this method, first define its expected behavior. Here, that is a single invocation of Collector::collect with content + "output" as the argument, so the method can be tested with a mocked collector.

    Here is an example using the Mockito framework:

    ...
    
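    // Mockito.mock takes the raw class, so assigning to a parameterized type is an unchecked conversion.
    @SuppressWarnings("unchecked")
    private final Collector<String> collectorMock = Mockito.mock(Collector.class);
    // Context is an inner class of ProcessFunction, so it has to be qualified outside the function itself.
    @SuppressWarnings("unchecked")
    private final ProcessFunction<String, String>.Context contextMock = Mockito.mock(ProcessFunction.Context.class);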
    
    private final SampleProcessFunction sampleProcessFunction = new SampleProcessFunction();
    
    @Test
    public void testProcessElement_shouldInvokeCollector_whenAnyValuePassed() throws Exception {
        // given
        final String content = "hello ";
    
        // when
        sampleProcessFunction.processElement(content, contextMock, collectorMock);
    
        // then: collect was invoked exactly once with "hello output"
        Mockito.verify(collectorMock).collect(content + "output");
    }
    
    ...
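
The same expectation (one call to collect with content + "output") can also be checked without a mocking library, by passing a hand-written collector that records what it receives. The following is only a minimal sketch of that idea, not part of the original answer. RecordingCollector is a hypothetical helper name, and the nested Collector interface is a stand-in that mirrors the two methods of org.apache.flink.util.Collector so that the sketch runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

class RecordingCollectorSketch {

    // Stand-in with the same two methods as org.apache.flink.util.Collector,
    // declared here only so the sketch compiles without Flink on the classpath.
    interface Collector<T> {
        void collect(T record);
        void close();
    }

    // Hypothetical hand-written test double: stores every collected record.
    static final class RecordingCollector<T> implements Collector<T> {
        private final List<T> records = new ArrayList<>();

        @Override
        public void collect(T record) {
            records.add(record);
        }

        @Override
        public void close() {
            // Nothing to release in this sketch.
        }

        List<T> records() {
            return records;
        }
    }

    // Same logic as SampleProcessFunction.processElement; the Context
    // parameter is left out because the function never reads it.
    static void processElement(String content, Collector<String> collector) {
        collector.collect(content + "output");
    }

    public static void main(String[] args) {
        RecordingCollector<String> collector = new RecordingCollector<>();
        processElement("hello ", collector);
        System.out.println(collector.records()); // prints [hello output]
    }
}
```

In a real test, RecordingCollector would implement Flink's own Collector interface and be passed to sampleProcessFunction.processElement in place of collectorMock, with the assertion made on the recorded list. Compared with Mockito.verify, this checks the full sequence of emitted values, which may be convenient when a function emits more than one record.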