Apache Flink: move files to a different folder after reading


I am reading CSV files from a directory and doing some processing. Right now Flink picks up any new file that arrives in that directory and processes it. This is working fine for me.

I am stuck on two issues:

  1. I want to log the names of the files that Flink has finished processing.
  2. I want to move processed files to a different folder as soon as Flink completes the processing.

My code snippet is:

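final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Directory that Flink monitors for incoming CSV files.
org.apache.flink.core.fs.Path filePath = new org.apache.flink.core.fs.Path(feedFileFolderPath);

// The second argument supplies the field types used to parse each CSV line into a Row.
RowCsvInputFormat format = new RowCsvInputFormat(filePath, FetchTypeInformation.getTypeInformation());

// Re-scan the directory at the given interval (in milliseconds) and read any new files.
DataStream<Row> inputStream = env.readFile(format, feedFileFolderPath, FileProcessingMode.PROCESS_CONTINUOUSLY,
                parseInt(folderLookupTime));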

Solution

  • This topic has come up a couple of times on the Flink mailing list (see the discussions here and here), but the short summary is that there is not yet an easy way to do this from within Flink.

    What seems to be commonly done is to use a cron job that periodically moves older files out of the monitored directory, on the assumption that they have already been processed. If you want to be more careful than that, you will have to implement your own mechanism for tracking the progress of the job doing the processing. The email threads referred to above include some ideas for how to do that.
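
    As a rough illustration of the cron-style approach, here is a minimal sketch in plain Java (no Flink APIs involved). The class name ProcessedFileMover, the method moveOlderThan, and the age threshold are all hypothetical names chosen for this example. It assumes that any file whose last-modified time is older than the threshold has already been processed, which is exactly the unverified assumption described above, and it logs each file name as it moves it.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

/** Hypothetical helper, not part of Flink: archives files that look old enough to have been processed. */
final class ProcessedFileMover {

    /**
     * Moves every regular file in watchedDir whose last-modified time is at least
     * minAge before 'now' into archiveDir, and returns the new locations.
     */
    static List<Path> moveOlderThan(Path watchedDir, Path archiveDir, Duration minAge, Instant now)
            throws IOException {
        Files.createDirectories(archiveDir);
        Instant cutoff = now.minus(minAge);

        // Collect the candidates first so the directory stream is closed before any file is moved.
        List<Path> candidates = new ArrayList<>();
        try (Stream<Path> entries = Files.list(watchedDir)) {
            entries.filter(Files::isRegularFile).forEach(candidates::add);
        }

        List<Path> moved = new ArrayList<>();
        for (Path file : candidates) {
            Instant lastModified = Files.getLastModifiedTime(file).toInstant();
            if (!lastModified.isAfter(cutoff)) {
                Path target = archiveDir.resolve(file.getFileName());
                Files.move(file, target, StandardCopyOption.REPLACE_EXISTING);
                // Age-based guess only: nothing here confirms that Flink actually finished the file.
                System.out.println("Archived (assumed processed): " + file.getFileName());
                moved.add(target);
            }
        }
        return moved;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java ProcessedFileMover <watchedDir> <archiveDir> <minAgeMinutes>
        List<Path> moved = moveOlderThan(Paths.get(args[0]), Paths.get(args[1]),
                Duration.ofMinutes(Long.parseLong(args[2])), Instant.now());
        System.out.println("Moved " + moved.size() + " file(s)");
    }
}
```

    A scheduler such as cron could run this every few minutes. The archive directory should live outside the monitored directory so the moved files are not picked up again, and the age threshold should be comfortably larger than the time the job needs to process a file.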