Search code examples
apache-kafkaapache-flinkflink-streaming

Flink KafkaSink takes long time to start


I'm having an annoying behavior with my Flink app when using a KafkaSink. If my app contains a Sink to Kafka (EXACTLY_ONCE delivery), it takes ages to start, if I remove the Kafka sink (leaving the others) or replace it with a print, then the app starts in just a few seconds. In the task manager logs, I see thousands of repeated rows like these:

2023-04-05 14:01:25,828 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14788 with epoch 9
2023-04-05 14:01:25,828 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:25,829 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:25,932 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15843 with epoch 8
2023-04-05 14:01:25,932 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:25,933 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:26,035 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16840 with epoch 7
2023-04-05 14:01:26,035 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,036 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:26,139 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14813 with epoch 6
2023-04-05 14:01:26,139 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,140 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:26,244 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16858 with epoch 5
2023-04-05 14:01:26,244 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,245 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:26,348 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14832 with epoch 4
2023-04-05 14:01:26,348 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,349 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:26,451 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15886 with epoch 3
2023-04-05 14:01:26,452 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,453 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:26,555 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16891 with epoch 2
2023-04-05 14:01:26,555 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,556 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:26,659 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14860 with epoch 1
2023-04-05 14:01:26,660 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,660 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:26,766 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15917 with epoch 0
2023-04-05 14:01:26,767 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,767 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:26,870 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14713 with epoch 23
2023-04-05 14:01:26,870 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,871 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:26,974 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15752 with epoch 22
2023-04-05 14:01:26,974 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:26,975 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:27,077 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16751 with epoch 21
2023-04-05 14:01:27,077 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,077 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:27,180 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14724 with epoch 20
2023-04-05 14:01:27,180 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,181 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:27,284 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15768 with epoch 19
2023-04-05 14:01:27,284 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,285 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:27,387 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16765 with epoch 18
2023-04-05 14:01:27,387 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,388 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:27,492 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14737 with epoch 17
2023-04-05 14:01:27,492 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,493 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:27,596 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15785 with epoch 16
2023-04-05 14:01:27,596 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,599 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:27,702 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16782 with epoch 15
2023-04-05 14:01:27,702 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,703 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:27,815 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15798 with epoch 14
2023-04-05 14:01:27,815 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,816 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:27,919 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14759 with epoch 13
2023-04-05 14:01:27,919 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:27,920 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:28,022 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16803 with epoch 12
2023-04-05 14:01:28,023 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,024 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:28,126 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15820 with epoch 11
2023-04-05 14:01:28,126 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,127 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:28,230 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14782 with epoch 10
2023-04-05 14:01:28,230 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,231 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:28,333 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16825 with epoch 9
2023-04-05 14:01:28,333 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,334 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:28,436 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15844 with epoch 8
2023-04-05 14:01:28,436 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,437 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:28,542 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14806 with epoch 7
2023-04-05 14:01:28,543 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,544 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:28,646 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16851 with epoch 6
2023-04-05 14:01:28,646 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,647 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:28,749 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15869 with epoch 5
2023-04-05 14:01:28,749 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,751 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:28,853 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16868 with epoch 4
2023-04-05 14:01:28,853 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,854 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:28,956 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15887 with epoch 3
2023-04-05 14:01:28,956 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:28,957 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:29,060 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14846 with epoch 2
2023-04-05 14:01:29,060 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,061 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:29,163 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16900 with epoch 1
2023-04-05 14:01:29,163 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,163 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:29,268 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15918 with epoch 0
2023-04-05 14:01:29,268 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,269 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:29,374 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16748 with epoch 22
2023-04-05 14:01:29,374 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,375 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:29,478 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14721 with epoch 21
2023-04-05 14:01:29,478 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,479 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:29,582 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15763 with epoch 20
2023-04-05 14:01:29,582 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,582 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)
2023-04-05 14:01:29,684 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 16761 with epoch 19
2023-04-05 14:01:29,685 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,685 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-1.mykafkacluster.amazonaws.com:9096 (id: 1 rack: null)
2023-04-05 14:01:29,788 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 14734 with epoch 18
2023-04-05 14:01:29,788 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,790 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-2.mykafkacluster.amazonaws.com:9096 (id: 2 rack: null)
2023-04-05 14:01:29,895 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] ProducerId set to 15779 with epoch 17
2023-04-05 14:01:29,895 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Invoking InitProducerId for the first time in order to acquire a producer ID
2023-04-05 14:01:29,896 INFO  org.apache.kafka.clients.producer.internals.TransactionManager [] - [Producer clientId=cid, transactionalId=kafka-sink-0-1] Discovered transaction coordinator b-3.mykafkacluster.amazonaws.com:9096 (id: 3 rack: null)

repeated for 20 minutes or even more sometimes...

