Search code examples
scalaapache-flink

How to convert Flink DataSet tuple to one column


I've a graph data like

1 2
1 4
4 1
4 2
4 3
3 2
2 3

But I couldn't find a way to convert it a one column dataset like

1
2
1
4
4
1
...

here is my code, I used scala ListBuffer, but couldn't find a way doing it in Flink DataSet

    val params: ParameterTool = ParameterTool.fromArgs(args)
    val env = ExecutionEnvironment.getExecutionEnvironment

    env.getConfig.setGlobalJobParameters(params)
    val text = env.readTextFile(params.get("input"))
    val tupleText = text.map { line =>
      val arr = line.split(" ")
      (arr(0), arr(1))
    }

    var x: Seq[(String, String)] = tupleText.collect()
    var tempList = new ListBuffer[String]
    x.foreach(line => {
      tempList += line._1
      tempList += line._2
    })

    tempList.foreach(println)

Solution

  • You can do that with flatMap:

    // get some input
    val input: DataSet[(Int, Int)] = env.fromElements((1, 2), (2, 3), (3, 4))
    
    // emit every tuple element as own record
    val output: DataSet[Int] = input.flatMap( (t, out) => {
      out.collect(t._1)
      out.collect(t._2)
    })
    
    // print result
    output.print()