
Flink Streaming - kafka.internals.Handover$ClosedException


I am using Apache Flink v1.12.3.

Recently I encountered the error below, and I do not know exactly what it means. Is the error related to Kafka or to Flink?

Error log:

2021-07-02 21:32:50,149 WARN  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-myGroup-24, groupId=myGroup] Offset commit failed on partition my_topic-14 at offset 11328862986: The request timed out.
// ...
2021-07-02 21:32:50,150 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-myGroup-24, groupId=myGroup] Group coordinator 1.2.3.4:9092 (id: 2147483616 rack: null) is unavailable or invalid, will attempt rediscovery

// ...

2021-07-02 21:33:20,553 INFO  org.apache.kafka.clients.FetchSessionHandler                 [] - [Consumer clientId=consumer-myGroup-21, groupId=myGroup] Error sending fetch request (sessionId=1923351260, epoch=9902) to node 29: {}.

// ...

2021-07-02 21:33:19,457 INFO  org.apache.kafka.clients.FetchSessionHandler                 [] - [Consumer clientId=consumer-myGroup-15, groupId=myGroup] Error sending fetch request (sessionId=1427185435, epoch=10820) to node 29: {}.
org.apache.kafka.common.errors.DisconnectException: null

// ...

2021-07-02 21:34:10,157 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: my_topic_stream (4/15)#0 (2e2051d41edd606a093625783d844ba1) switched from RUNNING to FAILED.
org.apache.flink.streaming.connectors.kafka.internals.Handover$ClosedException: null
    at org.apache.flink.streaming.connectors.kafka.internals.Handover.close(Handover.java:177) ~[blob_p-a7919582483974414f9c0d4744bab53199b880d7-d9edc9d0741b403b3931269bf42a4f6b:?]
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.cancel(KafkaFetcher.java:164) ~[blob_p-a7919582483974414f9c0d4744bab53199b880d7-d9edc9d0741b403b3931269bf42a4f6b:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.cancel(FlinkKafkaConsumerBase.java:945) ~[blob_p-a7919582483974414f9c0d4744bab53199b880d7-d9edc9d0741b403b3931269bf42a4f6b:?]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.lambda$createAndStartDiscoveryLoop$2(FlinkKafkaConsumerBase.java:913) ~[blob_p-a7919582483974414f9c0d4744bab53199b880d7-d9edc9d0741b403b3931269bf42a4f6b:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_212]

Solution

  • This is an issue on the Kafka side. The Kafka consumer client hit a timeout while committing offsets to the Kafka cluster. One possible reason is that the Kafka cluster is busy and cannot respond in time. As a result, the task running the Kafka consumer fails.

    Try adding parameters to the properties you pass when creating the source stream from Kafka. One candidate is request.timeout.ms: set it to a larger value and try again.
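As a rough sketch of the suggestion above, the consumer properties could be built like this. The class name KafkaSourceProps, the helper build, and the 60000 ms value are hypothetical and only for illustration; the broker address and group id are taken from the log in the question. Only request.timeout.ms, bootstrap.servers, and group.id are real Kafka consumer settings here.

```java
import java.util.Properties;

// Hypothetical helper (not part of Flink or Kafka): builds the consumer
// properties that would be handed to the Kafka source.
final class KafkaSourceProps {

    static Properties build(String bootstrapServers, String groupId, int requestTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Give the broker more time to answer requests such as offset commits.
        // The value passed in is an assumption to tune, not a recommendation.
        props.setProperty("request.timeout.ms", Integer.toString(requestTimeoutMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("1.2.3.4:9092", "myGroup", 60000);
        // In a Flink 1.12 job these properties would be passed to the source, e.g.
        // new FlinkKafkaConsumer<>("my_topic", new SimpleStringSchema(), props)
        System.out.println(props.getProperty("request.timeout.ms"));
    }
}
```

If the timeouts persist with a larger value, the cause is more likely broker load or network connectivity than the client setting, so the broker side is worth checking as well.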

    References: