apache-flink, flink-streaming, flink-cep

Flink Checkpointing mode ExactlyOnce is not working as expected


I am new to Flink, so apologies if my understanding is wrong. I am building a dataflow application, and the flow contains multiple data streams which check whether the required fields are present in the incoming DataStream. My application validates the incoming data and, if the validation succeeds, appends the data to a file at the given path if the file already exists. I am trying to simulate that if an exception happens in one DataStream, the other data streams are not impacted; for that I explicitly throw an exception in one of the flows. In the example below, for simplicity, I append the data to a text file on Windows.

Note: My flow doesn't have state, since I don't have anything to store in state.

public class ExceptionTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // start a checkpoint every 1000 ms
        env.enableCheckpointing(1000);

       // env.setParallelism(1);

        //env.setStateBackend(new RocksDBStateBackend("file:///C://flinkCheckpoint", true));

        // make sure at least 500 ms of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // checkpoints have to complete within 5000 ms, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(5000);

        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);  
        
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // enable externalized checkpoints which are retained after job cancellation
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);  // or: DELETE_ON_CANCELLATION

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // number of restart attempts
                Time.of(10, TimeUnit.SECONDS) // delay
        ));

        DataStream<String> input1 = env.fromElements("hello");
        
        DataStream<String> input2 = env.fromElements("hello");


        DataStream<String> output1 = input1.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                //out.collect(value.concat(" world"));
                throw new Exception("=====================NO VALUE TO CHECK=================");
            }
        });


        DataStream<String> output2 = input2.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                out.collect(value.concat(" world"));
            }
        });

        output2.addSink(new SinkFunction<String>() {
            @Override
            public void invoke(String value) throws Exception {
                try {
                    File myObj = new File("C://flinkOutput//filename.txt");
                    if (myObj.createNewFile()) {
                        System.out.println("File created: " + myObj.getName());
                    } else {
                        System.out.println("File already exists.");
                    }
                    // append the record to the file in either case
                    BufferedWriter out = new BufferedWriter(
                            new FileWriter("C://flinkOutput//filename.txt", true));
                    out.write(value);
                    out.close();
                    System.out.println("Successfully wrote to the file.");
                } catch (IOException e) {
                    System.out.println("An error occurred.");
                    e.printStackTrace();
                }
            }
        });

        env.execute();

    }
}

I have a few doubts, as below:

  1. When I throw an exception in the output1 stream, the second flow output2 keeps running even after the exception is encountered, and it writes data to the file on my local machine. But when I check the file, the output is as below:

     hello world
     hello world
     hello world
     hello world
    
  2. As per my understanding of the Flink documentation, if I use the checkpointing mode EXACTLY_ONCE, it should not write the data to the file more than once, since the record was already processed and written to the file. But that is not happening in my case, and I cannot tell whether I am doing anything wrong.

Please help me clear up my doubts about checkpointing and how I can achieve the EXACTLY_ONCE mechanism. I read about TWO_PHASE_COMMIT in Flink, but I did not find any example of how to implement it.

As suggested by @Mikalai Lushchytski, I implemented a StreamingFileSink, as below.

With StreamingFileSink

public class ExceptionTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // start a checkpoint every 1000 ms
        env.enableCheckpointing(1000);

       // env.setParallelism(1);

        //env.setStateBackend(new RocksDBStateBackend("file:///C://flinkCheckpoint", true));

        // make sure at least 500 ms of progress happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // checkpoints have to complete within 5000 ms, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(5000);

        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);  
        
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // enable externalized checkpoints which are retained after job cancellation
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);  // or: DELETE_ON_CANCELLATION

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // number of restart attempts
                Time.of(10, TimeUnit.SECONDS) // delay
        ));

        DataStream<String> input1 = env.fromElements("hello");
        
        DataStream<String> input2 = env.fromElements("hello");


        DataStream<String> output1 = input1.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                //out.collect(value.concat(" world"));
                throw new Exception("=====================NO VALUE TO CHECK=================");
            }
        });
        
        
        DataStream<String> output2 = input2.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                out.collect(value.concat(" world"));
            }
        });
        
        
        String outputPath = "C://flinkCheckpoint";

        final StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(1)
                                .build())
                .build();
                
        
        output2.addSink(sink);

       

        env.execute();

    }
}

But when I check the checkpoint folder, I can see that it has created four part files that are still in progress, as below:

(screenshot: checkpoint folder showing four in-progress part files)

Is there anything I am doing wrong that causes it to create multiple part files?


Solution

  • In order to guarantee end-to-end exactly-once record delivery (in addition to exactly-once state semantics), the data sink needs to take part in the checkpointing mechanism (as well as the data source).

    If you are going to write the data to a file, then you can use a StreamingFileSink, which emits its input elements to FileSystem files within buckets. It is integrated with the checkpointing mechanism to provide exactly-once semantics out of the box.
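    For example, a minimal sketch might look like the following (assuming the Flink 1.x StreamingFileSink API; the output path is a placeholder). Rolling on checkpoints means a part file is only finalized, i.e. renamed from in-progress to finished, once the corresponding checkpoint completes:

        import org.apache.flink.api.common.serialization.SimpleStringEncoder;
        import org.apache.flink.core.fs.Path;
        import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
        import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

        // roll part files on every checkpoint instead of on size/time, so that
        // finished files only become visible when a checkpoint completes
        final StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("C://flinkOutput"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();

        output2.addSink(sink);

    Note that the sink writes one part file per parallel subtask per bucket, and part files stay in the in-progress state until a checkpoint finalizes them; this is the most likely explanation for the several in-progress files you observed (and a withMaxPartSize of 1 byte additionally forces a roll after nearly every record).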

    If you are going to implement your own sink, then the sink function must implement the CheckpointedFunction interface and properly implement its snapshotState(FunctionSnapshotContext context) method, which is called whenever a snapshot of a checkpoint is requested and must flush the current application state. In addition, I would recommend implementing the CheckpointListener interface in order to be notified once a distributed checkpoint has been completed.
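    As a rough sketch (illustrative only, not a production implementation; the buffering logic and file path are my assumptions), such a sink could buffer records and only make them visible as part of the checkpoint lifecycle:

        import java.io.BufferedWriter;
        import java.io.FileWriter;
        import java.util.ArrayList;
        import java.util.List;

        import org.apache.flink.api.common.state.ListState;
        import org.apache.flink.api.common.state.ListStateDescriptor;
        import org.apache.flink.runtime.state.CheckpointListener;
        import org.apache.flink.runtime.state.FunctionInitializationContext;
        import org.apache.flink.runtime.state.FunctionSnapshotContext;
        import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
        import org.apache.flink.streaming.api.functions.sink.SinkFunction;

        public class BufferingFileSink implements SinkFunction<String>,
                CheckpointedFunction, CheckpointListener {

            private final List<String> buffer = new ArrayList<>();
            private transient ListState<String> checkpointedState;

            @Override
            public void invoke(String value, Context context) {
                buffer.add(value); // only buffer between checkpoints, do not write yet
            }

            @Override
            public void snapshotState(FunctionSnapshotContext context) throws Exception {
                // called when a checkpoint is requested: persist the buffered records
                checkpointedState.clear();
                checkpointedState.addAll(buffer);
            }

            @Override
            public void initializeState(FunctionInitializationContext context) throws Exception {
                checkpointedState = context.getOperatorStateStore().getListState(
                        new ListStateDescriptor<>("buffered-records", String.class));
                if (context.isRestored()) {
                    // after a failure, re-buffer the records from the last checkpoint
                    for (String value : checkpointedState.get()) {
                        buffer.add(value);
                    }
                }
            }

            @Override
            public void notifyCheckpointComplete(long checkpointId) throws Exception {
                // the checkpoint is globally complete: only now write the records out
                try (BufferedWriter out = new BufferedWriter(
                        new FileWriter("C://flinkOutput//filename.txt", true))) {
                    for (String value : buffer) {
                        out.write(value);
                        out.newLine();
                    }
                }
                buffer.clear();
            }
        }

    Even this still leaves a window for duplicates if the job fails between notifyCheckpointComplete and the next checkpoint, which is the gap the two-phase commit pattern below is designed to close.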

    Flink already provides an abstract TwoPhaseCommitSinkFunction, which is the recommended base class for every SinkFunction that intends to implement exactly-once semantics. It does so by implementing the two-phase commit protocol on top of CheckpointedFunction and CheckpointListener. As an example, you can have a look at the FlinkKafkaProducer.java source code.
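    To give a feel for the shape of such a sink, here is a minimal skeleton (the String transaction handle, file paths, and class name are illustrative assumptions, not part of the Flink API):

        import java.io.BufferedWriter;
        import java.io.FileWriter;
        import java.nio.file.Files;
        import java.nio.file.Paths;
        import java.nio.file.StandardCopyOption;
        import java.util.UUID;

        import org.apache.flink.api.common.typeutils.base.StringSerializer;
        import org.apache.flink.api.common.typeutils.base.VoidSerializer;
        import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

        // the transaction handle (TXN) is simply the path of a temporary file
        public class TwoPhaseFileSink extends TwoPhaseCommitSinkFunction<String, String, Void> {

            public TwoPhaseFileSink() {
                super(StringSerializer.INSTANCE, VoidSerializer.INSTANCE);
            }

            @Override
            protected String beginTransaction() {
                // a fresh transaction = a new temporary file
                return "C://flinkOutput//tmp-" + UUID.randomUUID();
            }

            @Override
            protected void invoke(String txn, String value, Context context) throws Exception {
                // records go only into the per-transaction temporary file
                try (BufferedWriter out = new BufferedWriter(new FileWriter(txn, true))) {
                    out.write(value);
                    out.newLine();
                }
            }

            @Override
            protected void preCommit(String txn) {
                // phase 1 (on snapshot): flush all pending data; nothing to do here
                // because invoke() already closes (and therefore flushes) the writer
            }

            @Override
            protected void commit(String txn) {
                // phase 2 (once the checkpoint completes): atomically publish the file;
                // commit() may be re-executed on recovery, so it must be idempotent
                try {
                    Files.move(Paths.get(txn), Paths.get(txn + ".committed"),
                            StandardCopyOption.ATOMIC_MOVE);
                } catch (Exception e) {
                    if (!Files.exists(Paths.get(txn + ".committed"))) {
                        throw new RuntimeException("Could not commit transaction " + txn, e);
                    }
                }
            }

            @Override
            protected void abort(String txn) {
                // the transaction did not make it into a completed checkpoint: discard it
                try {
                    Files.deleteIfExists(Paths.get(txn));
                } catch (Exception e) {
                    throw new RuntimeException("Could not abort transaction " + txn, e);
                }
            }
        }

    FlinkKafkaProducer follows the same lifecycle, with Kafka transactions playing the role of the temporary file above.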