I'm writing an application using DataSet API of Flink 0.10.1. Can I get multiple collectors using a single operator in Flink?
What I want to do is something like below:
val lines = env.readTextFile(...)
val (out_small, out_large) = lines **someOp** {
(iterator, collector1, collector2) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector1.collect(elem1)
collector2.collect(elem2)
}
}
}
Currently I'm calling mapPartition twice to make two datasets from one source dataset.
val lines = env.readTextFile(...)
val out_small = lines mapPartition {
(iterator, collector) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector.collect(elem1)
}
}
}
val out_large = lines mapPartition {
(iterator, collector) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector.collect(elem2)
}
}
}
As doParsing function is quite expensive, I want to call it just once per each line.
p.s. I would be very appreciated if you can let me know other approaches to do this kind of stuff in a simpler way.
Flink does not support multiple collectors. However, you can change the output of your parsing step by adding an additional field that indicates the output type:
val lines = env.readTextFile(...)
val intermediate = lines **someOp** {
(iterator, collector) => {
for (line <- iterator) {
val (elem1, elem2) = doParsing(line)
collector.collect(0, elem1) // 0 indicates small
collector.collect(1, elem2) // 1 indicates large
}
}
}
Next you consume the output intermediate
twice and filter each for the first attribute. The first filter filters for 0
the second filter for 1
(you an also add a projection to get rid of the first attribute).
+---> filter("0") --->
|
intermediate --+
|
+---> filter("1") --->