Search code examples
apache-flinkflink-streaming

Is there any way to zip two or more streams with order of event time in Flink?


Suppose we have a stream of data with this format:

example of input data stream:

// One input record: `key` identifies the logical entity, `objectType` the kind of
// measurement, and `value` the observed boolean.
case class InputElement(key:String,objectType:String,value:Boolean)
// Scala declaration (the original line used Java syntax, which does not compile in Scala).
val env = ExecutionEnvironment.getExecutionEnvironment
// All keys and object types interleaved in a single data set, in arrival order.
val inputStream:DataSet[InputElement] = env.fromElements(
    InputElement("k1","t1",true)
    ,InputElement("k2","t1",true)
    ,InputElement("k2","t2",true)
    ,InputElement("k1","t2",false)
    ,InputElement("k1","t2",true)
    ,InputElement("k1","t1",false)
    ,InputElement("k2","t2",false)
)

This is semantically equivalent to having these separate streams, one per (key, objectType) pair:

// The combined input split into one stream per (key, objectType) pair; within each
// stream the elements keep the relative order they have in the combined input.
val inputStream_k1_t1 = env.fromElements(
    InputElement("k1","t1",true),
    InputElement("k1","t1",false)
)
// Fixed: the original had a doubled comma between the two elements (syntax error).
val inputStream_k1_t2 = env.fromElements(
    InputElement("k1","t2",false),
    InputElement("k1","t2",true)
)
val inputStream_k2_t1 = env.fromElements(
    InputElement("k2","t1",true)
)
val inputStream_k2_t2 = env.fromElements(
    InputElement("k2","t2",true),
    InputElement("k2","t2",false)
)

I want to have an output type like this:

// Desired result type: for one `key`, a map from object type to the boolean value
// that was zipped for that object type.
case class OutputElement(key:String,values:Map[String,Boolean])

expected output data stream for the example input data:

// Expected result for the example input: per key, each output combines the next
// not-yet-used value of every object type, in input order.
val expectedOutputStream = env.fromElements(
    OutputElement("k1",Map( "t1"->true ,"t2"->false)),
    OutputElement("k2",Map("t1"->true,"t2"->true)),
    OutputElement("k1",Map("t1"->false,"t2"->true)),
    // k2 has no second t1 value in the example input, so only t2 appears here.
    OutputElement("k2",Map("t2"->false))
)

==========================================

edit 1:

After further consideration of the problem, the subject of the question has changed:

I want to have another input stream that shows which keys are subscribed to which object types:

// A subscription rule: `strategy` is the key being subscribed, `patterns` the set of
// object types that key is subscribed to.
case class SubscribeRule(strategy:String,patterns:Set[String])
// NOTE(review): these rules use the names s1/s2 and p1/p2, while the example input above
// uses keys k1/k2 and object types t1/t2 — presumably they are meant to correspond; confirm.
val subscribeStream: DataStream[SubscribeRule] = env.fromElements(

      SubscribeRule("s1",Set("p1","p2")),
      SubscribeRule("s2",Set("p1","p2"))    
    )

now I want to have this output:

(the result stream does not emit anything for a key until all of its subscribed object types have been received):

// Expected result once subscriptions are taken into account: an element is emitted
// only when every subscribed object type of the key has a value available.
val expectedOutputStream = env.fromElements(
    OutputElement("k1",Map( "t1"->true ,"t2"->false)),
    OutputElement("k2",Map("t1"->true,"t2"->true)),
    OutputElement("k1",Map("t1"->false,"t2"->true)),
//      OutputElement("k2",Map("t2"->false)) is withheld: it will be emitted only once another k2-t1 input message is received
)

