apache-flink

Flink keyBy over option list


My dummy Flink job:

import org.apache.flink.streaming.api.scala._
import org.json4s.NoTypeHints
import org.json4s.native.Serialization
import org.json4s.native.Serialization.read

case class Label(name: String, typ: String)
case class MyData(id: String, labels: Option[List[Label]] )

object WindowWordCount {
  implicit val formats = Serialization.formats(NoTypeHints)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.setParallelism(1)

    val packetSource = env
      .socketTextStream("localhost", 7777)
      .map(json => read[MyData](json))

    env.execute("Window Stream WordCount")
  }
}

Each MyData object has a unique id and can have multiple labels. What I want to do is apply .keyBy by label.

Incoming data example (deserialized into MyData):

{
  "id": "1",
  "labels": [
    {
      "name": "unolabelo",
      "typ": "two"
    },
    {
      "name": "twunolabelo",
      "typ": "two"
    }
  ]
}

If a single MyData element arrives with 3 different labels, I need to emit 3 MyData elements, each carrying exactly one of those labels, and then key the stream by that single label (conceptually .keyBy(_.label)).
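The fan-out step described above does not depend on Flink itself, so it can be sketched and checked with plain Scala collections. This is a minimal sketch assuming the Label and MyData case classes from the question; the names FanOut and explode are hypothetical and only used for illustration. Using getOrElse(Nil) treats a missing label list (None) and an empty list the same way: both produce no output records.

```scala
// Same case classes as in the question.
case class Label(name: String, typ: String)
case class MyData(id: String, labels: Option[List[Label]])

object FanOut {
  // Hypothetical helper: turn one record with N labels into N records,
  // each a copy of the original carrying exactly one label.
  // Records with labels = None or Some(Nil) yield an empty list.
  def explode(value: MyData): List[MyData] =
    value.labels
      .getOrElse(Nil)
      .map(label => value.copy(labels = Some(List(label))))

  def main(args: Array[String]): Unit = {
    // The sample record from the question, built by hand instead of parsed from JSON.
    val sample = MyData(
      "1",
      Some(List(Label("unolabelo", "two"), Label("twunolabelo", "two")))
    )
    // Prints one line per single-label copy.
    explode(sample).foreach(println)
  }
}
```

Inside a Flink job, the same function body is what a flatMap over the deserialized stream would apply to each element.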

What is the best way to do this?


Solution

  • Solved by applying a .flatMap function that emits one copy of the record per label:

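    import org.apache.flink.api.common.functions.FlatMapFunction
    import org.apache.flink.util.Collector

    val packetSource = env
      .socketTextStream("localhost", 7777)
      .map(json => read[MyData](json))
      // Fan out: emit one copy of the record per label, each copy holding exactly one label
      .flatMap(
        new FlatMapFunction[MyData, MyData] {
          override def flatMap(value: MyData, out: Collector[MyData]): Unit = {
            value.labels match {
              case Some(labels) =>
                for (label <- labels) {
                  out.collect(value.copy(labels = Some(List(label))))
                }
              case None => // records without labels are dropped
            }
          }
        }
      )
      // Safe: every record reaching this point carries exactly one label
      .keyBy(_.labels.get.head.name)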
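One possible variation, not part of the original answer, is to compute the key during the fan-out by emitting (labelName, record) pairs, so the key selector no longer needs Option.get. The sketch below assumes the question's case classes and uses plain Scala groupBy as a stand-in for how keyBy would route records; it is not Flink code, and KeyedByLabel, fanOutWithKey and groupIdsByLabel are hypothetical names. In a Flink job the tuple stream would then be keyed on its first field (for example .keyBy(_._1)); that part is not shown or tested here.

```scala
// Same case classes as in the question.
case class Label(name: String, typ: String)
case class MyData(id: String, labels: Option[List[Label]])

object KeyedByLabel {
  // Hypothetical alternative fan-out: pair each single-label copy with its key,
  // so downstream code keys on the first tuple field instead of calling Option.get.
  def fanOutWithKey(value: MyData): List[(String, MyData)] =
    value.labels
      .getOrElse(Nil)
      .map(label => label.name -> value.copy(labels = Some(List(label))))

  // Plain-collections stand-in for keyBy: group record ids by label name.
  // A record with two labels ends up under two different keys.
  def groupIdsByLabel(input: List[MyData]): Map[String, List[String]] =
    input
      .flatMap(fanOutWithKey)
      .groupBy(_._1)
      .map { case (label, pairs) => label -> pairs.map(_._2.id) }

  def main(args: Array[String]): Unit = {
    val input = List(
      MyData("1", Some(List(Label("unolabelo", "two"), Label("twunolabelo", "two")))),
      MyData("2", Some(List(Label("unolabelo", "one")))),
      MyData("3", None) // dropped: no labels, so no key
    )
    groupIdsByLabel(input).foreach { case (label, ids) => println(s"$label -> $ids") }
  }
}
```

The trade-off is a slightly wider element type (a tuple instead of MyData) in exchange for a key selector that cannot throw if an unlabeled record ever slips through.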