
FlinkKafkaConsumer011 Not Found On Flink Cluster


I am trying to run a Flink job on a cluster. The job runs fine in my local development environment, but when I deploy it to the cluster using the command:

./bin/flink run -c org.example.CointegrationOfPairs ../coint.jar

It fails with the error:

java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011
    at org.example.CointegrationOfPairs$.main(CointegrationOfPairs.scala:38)
    at org.example.CointegrationOfPairs.main(CointegrationOfPairs.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
    at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:785)
    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:279)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
    at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

I have also added the required dependencies:

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka-0.11" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-ml" % flinkVersion % "provided"
)

I am building the jar file using sbt clean assembly.


Solution

  • Connectors are not included in Flink's binary distribution, to avoid version conflicts between their dependencies and user code. Hence, the connector classes are not on the classpath of the Flink processes by default.

    There are two ways to fix this problem:

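    1. Do not set the flink-connector-kafka-0.11 dependency as provided. A dependency marked "provided" is left out of the jar built by sbt assembly, so the cluster never receives the connector classes. Instead, build a fat jar that contains the connector dependency. That way, the connector is shipped together with your application. This is the preferred approach.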

    2. Add the jar file of the flink-connector-kafka dependency to the ./lib folder of your Flink setup. This will distribute the file and include it in the classpath of the Flink processes.
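
For the first option, a minimal sketch of the dependency list from the question with the "provided" scope dropped from the Kafka connector only. It assumes flinkVersion is defined elsewhere in the build and that the sbt-assembly plugin is already set up, since the question builds with sbt clean assembly:

```scala
// build.sbt (sketch): flinkVersion is assumed to be defined elsewhere in the build
val flinkDependencies = Seq(
  // Part of the Flink distribution, so these can stay "provided"
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  // No "provided" scope here: the connector gets packaged into the fat jar
  "org.apache.flink" %% "flink-connector-kafka-0.11" % flinkVersion,
  // Unchanged from the question
  "org.apache.flink" %% "flink-ml" % flinkVersion % "provided"
)
```

After rebuilding with sbt clean assembly, listing the jar contents (for example with jar tf ../coint.jar) should show org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.class if the connector was packaged. The job can then be resubmitted with the same flink run command.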