Solution

  • App.scala:

    import org.apache.flink.api.common.state.MapStateDescriptor
    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.datastream.BroadcastStream
    import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment}
    
    /**
      * Demo job: connects a keyed stream of [[App.InputElement]]s with a broadcast stream of
      * [[App.SubscribeRule]]s and prints the zipped [[App.OutputElement]]s produced by `ZippingFunc`.
      */
    object App {
      /**
        * Outcome of one zip step.
        *
        * @param updatedState per object type, the values still pending after the step
        * @param output       per object type, the value consumed by the step
        */
      case class updateStateResult(updatedState:Map[String,List[Boolean]],output:Map[String,Boolean])
      /** One input record: `key` selects the keyed partition, `objectType` the pattern, `passed` the value. */
      case class InputElement(key:String,objectType:String,passed:Boolean)
      /** Declares that key `strategy` is subscribed to the object types in `patterns`. */
      case class SubscribeRule(strategy:String,patterns:Set[String])
      /** Zipped result for `key`: one value per subscribed object type. */
      case class OutputElement(key:String,result:Map[String,Boolean])

      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Subscription rules, broadcast to every parallel instance of the zipping operator.
        val subscribeStream: DataStream[SubscribeRule] = env.fromElements(
          SubscribeRule("s1", Set("p1", "p2")),
          SubscribeRule("s2", Set("p1", "p2"))
        )
        // Must describe the same state ("subscribes") that ZippingFunc reads the rules from.
        val broadcastStateDescriptor =
          new MapStateDescriptor[String, Set[String]]("subscribes", classOf[String], classOf[Set[String]])
        val subscribeStreamBroadcast: BroadcastStream[SubscribeRule] =
          subscribeStream.broadcast(broadcastStateDescriptor)

        val inputStream = env.fromElements(
          InputElement("s1", "p1", true),
          InputElement("s1", "p2", true),
          InputElement("s2", "p1", false),
          InputElement("s2", "p2", true),
          InputElement("s2", "p2", false),
          InputElement("s1", "p1", false),
          InputElement("s2", "p1", true),
          InputElement("s1", "p2", true)
        )
        // Output the author expects for the sample input above (kept as a comment; the
        // original bound it to an unused local):
        //   OutputElement("s1", Map("p2" -> true,  "p1" -> true))
        //   OutputElement("s2", Map("p2" -> true,  "p1" -> false))
        //   OutputElement("s2", Map("p2" -> false, "p1" -> true))
        //   OutputElement("s1", Map("p2" -> true,  "p1" -> false))
        // NOTE(review): ZippingFunc ignores input elements whose key has no rule in broadcast
        // state yet, so this presumably relies on the rules arriving first — confirm, since
        // the two sources are read independently.

        val keyedInputStream: KeyedStream[InputElement, String] = inputStream.keyBy(_.key)
        val result = keyedInputStream
          .connect(subscribeStreamBroadcast)
          .process(new ZippingFunc())

        result.print()
        env.execute("test stream")
      }
    }
    

  • ZippingFunc.scala:

    import App.{InputElement, OutputElement, SubscribeRule, updateStateResult}
    import org.apache.flink.api.common.state.{ MapState, MapStateDescriptor, ReadOnlyBroadcastState}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
    import org.apache.flink.util.Collector
    
    import java.util.{Map => JavaMap}
    import scala.collection.JavaConverters.{iterableAsScalaIterableConverter, mapAsJavaMapConverter}
    
    /**
      * Zips, per key, the values received for every subscribed object type into one
      * [[OutputElement]].
      *
      * Subscriptions (key -> set of object types) arrive on the broadcast side and are stored
      * in broadcast state. For each key a FIFO queue of pending values is kept per subscribed
      * object type; as soon as every queue is non-empty, the oldest value of each queue is
      * removed and emitted together.
      *
      * Input elements are ignored when their key has no subscription in broadcast state, or
      * when their object type is not part of that key's subscription.
      */
    class ZippingFunc extends KeyedBroadcastProcessFunction[String, InputElement, SubscribeRule, OutputElement] {

      // Keyed state: object type -> pending values, oldest first. Assigned in `open`.
      private var localState: MapState[String, List[Boolean]] = _

      // Descriptor of the broadcast state holding key -> subscribed object types.
      private lazy val broadcastStateDesc =
        new MapStateDescriptor[String, Set[String]]("subscribes", classOf[String], classOf[Set[String]])

      /** Registers the keyed map state used for the pending-value queues. */
      override def open(parameters: Configuration): Unit = {
        val localStateDesc: MapStateDescriptor[String, List[Boolean]] =
          new MapStateDescriptor[String, List[Boolean]]("sourceMap1", classOf[String], classOf[List[Boolean]])
        localState = getRuntimeContext.getMapState(localStateDesc)
      }

      /**
        * Enqueues `value` for `objectType` and, if every queue in the state is now non-empty,
        * dequeues one value per object type.
        *
        * @param objectType object type the value belongs to
        * @param value      newly received value
        * @return the zipped values (object type -> value) if a complete set was available,
        *         otherwise `None`
        */
      def updateVar(objectType: String, value: Boolean): Option[Map[String, Boolean]] = {
        // `get` yields null for an object type that has no entry yet; treat that as an empty queue.
        val pending: List[Boolean] = Option(localState.get(objectType)).getOrElse(List.empty[Boolean])
        // Append (not prepend) so that `pickOutputs`, which consumes the head, sees the oldest
        // value first and values are zipped in arrival order.
        localState.put(objectType, pending :+ value)
        pickOutputs(localState.entries().asScala).map { (ur: updateStateResult) =>
          localState.putAll(ur.updatedState.asJava)
          ur.output
        }
      }

      /**
        * Takes the head of every queue in `entries`.
        *
        * @param entries object type -> pending values
        * @return `None` if at least one queue is empty; otherwise the remaining queues
        *         (`updatedState`) together with the heads that were taken (`output`)
        */
      def pickOutputs(entries: Iterable[JavaMap.Entry[String, List[Boolean]]]): Option[updateStateResult] = {
        val heads: Iterable[Option[(String, Boolean, List[Boolean])]] = entries.map { entry =>
          val objectType: String = entry.getKey
          val queue: List[Boolean] = entry.getValue
          queue.headOption.map(h => (objectType, h, queue.tail))
        }
        sequenceOption(heads).map { picked =>
          updateStateResult(
            picked.map { case (objectType, _, rest) => objectType -> rest }.toMap,
            picked.map { case (objectType, head, _) => objectType -> head }.toMap
          )
        }
      }

      /**
        * Turns a collection of options into an option of a list: `Some` of all contained
        * values if every element is defined, `None` as soon as one element is `None`.
        * The resulting list is in reverse iteration order (elements are prepended).
        */
      def sequenceOption[A](l: Iterable[Option[A]]): Option[List[A]] =
        l.foldLeft[Option[List[A]]](Some(List.empty[A])) { (acc, e) =>
          for {
            xs <- acc
            x <- e
          } yield x :: xs
        }

      /**
        * Handles one keyed input element: makes sure a queue exists for every object type the
        * key is subscribed to, enqueues the value, and emits an [[OutputElement]] when a
        * complete set of values is available.
        */
      override def processElement(value: InputElement, ctx: KeyedBroadcastProcessFunction[String, InputElement, SubscribeRule, OutputElement]#ReadOnlyContext, out: Collector[OutputElement]): Unit = {
        val bs: ReadOnlyBroadcastState[String, Set[String]] = ctx.getBroadcastState(broadcastStateDesc)
        if (bs.contains(value.key)) {
          val allPatterns: Set[String] = bs.get(value.key)
          // An object type outside the subscription would otherwise create a queue that no
          // rule accounts for; skip such elements.
          if (allPatterns.contains(value.objectType)) {
            allPatterns.foreach { patternName =>
              if (!localState.contains(patternName))
                localState.put(patternName, List.empty[Boolean])
            }
            updateVar(value.objectType, value.passed)
              .foreach(r => out.collect(OutputElement(value.key, r)))
          }
        }
      }

      /** Stores (or replaces) the subscription of `value.strategy` in broadcast state. */
      override def processBroadcastElement(value: SubscribeRule, ctx: KeyedBroadcastProcessFunction[String, InputElement, SubscribeRule, OutputElement]#Context, out: Collector[OutputElement]): Unit = {
        val bs = ctx.getBroadcastState(broadcastStateDesc)
        bs.put(value.strategy, value.patterns)
      }
    }