I'm using Flink 1.15.1 and JUnit5. I would like to know how the following integration test, adapted from the documentation, works without including the flink-test-utils dependency and the MiniClusterWithClientResource static instance.
package com.mypackage;

import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.junit.jupiter.api.Test;

public class ExampleIntegrationTest {

    @Test
    public void testIncrementPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // configure your test environment
        env.setParallelism(2);

        // values are collected in a static variable
        CollectSink.values.clear();

        // create a stream of custom elements and apply transformations
        env.fromElements(1L, 21L, 22L).map(n -> n + 1).addSink(new CollectSink());

        // execute
        env.execute();

        // verify your results
        assertTrue(CollectSink.values.containsAll(List.of(2L, 22L, 23L)));
    }

    // create a testing sink
    private static class CollectSink implements SinkFunction<Long> {

        // must be static
        public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Long value, SinkFunction.Context context) throws Exception {
            values.add(value);
        }
    }
}
Am I missing something crucial by not including the flink-test-utils dependency and the MiniClusterWithClientResource static instance, something that would make my integration tests incorrect? The documentation is specific that those are required.
Without the @ClassRule shown in the documentation, you'll be spinning up a new Flink mini-cluster for every test (configured with the defaults) rather than reusing the same cluster (with its specific configuration) for all of the tests in that test class.
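For reference, the setup the documentation calls for is a JUnit4-style class rule: a static MiniClusterWithClientResource annotated with @ClassRule, roughly like the sketch below (the slot and task manager counts are the values used in the Flink docs). Note that JUnit5 does not run JUnit4 rules, so under JUnit5 this annotation would be silently ignored, which is one more reason to use an extension instead.

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;

public class ExampleIntegrationTest {

    // shared across all tests in the class; started once, torn down once
    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(2)
                            .setNumberTaskManagers(1)
                            .build());

    // ... tests as in the question ...
}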
For a more up-to-date JUnit5 setup, you could look at how the tests are organized in the Immerok Apache Flink cookbook, e.g., its FlinkMiniClusterExtension, which looks like this:
package com.immerok.cookbook.extensions;

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A wrapper around a {@link MiniClusterWithClientResource} that allows it to be used as a JUnit5
 * Extension.
 */
public class FlinkMiniClusterExtension implements BeforeAllCallback, AfterAllCallback {

    private static final Logger LOG = LoggerFactory.getLogger(FlinkMiniClusterExtension.class);

    private static final int PARALLELISM = 2;

    private static MiniClusterWithClientResource flinkCluster;

    @Override
    public void beforeAll(ExtensionContext context) throws Exception {
        flinkCluster =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setNumberSlotsPerTaskManager(PARALLELISM)
                                .setNumberTaskManagers(1)
                                .build());

        flinkCluster.before();

        LOG.info("Web UI is available at {}", flinkCluster.getRestAddres());
    }

    @Override
    public void afterAll(ExtensionContext context) {
        flinkCluster.after();
    }
}
and it gets used like this:
@ExtendWith(FlinkMiniClusterExtension.class)
class MyTests ...
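Putting it together, the test from the question could then look something like the sketch below (it assumes the extension above is on the test classpath and reuses the CollectSink class from the question unchanged):

import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(FlinkMiniClusterExtension.class)
class ExampleIntegrationTest {

    @Test
    void testIncrementPipeline() throws Exception {
        // this environment is backed by the shared mini-cluster the extension started
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        CollectSink.values.clear();

        env.fromElements(1L, 21L, 22L).map(n -> n + 1).addSink(new CollectSink());
        env.execute();

        assertTrue(CollectSink.values.containsAll(List.of(2L, 22L, 23L)));
    }
}

Because the extension's beforeAll registers the mini-cluster as the context environment, getExecutionEnvironment() inside the test returns an environment bound to that shared cluster, and afterAll tears it down once every test in the class has run.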