I am trying to upgrade flink 1.7.2 to flink 1.10 and I am having problem with cassandra connector. Everytime I start a job that is using it the following exception is thrown:
com.datastax.driver.core.exceptions.TransportException: [/xx.xx.xx.xx] Error writing
at com.datastax.driver.core.Connection$10.operationComplete(Connection.java:550)
at com.datastax.driver.core.Connection$10.operationComplete(Connection.java:534)
at com.datastax.shaded.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
at com.datastax.shaded.netty.util.concurrent.DefaultPromise.notifyLateListener(DefaultPromise.java:621)
at com.datastax.shaded.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:138)
at com.datastax.shaded.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:93)
at com.datastax.shaded.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:28)
at com.datastax.driver.core.Connection$Flusher.run(Connection.java:870)
at com.datastax.shaded.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)
at com.datastax.shaded.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
at com.datastax.shaded.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.shaded.netty.handler.codec.EncoderException: java.lang.OutOfMemoryError: Direct buffer memory
at com.datastax.shaded.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:107)
at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:643)
Also the following message was printed when the job was run locally (not in YARN):
13:57:54,490 ERROR com.datastax.shaded.netty.util.ResourceLeakDetector - LEAK: You are creating too many HashedWheelTimer instances. HashedWheelTimer is a shared resource that must be reused across the JVM,so that only a few instances are created.
All jobs that do not use cassandra connector are working properly Can someone help?
UPDATE: The bug is still reproducible and I think this is the reason: https://issues.apache.org/jira/browse/FLINK-17493.
I had an old configuration (from flink 1.7) where classloader.parent-first-patterns.additional: com.datastax.
was configured and my cassadndra-flink connector was in flink/lib folder ( this was done because of other problems related to shaded netty I had with Cassandra-flink connector). Now with the migration to flink 1.10 the following problem was hit. Once removing this configuration - classloader.parent-first-patterns.additional: com.datastax.
, including flink-connector-cassandra_2.12-1.10.0.jar
in my jar and removing it from /usr/lib/flink/lib/
the problem was no longer reproducible.