When creating a Kafka producer with exactly-once semantics using the Kafka API, two properties have to be set: transactional.id must be set to a transactional id, and enable.idempotence must be set to true.
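As a rough illustration of the plain Kafka setup described above, the sketch below only builds the producer properties with java.util.Properties, so it runs without the kafka-clients library. The broker address, the transactional id, and the helper name transactionalProducerProps are placeholders chosen for this example.

```java
import java.util.Properties;

public class TransactionalProducerConfig {

    // Hypothetical helper: builds the properties for a plain Kafka transactional producer.
    public static Properties transactionalProducerProps(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Required for transactions: a stable id identifying this producer across restarts.
        props.setProperty("transactional.id", transactionalId);
        // Idempotence set explicitly, as described above.
        props.setProperty("enable.idempotence", "true");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder broker address and transactional id.
        Properties props = transactionalProducerProps("localhost:9092", "my-transactional-id");
        props.list(System.out);
    }
}
```

With kafka-clients on the classpath, these properties would typically be passed to a KafkaProducer, and initTransactions() called once before the first beginTransaction()/commitTransaction() pair.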
In Apache Flink, a FlinkKafkaProducer can be configured with a parameter for the desired delivery semantics of the producer, in particular with the value FlinkKafkaProducer.Semantic.EXACTLY_ONCE for exactly-once semantics.
Looking at the source code of FlinkKafkaProducer, I can see that transactional ids are automatically generated and maintained. However, I did not find any place where enable.idempotence is set to true in the ProducerConfig of the underlying KafkaProducer.
Does the property enable.idempotence have to be provided in the ProducerConfig given to the FlinkKafkaProducer, or is there something I have overlooked?
No, it doesn't need to be set. As stated in the Kafka documentation:
If the transactional.id is set, idempotence is automatically enabled along with the producer configs which idempotence depends on.
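To make the quoted rule concrete, here is a simplified, hypothetical model of how the effective producer configuration resolves when transactional.id is present. It is not Kafka's actual implementation, and the method name effectiveConfig is made up. The dependent settings it fills in (acks=all, retries greater than 0, max.in.flight.requests.per.connection at most 5) are the requirements the Kafka producer documentation lists for idempotence.

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotenceDefaults {

    // Simplified, hypothetical model of the documented rule; NOT Kafka's real code.
    // Values the user set explicitly are kept; missing ones are filled in.
    public static Map<String, String> effectiveConfig(Map<String, String> userConfig) {
        Map<String, String> cfg = new HashMap<>(userConfig);
        if (cfg.containsKey("transactional.id")) {
            if ("false".equals(cfg.get("enable.idempotence"))) {
                // Real Kafka clients also reject this combination with a config error.
                throw new IllegalArgumentException(
                        "transactional.id requires enable.idempotence=true");
            }
            cfg.putIfAbsent("enable.idempotence", "true");
            // Settings that idempotence depends on:
            cfg.putIfAbsent("acks", "all");
            cfg.putIfAbsent("retries", String.valueOf(Integer.MAX_VALUE));
            cfg.putIfAbsent("max.in.flight.requests.per.connection", "5");
        }
        return cfg;
    }

    public static void main(String[] args) {
        Map<String, String> user = new HashMap<>();
        user.put("bootstrap.servers", "localhost:9092");
        user.put("transactional.id", "my-transactional-id");
        // enable.idempotence was never set by the user, yet it ends up "true".
        System.out.println(effectiveConfig(user).get("enable.idempotence"));
    }
}
```

The point of the model is the asymmetry: a producer without transactional.id is left untouched, while one with it gets idempotence switched on unless the user explicitly disabled it.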
And as you noticed, once you select Semantic.EXACTLY_ONCE, Flink takes care of setting and managing the transactional.id property.
However, there are a couple of other properties mentioned here that you might be interested in. I'm not sure whether this is still the case, but when I last checked, the default Kafka broker and client configuration actually allowed data loss in some scenarios.