I am using the following simple code to illustrate the behavior of the file system connector. I have three observations that I want to ask about and confirm.
1. If I don't enable checkpointing, all of the generated part-XXX files always contain inprogress in the file name. Does this mean these files are not committed? And does it mean that whenever I use the file system connector sink I need to enable checkpointing, so that the generated files are committed and downstream consumers (such as Hive or Flink) can discover and read them?
2. When are the inprogress files moved to their normal (committed) state in the partition? Does it happen when a new partition is created and the next checkpoint runs, which then turns the files in the previous partition from inprogress into normal files? If so, there may be a delay (up to the checkpoint interval) before the partition becomes visible.
3. I have set the rolling interval to 20 seconds in the code, but when I look at the generated part-XXX files, the difference between the creation times of consecutive files is 25 seconds. I expected it to be 20 seconds.
For example:
part-90e63e04-466f-45ce-94d4-9781065a8a8a-0-10 2021-01-03 12:39:04
part-90e63e04-466f-45ce-94d4-9781065a8a8a-0-11 2021-01-03 12:39:29
The code is:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.enableCheckpointing(10*1000)
env.setStateBackend(new FsStateBackend("file:///d:/flink-checkpoints"))
val ds: DataStream[MyEvent] = env.addSource(new InfiniteEventSource(emitInterval = 5 * 1000))
val tenv = StreamTableEnvironment.create(env)
tenv.createTemporaryView("sourceTable", ds)
ds.print()
val ddl =
s"""
create table sinkTable(
id string,
p_day STRING,
p_hour STRING,
p_min STRING
) partitioned by(p_day, p_hour, p_min) with (
'connector' = 'filesystem',
'path' = 'D:/csv-${System.currentTimeMillis()}',
'format' = 'csv',
'sink.rolling-policy.check-interval' = '5 s',
'sink.rolling-policy.rollover-interval' = '20 s',
'sink.partition-commit.trigger'='process-time',
'sink.partition-commit.policy.kind'='success-file',
'sink.partition-commit.delay' = '0 s'
)
""".stripMargin(' ')
tenv.executeSql(ddl)
tenv.executeSql(
"""
insert into sinkTable
select id, date_format(occurrenceTime,'yyyy-MM-dd'), date_format(occurrenceTime, 'HH'), date_format(occurrenceTime, 'mm') from sourceTable
""".stripMargin(' '))
env.execute()
}
Point 1 is covered in the StreamingFileSink docs:
IMPORTANT: Checkpointing needs to be enabled when using the StreamingFileSink. Part files can only be finalized on successful checkpoints. If checkpointing is disabled, part files will forever stay in the in-progress or the pending state, and cannot be safely read by downstream systems.
For point 2, the part file lifecycle is documented in the StreamingFileSink docs, which explain that in-progress files transition to pending based on the rolling policy, and only become finished when a checkpoint is completed. Thus, depending on the rolling policy and the checkpoint interval, some files could be pending for quite some time.
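The lifecycle described above can be sketched as a tiny state machine. This is only an illustrative model in plain Scala, not Flink's implementation: the names PartFileLifecycle, PartFileState, SinkEvent, next and replay are made up for this example, and the model ignores details such as policies that also roll files on checkpoints.

```scala
// Simplified, hypothetical model of the part file lifecycle; none of these
// types exist in Flink.
object PartFileLifecycle {
  sealed trait PartFileState
  case object InProgress extends PartFileState
  case object Pending extends PartFileState
  case object Finished extends PartFileState

  sealed trait SinkEvent
  case object RollingPolicyTriggered extends SinkEvent
  case object CheckpointCompleted extends SinkEvent

  // One transition step: rolling closes the file, a completed checkpoint commits it.
  def next(state: PartFileState, event: SinkEvent): PartFileState =
    (state, event) match {
      case (InProgress, RollingPolicyTriggered) => Pending
      case (Pending, CheckpointCompleted)       => Finished
      case (other, _)                           => other // no change in this model
    }

  // Replays a sequence of events, starting from a freshly opened file.
  def replay(events: Seq[SinkEvent]): PartFileState =
    events.foldLeft(InProgress: PartFileState)(next)
}
```

In this model, a file that has been rolled but never sees a CheckpointCompleted event stays Pending, which mirrors the situation where checkpointing is disabled.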
For point 3, with a rollover-interval of 20 seconds, and a check-interval of 5 seconds, the rollover will occur after somewhere between 20 and 25 seconds. See the Rolling Policy docs for the explanation of check-interval:
The interval for checking time based rolling policies. This controls the frequency to check whether a part file should rollover based on 'sink.rolling-policy.rollover-interval'.
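To see where the 20 to 25 second range comes from, here is a small plain-Scala sketch of the arithmetic. It assumes the periodic check fires at fixed ticks (0 s, 5 s, 10 s, ...) and that a file is rolled at the first tick where its age is at least the rollover interval. The object RolloverTiming and the function rolloverAt are hypothetical names for this illustration, not part of Flink.

```scala
// Hypothetical helper illustrating check-interval vs. rollover-interval.
object RolloverTiming {
  // Returns the time (ms) of the first check tick at which a file created at
  // createdAtMs is at least rolloverIntervalMs old. Ticks are assumed to occur
  // at multiples of checkIntervalMs; all arguments are assumed non-negative.
  def rolloverAt(createdAtMs: Long, checkIntervalMs: Long, rolloverIntervalMs: Long): Long = {
    val earliest = createdAtMs + rolloverIntervalMs
    // Round up to the next multiple of checkIntervalMs.
    ((earliest + checkIntervalMs - 1) / checkIntervalMs) * checkIntervalMs
  }

  def main(args: Array[String]): Unit = {
    // check-interval = 5 s, rollover-interval = 20 s, as in the question.
    for (createdAt <- Seq(0L, 1L, 2500L, 4999L)) {
      val rolledAt = rolloverAt(createdAt, 5000L, 20000L)
      println(s"created at $createdAt ms, rolled at $rolledAt ms, age ${rolledAt - createdAt} ms")
    }
  }
}
```

Under these assumptions, the age of a file at rollover is always at least 20 seconds and less than 25 seconds, depending on where in the check cycle the file was created.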