I tried removing the sink and the app starts in few seconds, but I'm losing the output. I also tried changing the Sink delivery guarantee to AT_LEAST_ONCE, it seems to be faster (~1 minute) but still a lot o those logs...

This is the Sink configuration (some props relative to authentication are hidden and loaded with the appConfiguration object)

KafkaSink deviceDaySink = KafkaSink.<DeviceDayTimeTuple>builder()
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(appConfiguration.getString("device-day")
                        .setValueSerializationSchema(new SerializeJsonDeviceDayTime())
                        .build()
                )
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setKafkaProducerConfig(appConfiguration.getKafkaProps())
                .setProperty("transactional.id", "tid")
                .setProperty("client.id", "cid")
                .setProperty("transaction.timeout.ms", "30000")
                .build();

Edit for additional info (Apr 12): As for now we set checkpoint every 30s, the topic has already been written before by the same app but also if we re-create the topic the problem is the same. Through kafka's CLI script kafka-transactions.sh we notice a lot of Empty transaction opened by the flink app at startup:

kafka-sink-0-3394   3           18862       Empty               
kafka-sink-0-4122   3           21106       Empty               
kafka-sink-0-6148   3           21793       Empty               
kafka-sink-0-4366   3           21189       Empty               
kafka-sink-0-2584   3           18585       Empty               
kafka-sink-0-2340   3           18501       Empty 
kafka-sink-0-30     3           16458       Empty
kafka-sink-0-4816   3           21341       Empty 
kafka-sink-0-308    3           16553       Empty
kafka-sink-0-2872   3           18682       Empty 
kafka-sink-0-4654   3           21288       Empty 
kafka-sink-0-3600   3           18931       Empty               
kafka-sink-0-4898   3           21369       Empty 
kafka-sink-0-146    3           16498       Empty
kafka-sink-0-5626   3           21616       Empty               
kafka-sink-0-3844   3           21014       Empty  
kafka-sink-0-4492   3           21231       Empty 
kafka-sink-0-272    3           16541       Empty 
kafka-sink-0-5220   3           21478       Empty 
kafka-sink-0-5464   3           21561       Empty  
kafka-sink-0-3682   3           18958       Empty 

these are only part of all, and both the transactionalId and the producerID match the ones in the log files.


Solution

  • The problem is in how Flink manages Kafka transactions. With regular producers when it wants to use transaction it will initialize transaction with set transactionalId on which Kafka will respond with producentId and epoch (which is an iterator starting from 0).

    This is to ensure only latest producer can actually output data to Kafka. Any producer that has the same trasactionalId but with the lower epoch than latest assigned will be fenced and will not be allowed to produce transactin. This also means that there will be only one transaction for given producer (identified by transactionalId).

    Flink however does things a bit differently. For each checkpoint it will create a new transactionalId and new transaction with it. The name of that transaction will be <transactionalIdPrefix>-<subtask id>-<checkpoint id>. So it will be generating transactions like kafka-sink-0-1, kafka-sink-0-2, etc.

    The information about transactions in Kafka are persisted in the topic and stored in current Kafka transaction leader cache. Typically the topic for transactions has 1 week of retention. So with 1 minute checkpoints with single producer with single instance there will be over 10k transactions. Of course we expect all of them to be in Empty or CompleteCommited state.

    What Flink is doing on Kafka producer startup is to first ensure there are no transactions that are in progress. This is done in org.apache.flink.connector.kafka.sink.TransactionAborter#abortTransactionOfSubtask. What this code do is to naively construct transactionalId starting from <transactionalIdPrefix>-<subtask id>-<current checkpoint id> and iterate up till Kafka returns epoch equal to 0 as it means it is the first time Kafka is seeing such transactionalId. It will do initTransaction which makes Kafka flush previous transaction with the same transactionalId and leave it in Empty state.

    This is no problem for regular operation as one could expect current checkpoint id is the latest checkpoint. This will however not be the case if you start with previous checkpoint or you start with fresh state. In that case your current checkpoint id will be 1 so it will start from 1 and will go up to whatever is the last transaction ever created in this cluster.

    This will be done each Flink startup until checkpoint id passes this last transaction.

    Since there is no API that I know in Kafka that would allow you to remove specific transationalId. Aborting it will just leave it with CompleteAborted. Your options are:

    1. Whenever you want to start with fresh Flink state you should change transactionalIdPrefix. This will only work when you clean state, because if you have changed transactionalIdPrefix Flink knows what was previous one and will scan those transactions anyway.
    2. Clear Kafka transactional state topic and force leader change so it will refresh cache from topic.
    3. Remove just specific transactions from state and force leader change.

    PS. Also if you run multiple local instances of Flink against shared Kafka make sure everybody has unique transactionalIdPrefix.

    This scanning can also happen when you have failed checkpoints as it scans from last successful one.