apache-kafka apache-flink flink-streaming

Losing connection to Kafka. What happens?


A JobManager and a TaskManager are running on a single VM. Kafka also runs on the same server.

I have 10 tasks; each reads from a different Kafka topic, processes messages, and writes back to Kafka. Sometimes I find that my TaskManager is down and nothing is working. I tried to figure out the problem by checking the logs, and I believe it is a problem with the Kafka connection. (Or maybe a network problem? But everything is on a single server.)

What I want to ask is: if I lose the connection to Kafka for a short period, what happens? Why are the tasks failing, and most importantly, why does the TaskManager crash?

Some logs:

2022-11-26 23:35:15,626 INFO  org.apache.kafka.clients.NetworkClient                       [] - [Producer clientId=producer-15] Disconnecting from node 0 due to request timeout.
2022-11-26 23:35:15,626 INFO  org.apache.kafka.clients.NetworkClient                       [] - [Producer clientId=producer-8] Disconnecting from node 0 due to request timeout.
2022-11-26 23:35:15,626 INFO  org.apache.kafka.clients.NetworkClient                       [] - [Consumer clientId=cpualgosgroup1-1, groupId=cpualgosgroup1] Disconnecting from node 0 due to request timeout.
2022-11-26 23:35:15,692 INFO  org.apache.kafka.clients.NetworkClient                       [] - [Consumer clientId=telefilter1-0, groupId=telefilter1] Cancelled in-flight FETCH request with correlation id 3630156 due to node 0 being disconnected (elapsed time since creation: 61648ms, elapsed time since send: 61648ms, request timeout: 30000ms)
2022-11-26 23:35:15,702 INFO  org.apache.kafka.clients.NetworkClient                       [] - [Producer clientId=producer-15] Cancelled in-flight PRODUCE request with correlation id 2159429 due to node 0 being disconnected (elapsed time since creation: 51069ms, elapsed time since send: 51069ms, request timeout: 30000ms)
2022-11-26 23:35:15,702 INFO  org.apache.kafka.clients.NetworkClient                       [] - [Consumer clientId=cpualgosgroup1-1, groupId=cpualgosgroup1] Cancelled in-flight FETCH request with correlation id 2344708 due to node 0 being disconnected (elapsed time since creation: 51184ms, elapsed time since send: 51184ms, request timeout: 30000ms)
2022-11-26 23:35:15,702 INFO  org.apache.kafka.clients.NetworkClient                       [] - [Producer clientId=producer-15] Cancelled in-flight PRODUCE request with correlation id 2159430 due to node 0 being disconnected (elapsed time since creation: 51069ms, elapsed time since send: 51069ms, request timeout: 30000ms)
2022-11-26 23:35:15,842 WARN  org.apache.kafka.clients.producer.internals.Sender           [] - [Producer clientId=producer-15] Received invalid metadata error in produce request on partition tele.alerts.cpu-4 due to org.apache.kafka.common.errors.NetworkException: Disconnected from node 0. Going to request metadata update now
2022-11-26 23:35:15,842 WARN  org.apache.kafka.clients.producer.internals.Sender           [] - [Producer clientId=producer-8] Received invalid metadata error in produce request on partition tele.alerts.cpu-6 due to org.apache.kafka.common.errors.NetworkException: Disconnected from node 0. Going to request metadata update now

and then

2022-11-26 23:35:56,673 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - CPUTemperatureAnalysisAlgorithm -> Sink: Writer -> Sink: Committer (1/1)#0 (619139347a459b6de22089ff34edff39_d0ae1ab03e621ff140fb6b0b0a2932f9_0_0) switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkException: Disconnect from JobManager responsible for 8d57994a59ab86ea9ee48076e80a7c7f.
        at org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManagerConnection(TaskExecutor.java:1702)
        ...
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
        Caused by: java.util.concurrent.TimeoutException: The heartbeat of JobManager with id 99d52303d7e24496ae661ddea2b6a372 timed out.

2022-11-26 23:35:56,682 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Triggering cancellation of task code CPUTemperatureAnalysisAlgorithm -> Sink: Writer -> Sink: Committer (1/1)#0 (619139347a459b6de22089ff34edff39_d0ae1ab03e621ff140fb6b0b0a2932f9_0_0).
2022-11-26 23:35:57,199 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Attempting to fail task externally TemperatureAnalysis -> Sink: Writer -> Sink: Committer (1/1)#0 (619139347a459b6de22089ff34edff39_15071110d0eea9f1c7f3d75503ff58eb_0_0).
2022-11-26 23:35:57,202 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - TemperatureAnalysis -> Sink: Writer -> Sink: Committer (1/1)#0 (619139347a459b6de22089ff34edff39_15071110d0eea9f1c7f3d75503ff58eb_0_0) switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkException: Disconnect from JobManager responsible for 8d57994a59ab86ea9ee48076e80a7c7f.
        at org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManagerConnection(TaskExecutor.java:1702)

Why does the TaskExecutor lose its connection to the JobManager?

If I don't care about losing data, how should I configure the Kafka clients and Flink recovery? I just want the Kafka client not to die, and above all I don't want my tasks or TaskManagers to crash. If I lose the connection, is it possible to configure Flink to just wait? That is, if we can't read, wait, and if we can't write back to Kafka, just wait?


Solution

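  • The heartbeat of JobManager with id 99d52303d7e24496ae661ddea2b6a372 timed out.

    This suggests the server is overloaded: the TaskManager did not receive a heartbeat from the JobManager within the timeout, so it dropped the connection and failed its tasks. The Kafka log lines point the same way: requests with a 30000ms timeout were only cancelled after 51 to 61 seconds, which suggests the whole process was stalled rather than only the broker connection. You could try increasing the heartbeat timeout (heartbeat.timeout in the Flink configuration).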
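
    As a minimal sketch of that suggestion, assuming the cluster is configured through flink-conf.yaml, the heartbeat settings could look like the fragment below. The value 120000 is an illustrative assumption, not a recommendation; Flink's defaults are 10000 ms for heartbeat.interval and 50000 ms for heartbeat.timeout.

```yaml
# flink-conf.yaml (both values are in milliseconds)
# Illustrative values only; tune them for your own workload.
heartbeat.interval: 10000   # how often heartbeats are requested (Flink default)
heartbeat.timeout: 120000   # assumed example value; the Flink default is 50000
```

    A longer timeout only tolerates longer stalls. If the machine really is overloaded (for example, long GC pauses or CPU starvation with Flink and Kafka sharing one VM), the underlying cause likely still needs attention.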