cassandra, apache-flink, flink-streaming

Flink-Cassandra connector throws exception (flink-connector-cassandra_2.11-1.10.0)


I am trying to upgrade from Flink 1.7.2 to Flink 1.10 and I am having a problem with the Cassandra connector. Every time I start a job that uses it, the following exception is thrown:

com.datastax.driver.core.exceptions.TransportException: [/xx.xx.xx.xx] Error writing
    at com.datastax.driver.core.Connection$10.operationComplete(Connection.java:550)
    at com.datastax.driver.core.Connection$10.operationComplete(Connection.java:534)
    at com.datastax.shaded.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
    at com.datastax.shaded.netty.util.concurrent.DefaultPromise.notifyLateListener(DefaultPromise.java:621)
    at com.datastax.shaded.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:138)
    at com.datastax.shaded.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:93)
    at com.datastax.shaded.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:28)
    at com.datastax.driver.core.Connection$Flusher.run(Connection.java:870)
    at com.datastax.shaded.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)
    at com.datastax.shaded.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at com.datastax.shaded.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.datastax.shaded.netty.handler.codec.EncoderException: java.lang.OutOfMemoryError: Direct buffer memory
        at com.datastax.shaded.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:107)
        at com.datastax.shaded.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:643)

Also, the following message was printed when the job was run locally (not on YARN):

13:57:54,490 ERROR com.datastax.shaded.netty.util.ResourceLeakDetector           - LEAK: You are creating too many HashedWheelTimer instances.  HashedWheelTimer is a shared resource that must be reused across the JVM,so that only a few instances are created.

All jobs that do not use the Cassandra connector are working properly. Can someone help?


Solution

  • UPDATE: The bug is still reproducible, and I think this is the cause: https://issues.apache.org/jira/browse/FLINK-17493.

    I had an old configuration (carried over from Flink 1.7) that set classloader.parent-first-patterns.additional: com.datastax., and the Cassandra-Flink connector jar was placed in the flink/lib folder (this had been done to work around other problems with the shaded Netty in the Cassandra-Flink connector). After migrating to Flink 1.10, the problem above appeared. Once I removed the classloader.parent-first-patterns.additional: com.datastax. setting, bundled flink-connector-cassandra_2.12-1.10.0.jar inside my job jar, and removed it from /usr/lib/flink/lib/, the problem was no longer reproducible (see the sketch below).
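
    For reference, this is roughly what the change looks like. The exact flink-conf.yaml layout is an assumption based on a standard Flink setup; the only relevant part is that this line is deleted so com.datastax.* classes are loaded child-first from the job jar rather than from the parent classloader:

        # flink-conf.yaml -- remove this line (it forces com.datastax.* to be
        # loaded parent-first, which conflicts with the connector's shaded Netty
        # when the connector jar also sits in flink/lib)
        classloader.parent-first-patterns.additional: com.datastax.

    Bundling the connector with the job instead of dropping it into flink/lib would, with a Maven build (an assumption; adjust for Gradle or sbt and make sure your shade/fat-jar plugin includes compile-scope dependencies), look roughly like this:

        <!-- pom.xml -- ship the connector inside the job's fat/uber jar -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-cassandra_2.12</artifactId>
            <version>1.10.0</version>
        </dependency>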