A jobmanager and taskmanager are running on a single VM. Also Kafka runs on the same server.
I have 10 tasks, all read from different kafka topics , process messages and write back to Kafka. Sometimes I find my task manager is down and nothing is working. I tried to figure out the problem by checking the logs and I believe it is a problem with Kafka connection. (Or maybe a network problem?. But everything is on a single server.)
What I want to ask is, if for a short period I lose connection to Kafka what happens. Why tasks are failing and most importantly why task manager crushes?
Some logs:
2022-11-26 23:35:15,626 INFO org.apache.kafka.clients.NetworkClient [] - [Producer clientId=producer-15] Disconnecting from node 0 due to request timeout.
2022-11-26 23:35:15,626 INFO org.apache.kafka.clients.NetworkClient [] - [Producer clientId=producer-8] Disconnecting from node 0 due to request timeout.
2022-11-26 23:35:15,626 INFO org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=cpualgosgroup1-1, groupId=cpualgosgroup1] Disconnecting from node 0 due to request timeout.
2022-11-26 23:35:15,692 INFO org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=telefilter1-0, groupId=telefilter1] Cancelled in-flight FETCH request with correlation id 3630156 due to node 0 being disconnected (elapsed time since creation: 61648ms, elapsed time since send: 61648ms, request timeout: 30000ms)
2022-11-26 23:35:15,702 INFO org.apache.kafka.clients.NetworkClient [] - [Producer clientId=producer-15] Cancelled in-flight PRODUCE request with correlation id 2159429 due to node 0 being disconnected (elapsed time since creation: 51069ms, elapsed time since send: 51069ms, request timeout: 30000ms)
2022-11-26 23:35:15,702 INFO org.apache.kafka.clients.NetworkClient [] - [Consumer clientId=cpualgosgroup1-1, groupId=cpualgosgroup1] Cancelled in-flight FETCH request with correlation id 2344708 due to node 0 being disconnected (elapsed time since creation: 51184ms, elapsed time since send: 51184ms, request timeout: 30000ms)
2022-11-26 23:35:15,702 INFO org.apache.kafka.clients.NetworkClient [] - [Producer clientId=producer-15] Cancelled in-flight PRODUCE request with correlation id 2159430 due to node 0 being disconnected (elapsed time since creation: 51069ms, elapsed time since send: 51069ms, request timeout: 30000ms)
2022-11-26 23:35:15,842 WARN org.apache.kafka.clients.producer.internals.Sender [] - [Producer clientId=producer-15] Received invalid metadata error in produce request on partition tele.alerts.cpu-4 due to org.apache.kafka.common.errors.NetworkException: Disconnected from node 0. Going to request metadata update now
2022-11-26 23:35:15,842 WARN org.apache.kafka.clients.producer.internals.Sender [] - [Producer clientId=producer-8] Received invalid metadata error in produce request on partition tele.alerts.cpu-6 due to org.apache.kafka.common.errors.NetworkException: Disconnected from node 0. Going to request metadata update now
2
and then
2022-11-26 23:35:56,673 WARN org.apache.flink.runtime.taskmanager.Task [] - CPUTemperatureAnalysisAlgorithm -> Sink: Writer -> Sink: Committer (1/1)#0 (619139347a459b6de22089ff34edff39_d0ae1ab03e621ff140fb6b0b0a2932f9_0_0) switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkException: Disconnect from JobManager responsible for 8d57994a59ab86ea9ee48076e80a7c7f.
at org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManagerConnection(TaskExecutor.java:1702)
...
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
Caused by: java.util.concurrent.TimeoutException: The heartbeat of JobManager with id 99d52303d7e24496ae661ddea2b6a372 timed out.
2022-11-26 23:35:56,682 INFO org.apache.flink.runtime.taskmanager.Task [] - Triggering cancellation of task code CPUTemperatureAnalysisAlgorithm -> Sink: Writer -> Sink: Committer (1/1)#0 (619139347a459b6de22089ff34edff39_d0ae1ab03e621ff140fb6b0b0a2932f9_0_0).
2022-11-26 23:35:57,199 INFO org.apache.flink.runtime.taskmanager.Task [] - Attempting to fail task externally TemperatureAnalysis -> Sink: Writer -> Sink: Committer (1/1)#0 (619139347a459b6de22089ff34edff39_15071110d0eea9f1c7f3d75503ff58eb_0_0).
2022-11-26 23:35:57,202 WARN org.apache.flink.runtime.taskmanager.Task [] - TemperatureAnalysis -> Sink: Writer -> Sink: Committer (1/1)#0 (619139347a459b6de22089ff34edff39_15071110d0eea9f1c7f3d75503ff58eb_0_0) switched from RUNNING to FAILED with failure cause: org.apache.flink.util.FlinkException: Disconnect from JobManager responsible for 8d57994a59ab86ea9ee48076e80a7c7f.
at org.apache.flink.runtime.taskexecutor.TaskExecutor.disconnectJobManagerConnection(TaskExecutor.java:1702)
Why taskexecutor loses connection to JobManager?
If I dont care any data lost, how should I configure Kafka clients and flink recovery. I just want Kafka Client not to die. Especially I dont want my tasks or task managers to crush. If I lose connection, is it possible to configure Flink to just for wait? If we can`t read, wait and if we can't write back to Kafka, just wait?
The heartbeat of JobManager with id 99d52303d7e24496ae661ddea2b6a372 timed out.
Sounds like the server is somewhat overloaded. But you could try increasing the heartbeat timeout.