Tags: apache-kafka, akka, apache-kafka-streams, akka-stream

Does Akka Streams Implement Join Semantics Like Kafka Streams Does?


I am quite new to Akka Streams, whereas I have some experience with Kafka Streams. One thing that seems to be lacking in Akka Streams is the ability to join two different streams.

Kafka Streams allows joining information coming from two different streams (or tables) using the messages' keys.
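To make those semantics concrete, here is a plain-Scala model (no Kafka involved) of a windowed stream-stream inner join: two records join when they have the same key and their timestamps are at most a given window apart, which is how Kafka Streams' `JoinWindows` treats stream-stream joins. `Rec`, `KeyedJoinModel`, and `windowedInnerJoin` are hypothetical names for illustration only; the sketch ignores grace periods, out-of-order data, state stores, and stream-table joins.

```scala
// Plain-Scala model of a windowed stream-stream inner join (no Kafka dependency).
// Rec, KeyedJoinModel and windowedInnerJoin are hypothetical names used only for illustration.
final case class Rec[K, V](key: K, value: V, timestamp: Long)

object KeyedJoinModel {
  // A left and a right record join when their keys are equal and their
  // timestamps differ by at most windowMs (bounds inclusive).
  def windowedInnerJoin[K, A, B, R](
      left: Seq[Rec[K, A]],
      right: Seq[Rec[K, B]],
      windowMs: Long
  )(joiner: (A, B) => R): Seq[(K, R)] = {
    // Index the right side by key so each left record only scans matching keys
    val rightByKey: Map[K, Seq[Rec[K, B]]] = right.groupBy(_.key)
    for {
      l <- left
      r <- rightByKey.getOrElse(l.key, Seq.empty)
      if math.abs(l.timestamp - r.timestamp) <= windowMs
    } yield (l.key, joiner(l.value, r.value))
  }
}
```

For example, with `clicks = Seq(Rec("alice", "click", 1000L), Rec("bob", "click", 5000L))` and `views = Seq(Rec("alice", "view", 1500L), Rec("bob", "view", 20000L))`, a 2000 ms window joins only the `alice` records, because the two `bob` records are 15000 ms apart.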

Is there something similar in Akka Streams?


Solution

  • The short answer is, unfortunately, no. I would argue that Akka Streams is lower-level than Kafka Streams, Spark Streaming, or Flink. In exchange, you have more control over what you are doing, which means you can build your own join operator. Check this discussion at Lightbend.

    You have to read data from two Sources, merge them, send them into a window based on time or on the number of tuples, compute the join inside the window, and emit the result to the Sink. I wrote the proof of concept below (still unfinished) following exactly these operators, and it compiles and runs. What is still missing is the actual join of the data inside the window: currently, I just emit the buffered elements as a mini-batch.

    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.{Attributes, ClosedShape, FlowShape, Inlet, Outlet}
    import akka.stream.scaladsl.{Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
    import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic}
    
    import scala.collection.mutable
    import scala.concurrent.duration._
    import scala.util.control.NonFatal
    
    object StreamOpenGraphJoin {
      def main(args: Array[String]): Unit = {
        // With Akka 2.6+, an implicit ActorSystem provides the Materializer
        // (older versions need an implicit ActorMaterializer instead)
        implicit val system: ActorSystem = ActorSystem("StreamOpenGraphJoin")
        val incrementSource: Source[Int, NotUsed] = Source(1 to 10).throttle(1, 1.second)
        val decrementSource: Source[Int, NotUsed] = Source(10 to 20).throttle(1, 1.second)
    
        // Tag every element with the id of the stream it came from (0 or 1)
        def tokenizerSource(key: Int): Flow[Int, (Int, Int), NotUsed] =
          Flow[Int].map(value => (key, value))
    
        // Step 1 - set up the fundamentals of a stream graph
        val switchJoinStrategies = RunnableGraph.fromGraph(
          GraphDSL.create() { implicit builder =>
            import GraphDSL.Implicits._
    
            // Step 2 - add the tagging flows, the merge, the batching window and the sink
            val tokenizerShape00 = builder.add(tokenizerSource(0))
            val tokenizerShape01 = builder.add(tokenizerSource(1))
    
            val mergeTupleShape = builder.add(Merge[(Int, Int)](2))
            val batchFlow = Flow.fromGraph(new BatchTimerFlow[(Int, Int)](5.seconds))
            val sinkShape = builder.add(Sink.foreach[(Int, Int)](x => println(s" > sink: $x")))
    
            // Step 3 - tie the components together
            incrementSource ~> tokenizerShape00 ~> mergeTupleShape.in(0)
            decrementSource ~> tokenizerShape01 ~> mergeTupleShape.in(1)
            mergeTupleShape.out ~> batchFlow ~> sinkShape
    
            // Step 4 - return the shape
            ClosedShape
          }
        )
        // run the graph and materialize it
        switchJoinStrategies.run()
      }
    
      class BatchTimerFlow[T](silencePeriod: FiniteDuration) extends GraphStage[FlowShape[T, T]] {
        // step 1: define the ports and the shape
        val in: Inlet[T] = Inlet[T]("BatchTimerFlow.in")
        val out: Outlet[T] = Outlet[T]("BatchTimerFlow.out")
        override val shape: FlowShape[T, T] = FlowShape(in, out)
    
        // step 2: create the logic (one instance per materialization)
        override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
          // mutable state must live inside the logic, never in the stage itself
          private val batch = mutable.Queue.empty[T]
          // true while the silence period is running; elements are only buffered then
          private var open = false
    
          // step 3: implement the logic
          setHandler(in, new InHandler {
            override def onPush(): Unit = {
              try {
                batch.enqueue(grab(in))
                Thread.sleep(50) // simulate an expensive computation (blocks the stage; demo only)
                if (open) pull(in) // still inside the silence period: ask upstream for another element
                else {
                  // silence period over: flush all buffered elements downstream as one mini-batch
                  emitMultiple(out, batch.dequeueAll(_ => true).toList)
                  open = true
                  scheduleOnce(None, silencePeriod)
                }
              } catch {
                case NonFatal(e) => failStage(e)
              }
            }
    
            // flush what is still buffered instead of dropping it;
            // complete(out) waits until pending emits have been delivered
            override def onUpstreamFinish(): Unit = {
              if (batch.nonEmpty) emitMultiple(out, batch.dequeueAll(_ => true).toList)
              complete(out)
            }
          })
          setHandler(out, new OutHandler {
            override def onPull(): Unit = pull(in)
          })
    
          // the silence period has elapsed: the next pushed element triggers a flush
          override protected def onTimer(timerKey: Any): Unit = {
            open = false
          }
        }
      }
    }
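The missing step, joining the data inside the window, could be written as a pure function over one window of `(sourceId, value)` tuples. This is a sketch under assumptions: tag 0 marks the left stream and tag 1 the right stream (matching `tokenizerSource(0)` and `tokenizerSource(1)` above), and the caller passes a hypothetical `keyOf` extractor, because the example values carry no join key of their own. `MiniBatchJoin` and `joinBatch` are illustrative names. Within one batch it performs an inner join: every left element is paired with every right element that has the same key.

```scala
// Inner join over one window (mini-batch) of tagged elements.
// Assumption: tag 0 = left stream, tag 1 = right stream.
// MiniBatchJoin, joinBatch and keyOf are hypothetical names for illustration.
object MiniBatchJoin {
  def joinBatch[K, V](batch: Seq[(Int, V)])(keyOf: V => K): Seq[(K, V, V)] = {
    // Split the window back into its two input streams
    val left: Seq[V] = batch.collect { case (0, v) => v }
    // Index the right side by join key (groupBy keeps arrival order inside each group)
    val rightByKey: Map[K, Seq[V]] = batch.collect { case (1, v) => v }.groupBy(keyOf)
    // Pair each left element with every right element sharing its key
    for {
      l <- left
      r <- rightByKey.getOrElse(keyOf(l), Seq.empty)
    } yield (keyOf(l), l, r)
  }
}
```

For instance, with `keyOf = _ % 5`, the window `Seq((0, 3), (1, 13), (0, 4), (1, 8))` yields `(3, 3, 13)` and `(3, 3, 8)`, while `4` finds no partner. Note that `BatchTimerFlow` emits individual elements rather than whole batches, so this function does not plug in directly after it; one possible wiring (not shown or tested here) is Akka's built-in `groupedWithin(n, d)`, which emits a window of at most `n` elements or whatever arrived within duration `d`, followed by a `mapConcat` over the result of `joinBatch`.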