Tags: apache-flink, lz4

FlinkKafkaConsumer fails to read from an LZ4-compressed topic


We've got several Flink applications reading from Kafka topics, and they work fine. But recently we added a new topic to the existing Flink job, and it started failing immediately on startup with the following root error:

Caused by: org.apache.kafka.common.KafkaException: java.lang.NoClassDefFoundError: net/jpountz/lz4/LZ4Exception
    at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:113)
    at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:256)
    at org.apache.kafka.common.record.DefaultRecordBatch.streamingIterator(DefaultRecordBatch.java:334)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:1208)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1245)
    ... 7 more
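
For context, the consumer is set up in the usual way; here is a minimal sketch for the Flink 1.9 Kafka connector (the topic name, group id, and broker address are placeholders):

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    public class Lz4TopicJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder
            props.setProperty("group.id", "my-consumer-group");   // placeholder

            // The Kafka client decompresses record batches transparently; for an
            // lz4-compressed topic it needs the net.jpountz.lz4 classes on the
            // task manager classpath, which is where the NoClassDefFoundError
            // above originates.
            FlinkKafkaConsumer<String> consumer =
                    new FlinkKafkaConsumer<>("lz4-compressed-topic", new SimpleStringSchema(), props);

            env.addSource(consumer).print();
            env.execute("lz4 topic job");
        }
    }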

I found out that this topic uses lz4 compression, and my guess is that Flink is for some reason unable to work with it. Adding an lz4 dependency directly to the application (roughly the snippet below) didn't help, and what's weird is that the job runs fine locally but fails on the remote cluster.
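
For reference, a sketch of the dependency we tried, assuming the application is built with Maven; the coordinates are those of the lz4-java project (the library that provides the net.jpountz.lz4 classes), and the version shown is illustrative:

    <!-- lz4-java provides the net.jpountz.lz4 classes the Kafka client needs;
         the version is illustrative -->
    <dependency>
        <groupId>org.lz4</groupId>
        <artifactId>lz4-java</artifactId>
        <version>1.6.0</version>
    </dependency>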

The Flink runtime version is 1.9.1, and all of our other Flink dependencies are on the same version: flink-streaming-java_2.11, flink-connector-kafka_2.11, flink-java, and flink-clients_2.11.

Could this be happening because Flink itself doesn't ship the lz4 library?


Solution

  • Found the solution. No version upgrade was needed, nor were additional dependencies in the application itself. What worked for us was adding the lz4 library jar directly to the Flink lib folder in the Docker image (see the sketch below). After that, the lz4 compression error disappeared.
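
For illustration, a sketch of the image change, assuming the official Flink 1.9.1 base image where Flink is installed under /opt/flink; the lz4-java jar version is illustrative:

    FROM flink:1.9.1-scala_2.11

    # Jars placed in Flink's lib folder land on the runtime classpath of the
    # job manager and every task manager, making the net.jpountz.lz4 classes
    # visible to the Kafka consumer's decompression path.
    COPY lz4-java-1.6.0.jar /opt/flink/lib/

Placing the jar in lib/ loads it through Flink's parent classloader, which appears to sidestep whatever kept the class from being resolved when it was bundled only inside the application jar.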