I have an Apache Flink job written in Scala. I upgraded Flink from 1.16.1 to 1.20.0 and Java from 11 to 17. I am now getting the error below when running a unit test that worked fine before the upgrade:
org.apache.flink.util.FlinkException: Failed to execute job 'my-flink-job'.
[info] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2487)
[info] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2351)
[info] at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:68)
[info] at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:2325)
[info] at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:917)
[info] at com.my.domain.MySpec.$anonfun$new$3(MySpec.scala:40)
[info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
[info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
[info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info] at org.scalatest.Transformer.apply(Transformer.scala:22)
[info] ...
[info] Cause: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
[info] at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
[info] at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
[info] at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
[info] at java.base/java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:483)
[info] at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
[info] at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1193)
[info] at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1666)
[info] at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1633)
[info] at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
[info] ...
[info] Cause: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
[info] at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
[info] at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
[info] at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
[info] at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
[info] at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
[info] at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:64)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
[info] ...
[info] Cause: java.util.concurrent.CompletionException: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig
[info] at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
[info] at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
[info] at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
[info] at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:64)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
[info] ...
[info] Cause: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig
[info] at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
[info] at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
[info] at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
[info] at org.apache.flink.util.MdcUtils.lambda$wrapRunnable$1(MdcUtils.java:64)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
[info] ...
[info] Cause: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig
[info] at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
[info] at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
[info] at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
[info] at java.base/java.lang.Class.forName0(Native Method)
[info] at java.base/java.lang.Class.forName(Class.java:467)
[info] at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
[info] at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2034)
[info] at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1898)
[info] at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2224)
[info] at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
I've updated the flink-kafka-connector to 3.4.0-1.20 and ensured that org.apache.flink:flink-core (the jar that contains ExecutionConfig) is at version 1.20.0 and present on the classpath.
I've verified from the dependencyTree that all org.apache.flink:flink-core dependencies point to the same 1.20.0 version.
I get the above error when the test run settings contain

fork in Test := false

When fork is set to true, the tests pass.
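For reference, the two configurations above can be written in the current sbt slash syntax (a sketch; `fork in Test` is the deprecated pre-1.x form of the same key):

```scala
// build.sbt (sketch) — the two configurations described above.

// Fails with ClassNotFoundException for ExecutionConfig:
Test / fork := false

// Works:
// Test / fork := true
```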
The runtime classpath in sbt contains
flink-core-1.20.0-tests.jar
flink-core-1.20.0.jar
flink-core-api-1.20.0.jar
There are no conflicting versions of these dependencies here either.
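Since the class is on the classpath yet not found, a useful diagnostic is to check which classloader actually resolves it inside the failing test, and compare the in-process (non-forked) run against the forked one. Below is a small probe I use for this; `ClassLoaderProbe` is my own helper, and `java.lang.String` is substituted so the snippet stands alone — in the test itself, probe `org.apache.flink.api.common.ExecutionConfig` instead:

```scala
object ClassLoaderProbe {
  // Reports which classloader resolves the given class name, and from which
  // code source (jar) it was loaded, using the thread context classloader —
  // the loader that Flink's InstantiationUtil ultimately falls back to.
  def describe(className: String): String = {
    val cl = Thread.currentThread().getContextClassLoader
    try {
      val cls = Class.forName(className, false, cl)
      val src = Option(cls.getProtectionDomain.getCodeSource)
        .map(_.getLocation.toString)
        .getOrElse("<bootstrap / no code source>")
      s"$className loaded by ${cls.getClassLoader} from $src"
    } catch {
      case _: ClassNotFoundException => s"$className NOT visible to $cl"
    }
  }

  def main(args: Array[String]): Unit =
    // In the failing test, probe "org.apache.flink.api.common.ExecutionConfig".
    println(describe("java.lang.String"))
}
```

If the class is visible to the test's classloader but not to the one reported in the failure, the problem is classloader layering in the non-forked sbt test run rather than a missing jar.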
Note: Flink has deprecated support for the Scala DataStream API, so I am using the Java DataStream API now.
Edit:
I wish to run the unit tests without forking a new JVM, because I am using sbt-jacoco to generate test coverage reports, and forking a new JVM changes the directory structure from which these reports are written and read.
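If forking turns out to be unavoidable, one option I have considered (an assumption on my part, not verified against sbt-jacoco) is to pin the forked test JVM's working directory to the project root so that relative report paths resolve the same way as in the in-process run:

```scala
// build.sbt (sketch) — assumption: the report mix-up comes from the forked
// JVM's working directory differing from that of the sbt process.
Test / fork := true
Test / baseDirectory := (ThisBuild / baseDirectory).value
```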
Workaround: