hdfs, apache-flink

Why are there so many in-progress files after restarting my Flink program?


I use Flink to consume from Kafka and save the records to HDFS in Parquet format. Now I find many in-progress files in my target directory; when I restart my Flink program, they are never closed into final files.

My environment:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.enableCheckpointing(60000)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getConfig.registerTypeWithKryoSerializer(classOf[MyMessage],classOf[ProtobufSerializer])


//sinks

    val bucketAssigner = new DateTimeBucketAssigner[myCounter]("yyyy-MM-dd", ZoneId.of("Asia/Shanghai"))
    val streamingFileSink = StreamingFileSink
      .forBulkFormat(path, ParquetAvroWriters.forSpecificRecord(classOf[myCounter]))
      .withBucketCheckInterval(60000)
      .withBucketAssigner(bucketAssigner)
      .build()

-rw-r--r--   3 Administrator hdfs       1629 2019-08-05 17:06 /user/data/2019-08-05/.part-2-0.inprogress.722265d7-1082-4c84-b70d-da2a08092f5d
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 17:07 /user/data/2019-08-05/.part-2-1.inprogress.ac0d8b56-b8f0-4893-9e55-5374b69f16cc
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 17:08 /user/data/2019-08-05/.part-2-2.inprogress.a427c2e2-d689-42b8-aa3d-77873c5654f2
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 17:09 /user/data/2019-08-05/.part-2-3.inprogress.b5c746e3-354d-4ab3-b1a4-8c6bd88ae430
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 16:59 /user/data/2019-08-05/.part-2-3.inprogress.e286d995-3fa7-4696-b51a-27378412a35c
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 17:00 /user/data/2019-08-05/.part-2-4.inprogress.bcde4f30-2f78-4f54-92ad-9bc54ac57c5c
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 17:10 /user/data/2019-08-05/.part-2-4.inprogress.dbce8a00-6514-43dc-8b31-36c5a8665d37
-rw-r--r--   3 Administrator hdfs          0 2019-08-05 17:10 /user/data/2019-08-05/.part-2-5.inprogress.34e53418-f5af-4279-87ef-6a27549d90fe
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 17:01 /user/data/2019-08-05/.part-2-5.inprogress.936cdb63-4fe2-41bf-b839-2861030c5516
-rw-r--r--   3 Administrator hdfs          0 2019-08-05 16:55 /user/data/2019-08-05/.part-2-6.inprogress.7a7099a6-9dcd-450b-af2c-8a676276ef0a
-rw-r--r--   3 Administrator hdfs          0 2019-08-05 17:01 /user/data/2019-08-05/.part-2-6.inprogress.b57f548f-45fc-497c-9807-ef18dba3d11d
-rw-r--r--   3 Administrator hdfs       1574 2019-08-05 16:56 /user/data/2019-08-05/part-2-0
-rw-r--r--   3 Administrator hdfs       1868 2019-08-05 16:57 /user/data/2019-08-05/part-2-1
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 16:58 /user/data/2019-08-05/part-2-2
-rw-r--r--   3 Administrator hdfs       1661 2019-08-05 16:53 /user/data/2019-08-05/part-2-3
-rw-r--r--   3 Administrator hdfs       1891 2019-08-05 16:54 /user/data/2019-08-05/part-2-4

I think the reason is that the in-progress files are not closed when I restart the program. What confuses me is why they are not closed after the restart, while even the newly written files become in-progress. Could someone explain?


Solution

  • In short: it is for the exactly-once semantics.

    Please read this post from the official Flink blog first.

    Then let me try to explain it clearly.

    1. The StreamingFileSink writes all records to a temporary file, by default with an in-progress suffix.

    2. When a checkpoint on this sink is triggered, Flink saves the names of the in-progress files into the checkpoint;

    3. When it's time to commit, Flink renames the in-progress files to their final names; in your example, these are the part-x-x files.
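
    The three steps above can be sketched as a simplified simulation in plain Scala (illustration only, not Flink's actual implementation; the file names are modeled on the listing in the question):

```scala
import java.nio.file.{Files, Path}
import java.util.UUID

// Step 1: records are written to a hidden in-progress file.
def inProgressName(subtask: Int, counter: Int): String =
  s".part-$subtask-$counter.inprogress.${UUID.randomUUID()}"

// Step 3: on commit, the in-progress file is renamed to its final name,
// e.g. ".part-2-0.inprogress.<uuid>" -> "part-2-0".
def finalName(inProgress: String): String =
  inProgress.stripPrefix(".").split("\\.inprogress\\.")(0)

val tmp = Files.createTempDirectory("sink-sim")
val ip = inProgressName(2, 0)
val ipPath = Files.createFile(tmp.resolve(ip))        // step 1: open in-progress file
// step 2 (checkpoint) would record `ip` in Flink's state; omitted here
val committed = Files.move(ipPath, tmp.resolve(finalName(ip)))  // step 3: commit
println(committed.getFileName)  // part-2-0
```

    The commit (rename) only happens on a successful checkpoint, which is why a killed job leaves the in-progress files behind.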

    And when you restart the Flink application, the job resumes from the last savepoint (if you pass the parameter), and the in-progress files that were not yet ready for commit are abandoned and never read by users (names starting with a dot are not listed by HDFS by default).
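
    To avoid orphaning those files in the first place, resume the job from its last checkpoint or savepoint instead of starting fresh. A hedged sketch (Flink 1.8-era API; verify against your version) that retains checkpoints on cancellation, so the restarted job can be launched with `flink run -s <checkpoint-path>` and the sink can finish or discard its pending files:

```scala
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60000)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// Keep the last checkpoint around when the job is cancelled or killed,
// so the next run can restore from it instead of abandoning in-progress files.
env.getCheckpointConfig.enableExternalizedCheckpoints(
  ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
```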

    Of course, I have ignored many details; for example, a file is renamed to a .pending file when its size exceeds the configured limit, etc.
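
    Since the abandoned in-progress files are never deleted automatically, you may want to find them for manual cleanup. A hedged helper sketch (not a Flink API; it assumes the default part-file naming scheme shown in your listing):

```scala
// Pick the hidden in-progress files out of a bucket-directory listing.
def orphanedInProgress(names: Seq[String]): Seq[String] =
  names.filter(n => n.startsWith(".part-") && n.contains(".inprogress."))

val listing = Seq(
  ".part-2-0.inprogress.722265d7-1082-4c84-b70d-da2a08092f5d",
  "part-2-0",
  "part-2-1"
)
// keeps only the hidden in-progress file
println(orphanedInProgress(listing).head)  // .part-2-0.inprogress.722265d7-1082-4c84-b70d-da2a08092f5d
```

    Only delete such files after confirming that no running or resumable job still references them in its checkpoint state.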