Tags: apache-flink, flink-streaming

How to achieve exactly-once semantics in the Flink Kafka connector


I am using Flink version 1.8.0. My application reads data from Kafka, transforms it, and publishes it back to Kafka. To avoid duplicates during a restart, I want to use the Kafka producer with exactly-once semantics, which I read about here:

https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/connectors/kafka.html#kafka-011-and-newer

My Kafka version is 1.1.

        return new FlinkKafkaProducer<String>(topic, new KeyedSerializationSchema<String>() {

            @Override
            public byte[] serializeKey(String element) {
                // use the record itself as the key
                return element.getBytes();
            }

            @Override
            public byte[] serializeValue(String element) {
                return element.getBytes();
            }

            @Override
            public String getTargetTopic(String element) {
                // always write to the configured default topic
                return topic;
            }
        }, prop, opt, FlinkKafkaProducer.Semantic.EXACTLY_ONCE, 1);

Checkpoint code:

    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    checkpointConfig.setCheckpointTimeout(15 * 1000);
    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.enableCheckpointing(5000);

If I add exactly-once semantics to the Kafka producer, my Flink consumer stops reading new data.

Can anyone please share sample code or an application with exactly-once semantics?

Please find the complete code here:

https://github.com/sris2/sample_flink_exactly_once

Thanks


Solution

  • Can anyone please share sample code or an application with exactly-once semantics?

    An exactly-once example is hidden in an end-to-end test in Flink. Since it uses some convenience functions, it may be hard to follow without checking out the whole repository; a self-contained sketch follows below.
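
    As a starting point, here is a minimal sketch of such a Kafka-to-Kafka job with EXACTLY_ONCE semantics. The topic names, bootstrap.servers, and group.id values are placeholder assumptions; adjust them to your environment:

        import java.util.Optional;
        import java.util.Properties;

        import org.apache.flink.api.common.serialization.SimpleStringSchema;
        import org.apache.flink.streaming.api.CheckpointingMode;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
        import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
        import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

        public class ExactlyOnceJob {

            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                // EXACTLY_ONCE is the default checkpointing mode; stated explicitly for clarity
                env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

                // one Properties object shared by consumer and producer, for brevity
                Properties prop = new Properties();
                prop.setProperty("bootstrap.servers", "localhost:9092"); // assumption
                prop.setProperty("group.id", "exactly-once-demo");       // assumption
                // must not exceed the broker's transaction.max.timeout.ms (15 min by default)
                prop.setProperty("transaction.timeout.ms", "900000");

                DataStream<String> stream = env.addSource(
                        new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), prop));

                FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                        "output-topic",
                        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                        prop,
                        Optional.empty(),                          // default partitioner
                        FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
                        5);                                        // producer pool size
                stream.addSink(producer);

                env.execute("kafka-exactly-once-sketch");
            }
        }

    Note that downstream consumers also need isolation.level=read_committed, otherwise they will see uncommitted records as well.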

    If I add exactly-once semantics to the Kafka producer, my Flink consumer stops reading new data. [...] Please find the complete code here:

    https://github.com/sris2/sample_flink_exactly_once

    I checked out your code and found the issue (I had to fix the whole setup/code to actually get it running). The sink cannot configure the transactions correctly. As described in the Flink Kafka connector documentation, you need to adjust transaction.timeout.ms: either raise transaction.max.timeout.ms on your Kafka broker to 1 hour, or lower the producer's transaction timeout in your application to 15 minutes:

        prop.setProperty("transaction.timeout.ms", "900000"); // 15 minutes
    

    The respective excerpt is:

    Kafka brokers by default have transaction.max.timeout.ms set to 15 minutes. This property does not allow setting transaction timeouts for producers larger than its value. FlinkKafkaProducer011 by default sets the transaction.timeout.ms property in the producer config to 1 hour, thus transaction.max.timeout.ms should be increased before using the Semantic.EXACTLY_ONCE mode.
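
    If you prefer the broker-side fix instead, raise transaction.max.timeout.ms to match the connector's 1-hour default. A sketch of the broker configuration (the exact file location depends on your installation; the change requires a broker restart):

        # server.properties on each Kafka broker
        transaction.max.timeout.ms=3600000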