My dummy Flink job:
import org.apache.flink.streaming.api.scala._
import org.json4s.NoTypeHints
import org.json4s.native.Serialization
import org.json4s.native.Serialization.read
/** A single label carried by a [[MyData]] record.
  *
  * @param name label name (JSON field `name`)
  * @param typ  label type (JSON field `typ`)
  */
final case class Label(name: String, typ: String)

/** One incoming record, parsed from a JSON line.
  *
  * @param id     record identifier (JSON field `id`)
  * @param labels labels attached to the record; `None` when the JSON field `labels` is absent
  */
final case class MyData(id: String, labels: Option[List[Label]])
/** Entry point of the dummy Flink streaming job.
  *
  * Reads text lines from a socket on localhost:7777 and parses each line as JSON
  * into a [[MyData]] value.
  */
object WindowWordCount {

  // json4s formats picked up implicitly by `read`. The type is spelled out so that
  // implicit resolution does not depend on type inference of the right-hand side.
  implicit val formats: org.json4s.Formats = Serialization.formats(NoTypeHints)

  /** Builds the streaming pipeline and submits it for execution.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.getConfig.setParallelism(1)

    // Parsed input stream; nothing downstream is attached to it in this object.
    val packetSource = env
      .socketTextStream("localhost", 7777)
      .map(json => read[MyData](json))

    env.execute("Window Stream WordCount")
  }
}
So, each `MyData` object has a unique `id` and can have multiple `labels`.
What I want to do is `.keyBy` the stream by label.
Example of incoming data (deserialized into `MyData`):
{
"id": "1",
"labels": [
{
"name": "unolabelo",
"typ": "two"
},
{
"name": "twunolabelo",
"typ": "two"
}
]
}
If a single `MyData` element arrives with 3 different labels, I need to emit 3 `MyData` elements, each carrying exactly one of those labels, so that I can then key the stream by that label (e.g. `.keyBy(...)` on the label's name).
What is the best way to do this?
Solved by applying a `.flatMap` function:
// Parse each socket line into MyData, fan every record out into one record per label,
// then key the stream by the name of that single label.
val packetSource = env
  .socketTextStream("localhost", 7777)
  .map(json => read[MyData](json))
  // Emit one copy of the record per label, each copy holding only that label.
  // Records whose labels are None or an empty list produce no output.
  .flatMap { (data: MyData) =>
    data.labels.getOrElse(Nil).map(label => data.copy(labels = Some(List(label))))
  }
  // After the flatMap every element carries exactly one label; if that ever stops
  // being true, fail with a descriptive message instead of a bare .get/.head error.
  .keyBy { (data: MyData) =>
    data.labels
      .flatMap(_.headOption)
      .map(_.name)
      .getOrElse(throw new IllegalStateException(s"MyData ${data.id} has no label to key by"))
  }