Flink keyBy over option list

My dummy Flink job:

import org.apache.flink.streaming.api.scala._
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.json4s.native.Serialization.read

case class Label(name: String, typ: String)
case class MyData(id: String, labels: Option[List[Label]])

object WindowWordCount {
  implicit val formats: Formats = Serialization.formats(NoTypeHints)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // each incoming line is one JSON document, parsed into MyData
    val packetSource = env
      .socketTextStream("localhost", 7777)
      .map(json => read[MyData](json))

    // add a sink so the parsed elements are visible
    packetSource.print()

    env.execute("Window Stream WordCount")
  }
}
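The .map(json => read[MyData](json)) step can be checked outside Flink with json4s alone. This is a minimal sketch assuming json4s-native on the classpath; the object name ParseExample and the two sample strings are illustrative, not part of the original job. It shows that a record without a "labels" key deserializes to labels = None, which is the case the Option type exists for.

```scala
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.json4s.native.Serialization.read

case class Label(name: String, typ: String)
case class MyData(id: String, labels: Option[List[Label]])

object ParseExample {
  implicit val formats: Formats = Serialization.formats(NoTypeHints)

  // A sample record on one line: socketTextStream splits its input
  // on '\n' by default, so each JSON document must fit on one line.
  val withLabels: String =
    """{"id":"1","labels":[{"name":"unolabelo","typ":"two"},{"name":"twunolabelo","typ":"two"}]}"""

  // A record with no "labels" key at all.
  val withoutLabels: String = """{"id":"2"}"""

  // The same call the job makes inside .map(...).
  def parse(json: String): MyData = read[MyData](json)

  def main(args: Array[String]): Unit = {
    println(parse(withLabels))
    println(parse(withoutLabels))
  }
}
```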

So each MyData object has a unique id and can have multiple labels. What I want to do is .keyBy by label.

Incoming data example (deserialized into MyData):

  {
    "id": "1",
    "labels": [
      {
        "name": "unolabelo",
        "typ": "two"
      },
      {
        "name": "twunolabelo",
        "typ": "two"
      }
    ]
  }

If a single MyData element arrives with 3 different labels, I need to emit 3 MyData elements, each carrying exactly one of those labels, so that I can then do .keyBy(_.label).
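The desired per-element split can be written as a plain Scala function, independent of Flink. This is a minimal sketch: explodeByLabel is a hypothetical name, and dropping elements whose labels are None (or an empty list) is an assumption, since the question does not say what should happen to them.

```scala
case class Label(name: String, typ: String)
case class MyData(id: String, labels: Option[List[Label]])

object ExplodeByLabel {
  // Hypothetical helper: one copy of `data` per label, each copy carrying
  // only that label. labels = None or Some(Nil) produces no output.
  def explodeByLabel(data: MyData): List[MyData] =
    data.labels.getOrElse(Nil).map(label => data.copy(labels = Some(List(label))))

  def main(args: Array[String]): Unit = {
    val in = MyData("1", Some(List(Label("unolabelo", "two"), Label("twunolabelo", "two"))))
    explodeByLabel(in).foreach(println)
  }
}
```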

What is the best way to do this?


  • Solved by applying a .flatMap function:

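    import org.apache.flink.api.common.functions.FlatMapFunction
    import org.apache.flink.util.Collector

    val packetSource = env
      .socketTextStream("localhost", 7777)
      .map(json => read[MyData](json))
      .flatMap(new FlatMapFunction[MyData, MyData] {
        override def flatMap(value: MyData, out: Collector[MyData]): Unit = {
          value.labels match {
            case Some(labels) =>
              // emit one copy of the element per label, each carrying only that label
              for (label <- labels) {
                out.collect(value.copy(labels = Some(List(label))))
              }
            case None =>
              // no labels: nothing is emitted (emit `value` here to keep such elements)
          }
        }
      })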
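For comparison, here is a sketch of the same split written as a Scala lambda instead of an anonymous FlatMapFunction, followed by the keyBy the question asks for. It assumes the Flink 1.x Scala DataStream API (org.apache.flink.streaming.api.scala, deprecated in recent Flink releases) plus json4s-native; KeyByLabelJob and labelKey are hypothetical names. Because every element leaving the flatMap carries exactly one label, keying on that label's name groups together all elements that shared it.

```scala
import org.apache.flink.streaming.api.scala._
import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization
import org.json4s.native.Serialization.read

case class Label(name: String, typ: String)
case class MyData(id: String, labels: Option[List[Label]])

object KeyByLabelJob {
  implicit val formats: Formats = Serialization.formats(NoTypeHints)

  // Hypothetical helper: key of an element already split to a single label.
  // Falls back to "" if, unexpectedly, no label is present.
  def labelKey(data: MyData): String =
    data.labels.flatMap(_.headOption).map(_.name).getOrElse("")

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val byLabel: KeyedStream[MyData, String] = env
      .socketTextStream("localhost", 7777)
      .map(json => read[MyData](json))
      // one output element per label; labels = None yields nothing
      .flatMap { data =>
        data.labels.getOrElse(Nil).map(label => data.copy(labels = Some(List(label))))
      }
      .keyBy(data => labelKey(data))

    byLabel.print()
    env.execute("keyBy label")
  }
}
```

Keying on the name alone treats two labels with the same name but a different typ as one key; keying on (name, typ) would keep them apart.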