apache-flink, flink-streaming

Process two data sources successively in Apache Flink


I'd like to batch process two files with Apache Flink, one after the other.

As a concrete example: suppose I want to assign an index to each line, such that the lines of the second file follow those of the first. Instead, the following code interleaves lines from the two files:

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment

val text1 = env.readTextFile("/path/to/file1")
val text2 = env.readTextFile("/path/to/file2")

val union = text1.union(text2).flatMap { ... }

I want to make sure all of text1 is sent through the flatMap operator first, and then all of text2. What is the recommended way to do so?

Thanks in advance for the help.


Solution

  • DataSet.union() does not provide any order guarantees across inputs. Records from the same input partition will remain in order but will be merged with records from the other input.

    But there is a more fundamental problem. Flink is a parallel data processor. When processing data in parallel, a global order cannot be preserved. For example, when Flink reads files in parallel, it tries to split these files and process each split independently. The splits are handed out without any particular order. Hence, the records of a single file are already shuffled. You would need to set the parallelism of the whole job to 1 and implement a custom InputFormat to make this work.

    You can make that work, but it won't run in parallel and you would need to tweak many things; a sketch of that approach follows below. I don't think that Flink is the best tool for such a task. Have you considered concatenating your files with simple Unix command-line tools first, e.g. cat file1 file2 > combined?
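For reference, here is a minimal sketch of the parallelism-1 route in the batch DataSet API. The NonSplittingTextInputFormat helper and the SequentialIndexJob and readInOrder names are illustrative, not part of the answer: the helper merely sets FileInputFormat's protected unsplittable flag so each file is read as a single split, and zipWithIndex comes from org.apache.flink.api.scala.utils. Rather than trying to force union to emit one input before the other, the sketch makes the order explicit through indices and offsets the second file's indices by the first file's line count.

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.utils._ // provides zipWithIndex on DataSet
import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path

// Hypothetical helper: a TextInputFormat that never splits its file, so a
// single task reads each file start-to-finish and line order is preserved.
class NonSplittingTextInputFormat(path: Path) extends TextInputFormat(path) {
  unsplittable = true // protected flag inherited from FileInputFormat
}

object SequentialIndexJob {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // a single task overall, no cross-split interleaving

    def readInOrder(path: String): DataSet[String] =
      env.readFile(new NonSplittingTextInputFormat(new Path(path)), path)

    val text1 = readInOrder("/path/to/file1")
    val text2 = readInOrder("/path/to/file2")

    // count() runs a separate job just to get the number of lines in file1.
    val offset = text1.count()

    // Index each file independently, then shift the second file's indices
    // so they continue where the first file left off. Once every line
    // carries its global index, any later interleaving no longer matters.
    val indexed1 = text1.zipWithIndex
    val indexed2 = text2.zipWithIndex.map { case (i, line) => (i + offset, line) }

    indexed1.union(indexed2).print()
  }
}

Note that count() and print() each trigger a separate job execution, so the inputs are read more than once; for large files you would want to restructure this or fall back to the concatenation approach above.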