Unit Tests Failing when upgrading Flink to 1.18

I am in the process of updating my application from Flink 1.15.2 to 1.20 and ran into a snag along the way. I have a simple unit test that creates a stream of data (Long values), applies a map function to increment each value by 1, and sinks the results to a list of Longs. I copied this unit test from the Flink documentation and tested it incrementally: first on 1.15.2, then 1.16.2, and so on. I found that it breaks in Flink 1.18.1.

I am building with Scala 2.12 through sbt, and the tests are run with the "sbt test" command.

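import java.util
import java.util.Collections

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class MappingTest extends AnyFlatSpec with Matchers with BeforeAndAfter {

  val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
    .setNumberSlotsPerTaskManager(2)
    .setNumberTaskManagers(1)
    .build)

  before {
    flinkCluster.before()
  }

  after {
    flinkCluster.after()
  }

  "IncrementFlatMapFunction pipeline" should "incrementValues" in {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // configure your test environment
    env.setParallelism(2)

    // values are collected in a static variable
    CollectSink.values.clear()

    // create a stream of custom elements and apply transformations
    env.fromElements(1, 21, 22)
      .map(x => x + 1L)
      .addSink(new CollectSink())

    // execute
    env.execute()

    // verify your results
    CollectSink.values should contain allOf(2, 22, 23)
  }
}

class CollectSink extends SinkFunction[Long] {
  override def invoke(value: Long, context: SinkFunction.Context): Unit = {
    CollectSink.values.add(value)
  }
}

object CollectSink {
  // must be static
  val values: util.List[Long] = Collections.synchronizedList(new util.ArrayList())
}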

The error message that I'm receiving:

should incrementValues *** FAILED *** 
[info] org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
[info] at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult( 
[info] at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3( 
[info] at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire( 
[info] at java.base/java.util.concurrent.CompletableFuture.postComplete( 
[info] at java.base/java.util.concurrent.CompletableFuture.complete( 
[info] at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1( 
[info] at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete( 
[info] at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire( 
[info] at java.base/java.util.concurrent.CompletableFuture.postComplete( 
[info] at java.base/java.util.concurrent.CompletableFuture.complete( 
[info] ... 
[info] Cause: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster. 
[info] at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0( 
[info] at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete( 
[info] at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire( 
[info] at java.base/java.util.concurrent.CompletableFuture.postComplete( 
[info] at java.base/java.util.concurrent.CompletableFuture$ 
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker( 
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$ 
[info] at java.base/ 
[info] ... 
[info] Cause: java.util.concurrent.CompletionException: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig 
[info] at java.base/java.util.concurrent.CompletableFuture.encodeThrowable( 
[info] at java.base/java.util.concurrent.CompletableFuture.completeThrowable( 
[info] at java.base/java.util.concurrent.CompletableFuture$ 
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker( 
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$ 
[info] at java.base/ 
[info] ... 
[info] Cause: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig 
[info] at org.apache.flink.util.ExceptionUtils.rethrow( 
[info] at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4( 
[info] at java.base/java.util.concurrent.CompletableFuture$ 
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker( 
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$ 
[info] at java.base/ 
[info] ... 
[info] Cause: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig 
[info] at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass( 
[info] at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass( 
[info] at java.base/java.lang.ClassLoader.loadClass( 
[info] at java.base/java.lang.Class.forName0(Native Method) 
[info] at java.base/java.lang.Class.forName( 
[info] at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass( 
[info] at java.base/ 
[info] at java.base/ 
[info] at java.base/ 
[info] at java.base/

Based on the research I've done, ClassNotFoundException errors are typically caused by mismatched dependency versions. I've simplified the dependencies in my build.sbt file:

val flinkVersion = "1.18.1"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink"  % "flink-test-utils"      % flinkVersion % Test,
  "org.scalatest"    %% "scalatest"             % "3.2.19"     % Test,
  "com.typesafe"      % "config"                % "1.4.3"
)

Also, while looking through the release notes, I saw it mentioned that 1.18 was tested using Java 17. I have tried running the tests in both a Java 11 and a Java 17 environment, and both produce the same error.

Additional information: I am following the tutorial from the Apache Flink documentation.


  • From mail-archive - Getting java.lang.ClassNotFoundException in Tests (Flink 1.18.0) - 03 Dec 2023

    Forking in sbt solved the issue (Test / fork := true).

    So, adding that line to your build.sbt config file solves the problem.

    I was able to reproduce the problem locally using the code you provided. Once I added that line, the test succeeded. It also works for 1.19.1 and 1.20.0.
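
For reference, here is how the setting might fit into a complete build.sbt. This is only a minimal sketch assuming an sbt 1.x build: the project name and the Scala patch version are placeholders that do not come from the question, while the dependencies are the ones listed there. With forking enabled, sbt runs the tests in a separate JVM with the test classpath on the regular application class path. That is a plausible reason the lookup of org.apache.flink.api.common.ExecutionConfig through AppClassLoader (visible at the bottom of the stack trace) stops failing.

```scala
// build.sbt (sketch; project name and Scala patch version are assumptions)
ThisBuild / scalaVersion := "2.12.19"

val flinkVersion = "1.18.1"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink"  % "flink-test-utils"      % flinkVersion % Test,
  "org.scalatest"    %% "scalatest"             % "3.2.19"     % Test,
  "com.typesafe"      % "config"                % "1.4.3"
)

lazy val root = (project in file("."))
  .settings(
    name := "flink-mapping-test", // hypothetical project name
    libraryDependencies ++= flinkDependencies,
    // Run tests in a forked JVM instead of inside the sbt process
    Test / fork := true
  )
```

After changing build.sbt, run "reload" in an open sbt shell (or restart sbt) before running "sbt test" again so that the new setting is picked up.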

