
Apache Flink: How to create two datasets from one dataset using Flink DataSet API


I'm writing an application using the DataSet API of Flink 0.10.1. Can a single Flink operator write to multiple collectors?

What I want to do is something like this:

val lines = env.readTextFile(...)
// someOp is a hypothetical operator that passes two collectors to the function
val (out_small, out_large) = lines someOp {
  (iterator, collector1, collector2) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector1.collect(elem1)
      collector2.collect(elem2)
    }
  }
}

Currently I'm calling mapPartition twice to make two datasets from one source dataset.

val lines = env.readTextFile(...)
val out_small = lines mapPartition {
  (iterator, collector) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector.collect(elem1)
    }
  } 
}
val out_large = lines mapPartition {
  (iterator, collector) => {
    for (line <- iterator) {
      val (elem1, elem2) = doParsing(line)
      collector.collect(elem2)
    }
  } 
}

Because doParsing is quite expensive, I want to call it only once per line.
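To make the cost difference concrete, here is a small model in plain Scala collections (no Flink involved). The doParsing below is a hypothetical stand-in that counts its own invocations, and the "small|large" line format is an assumption made only for illustration.

```scala
// Plain Scala collections model (no Flink) of the two approaches.
object ParseCost {
  // Number of times the hypothetical doParsing has been invoked
  var calls = 0

  // Hypothetical parser: "small|large" -> ("small", "large")
  def doParsing(line: String): (String, String) = {
    calls += 1
    val parts = line.split('|') // Char overload, so '|' is not treated as a regex
    (parts(0), parts(1))
  }

  // Mirrors the two mapPartition calls: every line is parsed once per output
  def twoPasses(lines: Seq[String]): (Seq[String], Seq[String]) = {
    val small = lines.map(line => doParsing(line)._1)
    val large = lines.map(line => doParsing(line)._2)
    (small, large)
  }

  // Parses each line once, tags both parts (0 = small, 1 = large), then splits by tag
  def onePassTagged(lines: Seq[String]): (Seq[String], Seq[String]) = {
    val tagged = lines.flatMap { line =>
      val (elem1, elem2) = doParsing(line)
      Seq((0, elem1), (1, elem2))
    }
    val small = tagged.filter(_._1 == 0).map(_._2)
    val large = tagged.filter(_._1 == 1).map(_._2)
    (small, large)
  }
}
```

With three input lines, twoPasses calls doParsing six times and onePassTagged three times, while both return the same two outputs. In Flink the saving relies on the tagged data set being computed once and fed to both consumers, which is how a branching DataSet plan runs inside a single job.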

P.S. I would appreciate it if you could point me to other, simpler approaches to this kind of task.


Solution

  • Flink does not support multiple collectors in a single operator. However, you can change the output of your parsing step by adding a field that indicates the output type:

    val lines = env.readTextFile(...)
    val intermediate = lines mapPartition {
      (iterator, collector) => {
        for (line <- iterator) {
          val (elem1, elem2) = doParsing(line)
          // elem1 and elem2 must have the same type to share one DataSet
          collector.collect((0, elem1)) // 0 indicates small
          collector.collect((1, elem2)) // 1 indicates large
        }
      }
    }
    

    Next, consume intermediate twice and filter each branch on the first field: the first filter keeps records tagged 0, the second keeps records tagged 1. You can also add a projection (for example, a map) to drop the tag field.

                   +---> filter(_._1 == 0) ---> out_small
                   |
    intermediate --+
                   |
                   +---> filter(_._1 == 1) ---> out_large
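Putting the pieces together, here is a minimal sketch of the whole pipeline with the Flink Scala DataSet API. It uses flatMap instead of mapPartition, since each line is handled independently. SplitByTag, its doParsing stand-in, the "small|large" input format and the output paths are all hypothetical.

```scala
import org.apache.flink.api.scala._

object SplitByTag {
  // Hypothetical stand-in for the expensive doParsing: "small|large" -> ("small", "large").
  // Both parts are Strings, so they fit into one DataSet[(Int, String)].
  def doParsing(line: String): (String, String) = {
    val parts = line.split('|')
    (parts(0), parts(1))
  }

  def split(lines: DataSet[String]): (DataSet[String], DataSet[String]) = {
    // Parse each line once and emit two tagged records: 0 = small, 1 = large
    val intermediate: DataSet[(Int, String)] = lines.flatMap { (line: String) =>
      val (elem1, elem2) = doParsing(line)
      Seq((0, elem1), (1, elem2))
    }
    // Consume intermediate twice: filter on the tag, then drop it with a map
    val outSmall = intermediate.filter(_._1 == 0).map(_._2)
    val outLarge = intermediate.filter(_._1 == 1).map(_._2)
    (outSmall, outLarge)
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val lines = env.fromElements("a|A", "b|B") // stands in for env.readTextFile(...)
    val (outSmall, outLarge) = split(lines)
    // Both sinks belong to one job, so intermediate is computed only once
    outSmall.writeAsText("/tmp/out_small") // hypothetical output paths
    outLarge.writeAsText("/tmp/out_large")
    env.execute("split by tag")
  }
}
```

Note that collect() and print() each trigger a separate job execution, so calling them on both outputs would re-run the parsing. Writing both outputs to sinks and calling env.execute() once keeps everything in one job.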