We have a list of file paths in a DB table, along with a timestamp for when each file was created. We are trying to figure out how to use that file-path list from the DB to forward only those files from NFS to a Kafka sink.
Right now I am using a customized version of ContinuousFileMonitoringFunction pointed at the root of the folder that contains all the files the DB lists. This is very slow, because gathering information on updated files means walking through the whole folder, which holds a few TB of data.
Table orders = tableEnv.from("Customers");
Table result = orders.where($("b").isEqual("****"));
// Table has no toDataSet() itself; convert via the (Batch)TableEnvironment
DataSet<String> ds = tableEnv.toDataSet(result, String.class);
ds now holds all the file paths that should be sent to Kafka.
The following is the idea I am planning to implement. But is there a better, more efficient approach, considering Flink parallelism, Flink library support, etc.?
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class FileContentMap extends RichFlatMapFunction<String, String> {

    @Override
    public void flatMap(String filePath, Collector<String> out) throws Exception {
        // Read the file from NFS (readFile is my helper, not shown) and emit its content
        String fileContent = readFile(filePath);
        out.collect(fileContent);
    }

    @Override
    public void open(Configuration config) {
    }
}
// FileContentMap is a FlatMapFunction, so flatMap() rather than map()
DataSet<String> contentDataSet = ds.flatMap(new FileContentMap());
contentDataSet.addSink(kafkaProducer);
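(One caveat on the last line: addSink exists on the DataStream API rather than on DataSet, so that part is pseudocode. A sketch of what the Kafka wiring could look like with the DataStream API and Flink's FlinkKafkaProducer; the paths, topic name, and broker address below are placeholders:)

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Placeholder paths; in practice this stream would carry the paths from the DB query
DataStream<String> filePaths = env.fromElements("/nfs/data/a.csv", "/nfs/data/b.csv");

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker

filePaths
        .flatMap(new FileContentMap())
        .addSink(new FlinkKafkaProducer<>(
                "file-content",              // placeholder topic
                new SimpleStringSchema(),
                props));

env.execute("nfs-to-kafka");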
Your approach seems fine to me. Perhaps more efficient would be to create a RichParallelSourceFunction, where in the open() method you make the call to the DB to get the list of files that have been updated, and you build an in-memory list of the files that that particular source sub-task (something like filePath.hashCode() % numSubTasks == mySubTask) should emit to be processed by your FileContentMap.
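A minimal sketch of such a source, under two assumptions: fetchUpdatedFilePathsFromDb() is a hypothetical stand-in for your JDBC query, and Math.abs() is added because hashCode() can be negative:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class UpdatedFilePathSource extends RichParallelSourceFunction<String> {

    private transient List<String> myFilePaths;
    private volatile boolean running = true;

    @Override
    public void open(Configuration config) throws Exception {
        int numSubTasks = getRuntimeContext().getNumberOfParallelSubtasks();
        int mySubTask = getRuntimeContext().getIndexOfThisSubtask();

        // Query the DB once per sub-task and keep only the paths that hash to this sub-task
        myFilePaths = new ArrayList<>();
        for (String filePath : fetchUpdatedFilePathsFromDb()) {
            if (Math.abs(filePath.hashCode() % numSubTasks) == mySubTask) {
                myFilePaths.add(filePath);
            }
        }
    }

    @Override
    public void run(SourceContext<String> ctx) {
        for (String filePath : myFilePaths) {
            if (!running) {
                break;
            }
            ctx.collect(filePath);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    // Hypothetical helper: run the JDBC query against the DB table of file paths
    private List<String> fetchUpdatedFilePathsFromDb() throws Exception {
        throw new UnsupportedOperationException("replace with your DB query");
    }
}

You would then wire it up as env.addSource(new UpdatedFilePathSource()).flatMap(new FileContentMap()) followed by the Kafka sink, so both the DB lookup and the file reads are spread across the parallel sub-tasks instead of scanning the whole folder.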