Tags: java, unit-testing, apache-flink, flink-streaming

Integration test for complex topology (multiple inputs) in Flink


I need to write a unit test for a Flink streaming topology. It's basically a CoFlatMapFunction with two inputs.

I tried to draw inspiration from this page: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html

The order of the inputs matters to my topology, so I can't use StreamExecutionEnvironment#fromCollection for each input in my test: I wouldn't control the order in which data points from the two inputs are interleaved.

I've tried creating a single input with StreamExecutionEnvironment#fromCollection and dispatching each element to the appropriate input of my CoFlatMapFunction based on its type, but the order of the elements is lost in this operation.

Is there another way to write this test?


Solution

  • The Flink training exercises have an example that uses a TwoInputStreamOperatorTestHarness, which you can refer to:

    https://github.com/dataArtisans/flink-training-exercises/blob/master/src/test/java/com/dataartisans/flinktraining/exercises/datastream_java/process/EventTimeJoinTest.java

    You'll need these dependencies (the _2.11 suffix is the Scala binary version and must match the one your Flink build uses):

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-test-utils-junit</artifactId>
      <version>${flink.version}</version>
      <scope>test</scope>
    </dependency>
    
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>${flink.version}</version>
      <scope>test</scope>
      <type>test-jar</type>
    </dependency>
    
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-all</artifactId>
      <version>1.10.19</version>
      <type>jar</type>
      <scope>test</scope>
    </dependency>
    
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-runtime_2.11</artifactId>
      <version>${flink.version}</version>
      <scope>test</scope>
      <type>test-jar</type>
    </dependency>
    

    Keep in mind that these test harnesses are not part of Flink's public, supported API, so they could change in unexpected ways between releases.
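    The point of the harness is that each processElement1/processElement2 call is handled synchronously, in exactly the order you make the calls, so the interleaving of the two inputs is fully under the test's control. Below is a minimal sketch, assuming a Flink 1.x version whose harness provides extractOutputValues() and JUnit 4 on the test classpath. GateCoFlatMap, GateCoFlatMapTest and runScenario are hypothetical names for illustration, not from the original post. The function is wrapped in CoStreamFlatMap, the operator that ConnectedStreams#flatMap creates for a CoFlatMapFunction.

```java
import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.List;

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Test;

public class GateCoFlatMapTest {

    /**
     * Hypothetical order-sensitive function: input 1 opens/closes a gate,
     * input 2 carries data that is forwarded only while the gate is open.
     */
    public static class GateCoFlatMap implements CoFlatMapFunction<Boolean, String, String> {
        // Plain field: acceptable for a non-keyed sketch, but not checkpointed state.
        private boolean open = false;

        @Override
        public void flatMap1(Boolean control, Collector<String> out) {
            open = control;
        }

        @Override
        public void flatMap2(String value, Collector<String> out) {
            if (open) {
                out.collect(value);
            }
        }
    }

    /** Feeds both inputs in an exact interleaved order and returns the emitted values. */
    public static List<String> runScenario() throws Exception {
        TwoInputStreamOperatorTestHarness<Boolean, String, String> harness =
                new TwoInputStreamOperatorTestHarness<>(new CoStreamFlatMap<>(new GateCoFlatMap()));
        harness.open();
        try {
            harness.processElement2(new StreamRecord<>("dropped", 1L));     // gate still closed
            harness.processElement1(new StreamRecord<>(true, 2L));          // open the gate
            harness.processElement2(new StreamRecord<>("kept", 3L));        // forwarded
            harness.processElement1(new StreamRecord<>(false, 4L));         // close the gate
            harness.processElement2(new StreamRecord<>("dropped too", 5L)); // gate closed again
            return harness.extractOutputValues();
        } finally {
            harness.close();
        }
    }

    @Test
    public void emitsOnlyWhileGateIsOpen() throws Exception {
        assertEquals(Arrays.asList("kept"), runScenario());
    }
}
```

    Swapping the order of any two calls changes the result, which is exactly the kind of ordering that is hard to pin down with two fromCollection sources. If your function is keyed or uses timers, the harness family also includes keyed variants and processWatermark1/processWatermark2 for driving event time.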