apache-flink, flink-sql

How does the file system connector sink work


I am using the following simple code to illustrate the behavior of the filesystem connector. I have a few observations that I want to ask about and confirm.

  1. If I don't enable checkpointing, all of the generated part-XXX files contain "inprogress" in the file name. Does this mean these files are not committed? And does it mean that if I want to use the filesystem connector sink, I always need to enable checkpointing so that the generated files get committed and downstream systems (like Hive or Flink) can discover and read them?

  2. When are the in-progress files renamed to their final names in the partition? Does this happen when a new partition is created and a checkpoint runs, turning the files in the previous partition from in-progress into finished? If so, there may be a delay (up to the checkpoint interval) before a partition becomes visible.

  3. I have set the rollover interval to 20 seconds in the code, but when I look at the generated part-XXX files, the difference between the creation times of consecutive files is 25 seconds. I expected it to be 20 seconds, e.g.:

part-90e63e04-466f-45ce-94d4-9781065a8a8a-0-10 2021-01-03 12:39:04
part-90e63e04-466f-45ce-94d4-9781065a8a8a-0-11 2021-01-03 12:39:29

The code is:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

    // MyEvent and InfiniteEventSource are user-defined helpers (definitions omitted)
    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(1)
      env.enableCheckpointing(10 * 1000) // checkpoint every 10 s
      env.setStateBackend(new FsStateBackend("file:///d:/flink-checkpoints"))
      val ds: DataStream[MyEvent] = env.addSource(new InfiniteEventSource(emitInterval = 5 * 1000))
      val tenv = StreamTableEnvironment.create(env)
      tenv.createTemporaryView("sourceTable", ds)

      ds.print()

      val ddl =
        s"""
           |create table sinkTable (
           |  id string,
           |  p_day string,
           |  p_hour string,
           |  p_min string
           |) partitioned by (p_day, p_hour, p_min) with (
           |  'connector' = 'filesystem',
           |  'path' = 'D:/csv-${System.currentTimeMillis()}',
           |  'format' = 'csv',
           |  'sink.rolling-policy.check-interval' = '5 s',
           |  'sink.rolling-policy.rollover-interval' = '20 s',
           |  'sink.partition-commit.trigger' = 'process-time',
           |  'sink.partition-commit.policy.kind' = 'success-file',
           |  'sink.partition-commit.delay' = '0 s'
           |)
           |""".stripMargin

      tenv.executeSql(ddl)

      tenv.executeSql(
        """
          |insert into sinkTable
          |select id,
          |       date_format(occurrenceTime, 'yyyy-MM-dd'),
          |       date_format(occurrenceTime, 'HH'),
          |       date_format(occurrenceTime, 'mm')
          |from sourceTable
          |""".stripMargin)

      env.execute()
    }

Solution

  • Point 1 is covered in the StreamingFileSink docs:

    IMPORTANT: Checkpointing needs to be enabled when using the StreamingFileSink. Part files can only be finalized on successful checkpoints. If checkpointing is disabled, part files will forever stay in the in-progress or the pending state, and cannot be safely read by downstream systems.
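
    As a minimal sketch of the fix (the interval value is illustrative, not prescriptive): enable checkpointing before the job starts, and pending part files are then finalized each time a checkpoint completes.

        import org.apache.flink.streaming.api.scala._

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        // Without this call, part files stay in the in-progress/pending state forever.
        env.enableCheckpointing(10 * 1000) // checkpoint every 10 s (illustrative value)
        // A rolled part file becomes finished (and readable downstream) no later than
        // ~one checkpoint interval after it rolls, since finalization happens on
        // checkpoint completion.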

  • For point 2, the part file lifecycle is documented here, which explains that in-progress files transition to pending based on the rolling policy, and only become finished when a checkpoint is completed. Thus, depending on the rolling policy and the checkpoint interval, some files could be pending for quite some time.
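
    A hedged sketch of that lifecycle with the DataStream-API StreamingFileSink that the docs describe (Flink 1.12-era API; the path and interval values here are illustrative):

        import java.util.concurrent.TimeUnit
        import org.apache.flink.api.common.serialization.SimpleStringEncoder
        import org.apache.flink.core.fs.Path
        import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
        import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

        val sink: StreamingFileSink[String] = StreamingFileSink
          .forRowFormat(new Path("file:///d:/csv-out"), new SimpleStringEncoder[String]("UTF-8"))
          .withRollingPolicy(
            DefaultRollingPolicy.builder()
              // in-progress -> pending: the rolling policy closes the part file
              .withRolloverInterval(TimeUnit.SECONDS.toMillis(20))
              .withInactivityInterval(TimeUnit.MINUTES.toMillis(1))
              .withMaxPartSize(128 * 1024 * 1024)
              .build())
          .build()
        // pending -> finished: happens only when the next checkpoint completes, so a
        // file can sit in pending for up to one checkpoint interval after it rolls.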

  • For point 3, with a rollover-interval of 20 seconds and a check-interval of 5 seconds, the rollover will occur somewhere between 20 and 25 seconds after the file opens. See the Rolling Policy docs for the explanation of check-interval:

    The interval for checking time based rolling policies. This controls the frequency to check whether a part file should rollover based on 'sink.rolling-policy.rollover-interval'.
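
    To see where the observed 25 seconds comes from: the policy is only evaluated at check-interval ticks, so a part file that becomes eligible to roll between ticks waits for the next tick. A tiny hypothetical helper (the function name and values are mine, not Flink's) that models this:

        // Hypothetical model of the check-interval timing, not actual Flink code.
        def observedRollover(openOffsetMs: Long, rolloverMs: Long, checkMs: Long): Long = {
          val eligibleAt = openOffsetMs + rolloverMs                        // 20 s after opening
          val nextTick   = ((eligibleAt + checkMs - 1) / checkMs) * checkMs // next 5 s check tick
          nextTick - openOffsetMs
        }

        observedRollover(0, 20000, 5000) // 20000 ms: the file opened exactly on a check tick
        observedRollover(1, 20000, 5000) // 24999 ms: opened just after a tick, so the 20 s
                                         // mark is missed and the roll waits ~5 s more

    Since each new part file opens just after the tick at which the previous one rolled, the 20-second mark tends to land just past a tick, so the roll consistently happens one tick later, which matches the steady 25-second spacing between the files in the question.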