apache-kafka apache-flink kafka-consumer-api

Flink Kafka Source Operator Reconnect Issue


I am new to Flink. I have a Flink pipeline with 5 operators, 2 of which are Kafka source operators consuming messages from 2 different topics. Checkpointing is enabled with a one-minute interval for both sources. I am trying to test a use case where I bring Kafka down and bring it back up after 10 minutes.
My Flink version is 1.16.1, and the Kafka connector is the same version. The task failure settings are left at their defaults, and so is the Kafka consumer configuration.

To bring Kafka down, I scaled the Kafka pods down to 0 and scaled them back to the original count after 10 minutes.

Expected behaviour: the operator should reconnect automatically once Kafka is back up.

Actual behaviour: the operator does not reconnect, yet the task does not fail and the job status does not change. It seems stuck in some loop, and it only recovers once I delete both the JobManager (JM) and TaskManager (TM) pods.

Part of the logs after Kafka is back up and running:

org.apache.kafka.common.errors.TimeoutException: Failed to send request after 30000 ms.
2023-08-29 07:32:01.945 [Source Data Fetcher for Source: Kafka Data source (3/3)#0] INFO  org.apache.kafka.clients.NetworkClient  - [Consumer clientId=data-group-2, groupId=data-group] Disconnecting from node 1 due to socket connection setup timeout. The timeout value is 30096 ms.
2023-08-29 07:32:01.945 [Source Data Fetcher for Source: Kafka Data source (3/3)#0] INFO  org.apache.kafka.clients.FetchSessionHandler  - [Consumer clientId=data-group-2, groupId=data-group] Error sending fetch request (sessionId=1361712539, epoch=INITIAL) to node 1:
org.apache.kafka.common.errors.DisconnectException: null
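The "30096 ms" in the second log line is slightly above 30 s. A possible explanation, assuming the consumer follows the documented semantics of `socket.connection.setup.timeout.ms` (default 10 s) and `socket.connection.setup.timeout.max.ms` (default 30 s): the connection-setup timeout doubles after each consecutive failed attempt, is capped at the maximum, and then gets a random factor of ±20% to avoid connection storms. The sketch below is a simplified model of that rule, not Kafka's own implementation; `SetupTimeoutModel` and its methods are hypothetical names.

```java
import java.util.concurrent.ThreadLocalRandom;

// Simplified model (hypothetical, not Kafka's own code) of how the consumer's
// socket connection-setup timeout grows across consecutive failures.
class SetupTimeoutModel {
    static final long BASE_MS = 10_000; // assumed default socket.connection.setup.timeout.ms
    static final long MAX_MS = 30_000;  // assumed default socket.connection.setup.timeout.max.ms
    static final double JITTER = 0.2;   // assumed randomization factor (+/-20%)

    // Timeout after the given number of consecutive failures. The jitter factor is
    // passed in explicitly so the result is deterministic (1.0 means "no jitter").
    static long timeoutMs(int consecutiveFailures, double jitterFactor) {
        double term = BASE_MS * Math.pow(2, consecutiveFailures); // doubles per failure
        return (long) (Math.min(term, MAX_MS) * jitterFactor);    // cap first, then jitter
    }

    // Same computation with a random factor drawn from [1 - JITTER, 1 + JITTER).
    static long randomizedTimeoutMs(int consecutiveFailures) {
        double factor = ThreadLocalRandom.current().nextDouble(1 - JITTER, 1 + JITTER);
        return timeoutMs(consecutiveFailures, factor);
    }

    public static void main(String[] args) {
        for (int failures = 0; failures <= 4; failures++) {
            System.out.printf("failures=%d -> %d ms%n", failures, randomizedTimeoutMs(failures));
        }
    }
}
```

Under this model, any value between roughly 24 s and 36 s is expected once the cap is reached, so 30096 ms would only indicate that the client has already failed several times in a row and is still retrying. It says nothing about whether the addresses it is retrying are still valid.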

Solution

  • After looking into the issue further: there could be many possible causes for this problem, but in my case the Kafka client resolved the IPs of the Kafka brokers at startup and cached them. Since all brokers restarted, their IPs changed, but the client kept referring to the old IPs and got stuck.
    For more details, see KAFKA-7931.
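To make the failure mode described above concrete, here is a toy Java model: one client resolves broker hostnames once and keeps those IPs, another resolves them again on every attempt. Everything in it (`DnsCacheDemo`, the `kafka-0`/`kafka-1` hostnames, the IP addresses) is hypothetical, and it does not model how the real Kafka client stores broker addresses; KAFKA-7931 describes the actual behaviour.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: hypothetical names, not Kafka client internals.
class DnsCacheDemo {
    // Hypothetical stand-in for cluster DNS: broker hostname -> current pod IP.
    static final Map<String, String> dns = new HashMap<>();

    // A connection attempt only succeeds if it dials the broker's current IP.
    static boolean dial(String host, String ip) {
        return ip != null && ip.equals(dns.get(host));
    }

    // Resolves every broker once, at construction time, and reuses those IPs.
    static class CachingClient {
        private final Map<String, String> resolved = new HashMap<>(dns); // snapshot

        boolean canReach(String host) {
            return dial(host, resolved.get(host));
        }
    }

    // Looks the hostname up again on every connection attempt.
    static class ReResolvingClient {
        boolean canReach(String host) {
            return dial(host, dns.get(host));
        }
    }

    // Simulates scaling all brokers to 0 and back: every pod returns with a new IP.
    static void restartAllBrokers() {
        dns.replaceAll((host, oldIp) -> oldIp.replace("10.0.0.", "10.0.1."));
    }

    public static void main(String[] args) {
        dns.put("kafka-0", "10.0.0.1");
        dns.put("kafka-1", "10.0.0.2");

        CachingClient caching = new CachingClient();
        ReResolvingClient reResolving = new ReResolvingClient();
        System.out.println("before restart, caching: " + caching.canReach("kafka-0"));         // true

        restartAllBrokers();
        System.out.println("after restart, caching: " + caching.canReach("kafka-0"));          // false
        System.out.println("after restart, re-resolving: " + reResolving.canReach("kafka-0")); // true
    }
}
```

The caching client keeps dialing `10.0.0.1` forever, which matches the symptom in the question: it neither succeeds nor gives up, so nothing forces the Flink task to fail and restart.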