Tags: scala, serialization, apache-flink, flink-streaming

How to debug Flink Task Issues


Hi, I am trying to run a Flink Scala application that reads from Kafka, applies some lookup transformations, and then writes to Kafka.

Flink Version 1.12.1

I tested it locally and it works fine. But when I run it on a cluster using the native Kubernetes integration, I see errors like the one below.

The cluster itself seems fine: I ran a WordCount app on it and that worked.

The exception is not very informative, and the stack trace only shows TaskManager frames, so I have no idea where in the application the problem could be. Could this be a serialization issue? Is there a way to debug such issues and find the actual point in the application code where the problem occurs?

```
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate serializer.
        at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:216) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createStreamOutput(OperatorChain.java:664) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainOutputs(OperatorChain.java:250) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:160) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:533) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.io.IOException: unexpected exception type
        at java.io.ObjectStreamClass.throwMiscException(Unknown Source) ~[?:?]
        at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        ... 8 more
Caused by: java.util.concurrent.ExecutionException: java.lang.ClassNotFoundException: __wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$
        at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3557) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2302) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2289) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
        at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        ... 8 more
Caused by: java.lang.ClassNotFoundException: __wrapper$1$7aa8fcbe22114421a688e120fcde1df7.__wrapper$1$7aa8fcbe22114421a688e120fcde1df7$
        at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:64) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
        at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
        at java.lang.Class.forName0(Native Method) ~[?:?]
        at java.lang.Class.forName(Unknown Source) ~[?:?]
        at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$ToolBoxGlobal.compile(ToolBoxFactory.scala:261) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.$anonfun$compile$13(ToolBoxFactory.scala:433) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$withCompilerApi$.apply(ToolBoxFactory.scala:359) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.compile(ToolBoxFactory.scala:426) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.compileCbfInternal(TraversableSerializer.scala:230) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:220) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:216) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
        at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        ... 8 more
```
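A chain like this is usually easiest to read from the bottom up: the last `Caused by` is the most specific failure. Here, the innermost cause is the `ClassNotFoundException` for the `__wrapper$1$7aa8fcbe22114421a688e120fcde1df7` class, thrown by `scala.reflect.internal.util.AbstractFileClassLoader` while `TraversableSerializer.compileCbf` compiles code at runtime through the Scala ToolBox. That most likely points at the output serializer for a Scala collection type rather than at a particular line of user code. The minimal, stdlib-only Scala sketch below walks `getCause` to pull that innermost cause out programmatically, for example in a `catch` around a local test run. `CauseChain` and its methods are hypothetical helpers, not a Flink API.

```scala
import scala.annotation.tailrec

// Hypothetical helper (not part of Flink) for reading long "Caused by" chains.
object CauseChain {

  // All throwables in the chain, outermost first. Stops at null or when a
  // cause points back into the chain (cycles are possible via initCause).
  def chain(t: Throwable): List[Throwable] = {
    @tailrec
    def loop(cur: Throwable, acc: List[Throwable]): List[Throwable] =
      if (cur == null || acc.exists(_ eq cur)) acc.reverse
      else loop(cur.getCause, cur :: acc)
    loop(t, Nil)
  }

  // The innermost cause: usually the most specific description of the failure.
  def rootCause(t: Throwable): Throwable = {
    require(t != null, "throwable must not be null")
    chain(t).last
  }

  // One line per level, e.g. "java.lang.ClassNotFoundException: <class name>".
  def summary(t: Throwable): String =
    chain(t)
      .map(c => s"${c.getClass.getName}: ${c.getMessage}")
      .mkString("\n  caused by: ")
}
```

For an exception shaped like the one above (`StreamTaskException`, then `IOException`, then `ExecutionException`, then `ClassNotFoundException`), `CauseChain.rootCause(e)` returns the `ClassNotFoundException`, and `CauseChain.summary(e)` prints the four levels without the intervening frames.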

Solution

  • My guess is that the problem was the classloading order.

    I was deploying in application mode. Previously, my user-code jar and its dependent libraries were in /opt/flink/usrlib: I followed this guide https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#application-mode and put the jars in usrlib. With that layout I got the error above.

    I then switched to /opt/flink/lib, so the user code and its dependencies now sit in the same directory as the Flink libraries inside the Docker image.

    Concretely, I changed `COPY .dist/poc-flink-0.0.1.jar $FLINK_HOME/usrlib/poc-flink.jar` to `COPY .dist/poc-flink-0.0.1.jar $FLINK_HOME/lib/poc-flink.jar`, so that the same classloader is used for Flink and the user code. That solved the problem.
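Why moving the jar helps comes down to which classloader is asked to resolve a class name. The sketch below is a simplified, stdlib-only illustration of that general principle, not Flink's actual code path (in the trace above, the failing loader is the Scala ToolBox's `AbstractFileClassLoader`). It serializes a record, then deserializes the same bytes once through the loader that defined the class and once through an isolated loader that cannot see it. Only the second read fails with `ClassNotFoundException`, even though the bytes are identical. `LoaderDemo`, `Lookup`, `LoaderAwareInputStream`, and `isolatedLoader` are made-up names for this example.

```scala
import java.io._

// Simplified illustration (not Flink code): Java deserialization looks up
// class names through a ClassLoader, so the same bytes can succeed or fail
// depending on which loader is used.
object LoaderDemo {

  // Hypothetical record type standing in for a user-defined class.
  final case class Lookup(key: String, value: String)

  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  // Resolves every class named in the stream against `loader` only.
  final class LoaderAwareInputStream(in: InputStream, loader: ClassLoader)
      extends ObjectInputStream(in) {
    override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
      Class.forName(desc.getName, false, loader)
  }

  def deserialize(bytes: Array[Byte], loader: ClassLoader): AnyRef = {
    val in = new LoaderAwareInputStream(new ByteArrayInputStream(bytes), loader)
    try in.readObject() finally in.close()
  }

  // A loader with no URLs and only the bootstrap loader as parent:
  // it can load java.* classes but not application classes like Lookup.
  def isolatedLoader(): ClassLoader =
    new java.net.URLClassLoader(Array.empty[java.net.URL], null)
}
```

In Flink terms, putting the job jar in `$FLINK_HOME/lib` makes it visible to the same classloader that loads Flink itself, which is presumably why the runtime-compiled wrapper class could be resolved after the change.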