Search code examples
Tags: apache-flink, flink-streaming

How does Flink integration testing work without the documented flink-test-utils dependency or MiniClusterWithClientResource?


I'm using Flink 1.15.1 and JUnit 5. I would like to know how the following integration test, adapted from the documentation, works without the flink-test-utils dependency and without a static MiniClusterWithClientResource instance.

package com.mypackage;

import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.junit.jupiter.api.Test;

/**
 * Integration test for a trivial increment pipeline.
 *
 * <p>NOTE(review): no mini-cluster is set up explicitly here. {@code
 * StreamExecutionEnvironment.getExecutionEnvironment()} presumably falls back to a local execution
 * environment that starts its own embedded cluster for each {@code execute()} call — confirm for
 * your Flink version and classpath.
 */
public class ExampleIntegrationTest {

  @Test
  public void testIncrementPipeline() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // configure your test environment
    env.setParallelism(2);

    // values are collected in a static variable; reset it so output from earlier runs cannot leak in
    CollectSink.values.clear();

    // create a stream of custom elements and apply transformations
    env.fromElements(1L, 21L, 22L).map(n -> n + 1).addSink(new CollectSink());

    // execute (blocks until the bounded job has finished)
    env.execute();

    // verify your results: with parallelism 2 the arrival order is not deterministic, so compare a
    // sorted snapshot. Unlike containsAll, an exact comparison also fails on duplicated or
    // unexpected extra elements.
    List<Long> actual = new ArrayList<>(CollectSink.values);
    Collections.sort(actual);
    assertTrue(actual.equals(List.of(2L, 22L, 23L)), "unexpected sink output: " + actual);
  }

  /**
   * Testing sink that records every received value in {@link #values}.
   *
   * <p>Flink serializes operators before running them, so the instances that receive data are
   * copies of the one created in the test. State therefore has to live in a static field to be
   * visible to the test afterwards.
   */
  private static class CollectSink implements SinkFunction<Long> {

    // must be static (see class comment); synchronized because parallel sink instances add
    // concurrently
    public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());

    @Override
    public void invoke(Long value, SinkFunction.Context context) throws Exception {
      values.add(value);
    }
  }
}

Am I missing something crucial by not including the flink-test-utils dependency and the static MiniClusterWithClientResource instance? Does leaving them out make my integration tests incorrect? The documentation states explicitly that both are required.


Solution

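  • Your test works because, when the program is not submitted to a Flink cluster, StreamExecutionEnvironment.getExecutionEnvironment() falls back to a local execution environment, which starts an embedded mini-cluster to run the job. However, without the @ClassRule shown in the documentation (a JUnit 4 rule), you'll be spinning up a new Flink mini-cluster, configured with the defaults, every time a test executes a job. You won't be reusing the same cluster (with its specific configuration) for all of the tests in that test class.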

    For a more up-to-date JUnit 5 setup, you could look at how the tests are organized in the Immerok Apache Flink cookbook, e.g., its FlinkMiniClusterExtension, which looks like this:

    package com.immerok.cookbook.extensions;
    
    import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
    import org.apache.flink.test.util.MiniClusterWithClientResource;
    import org.junit.jupiter.api.extension.AfterAllCallback;
    import org.junit.jupiter.api.extension.BeforeAllCallback;
    import org.junit.jupiter.api.extension.ExtensionContext;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * A wrapper around a {@link MiniClusterWithClientResource} that allows it to be used as a JUnit5
     * Extension.
     */
    public class FlinkMiniClusterExtension implements BeforeAllCallback, AfterAllCallback {
    
        private static final Logger LOG = LoggerFactory.getLogger(FlinkMiniClusterExtension.class);
    
        private static final int PARALLELISM = 2;
        private static MiniClusterWithClientResource flinkCluster;
    
        @Override
        public void beforeAll(ExtensionContext context) throws Exception {
            flinkCluster =
                    new MiniClusterWithClientResource(
                            new MiniClusterResourceConfiguration.Builder()
                                    .setNumberSlotsPerTaskManager(PARALLELISM)
                                    .setNumberTaskManagers(1)
                                    .build());
    
            flinkCluster.before();
    
            LOG.info("Web UI is available at {}", flinkCluster.getRestAddres());
        }
    
        @Override
        public void afterAll(ExtensionContext context) {
            flinkCluster.after();
        }
    }
    

    It is then used like this:

    @ExtendWith(FlinkMiniClusterExtension.class)
    class MyTests ...