
Akka Streams Inlets and Outlets match


Here is the simplest graph using a Partition and a Merge that I could come up with. When run, it fails with the following error: requirement failed: The inlets [] and outlets [] must correspond to the inlets [Merge.in0, Merge.in1] and outlets [Partition.out0, Partition.out1]

I understand that the message means I either have more outputs than inputs or an unconnected flow, but I can't see where the mismatch is in this simple example.

Any help is appreciated.

The graph:

    def createGraph()(implicit actorSystem: ActorSystem): Graph[ClosedShape, Future[Done]] = {
      GraphDSL.create(Sink.ignore) { implicit builder: GraphDSL.Builder[Future[Done]] => s =>
          import GraphDSL.Implicits._
          val inputs: List[Int] = List(1, 2, 3, 4)
          val source: Source[Int, NotUsed] = Source(inputs)

          val messageSplit: UniformFanOutShape[Int, Int] = builder.add(Partition[Int](2, i => i%2))

          val messageMerge: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2))

          val processEven: Flow[Int, Int, NotUsed] = Flow[Int].map(rc => {
            actorSystem.log.debug(s"even:  $rc")
            rc
          })

          val processOdd: Flow[Int, Int, NotUsed] = Flow[Int].map(rc => {
            actorSystem.log.debug(s"odd: $rc")
            rc
          })

          source ~> messageSplit.in
          messageSplit.out(0) -> processEven -> messageMerge.in(0)
          messageSplit.out(1) -> processOdd -> messageMerge.in(1)
          messageMerge.out ~> s
          ClosedShape
      }
    }

The test:

    import akka.actor.ActorSystem
    import akka.stream._
    import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, RunnableGraph, Sink, Source}
    import akka.{Done, NotUsed}
    import org.scalatest.FunSpec

    import scala.concurrent.Future

    class RoomITSpec extends FunSpec {

      implicit val actorSystem: ActorSystem = ActorSystem("RoomITSpec")
      implicit val actorCreator: ActorMaterializer = ActorMaterializer()

      describe("graph") {
        it("should run") {
          val graph = createGraph()
          RunnableGraph.fromGraph(graph).run
        }
      }
    }

Solution

  • It's a small syntactic mistake: the two branch lines use -> where the graph DSL needs ~>. They should read:

    // Note the tilde arrows (~>)
    messageSplit.out(0) ~> processEven ~> messageMerge.in(0)
    messageSplit.out(1) ~> processOdd ~> messageMerge.in(1)
    

    Instead of what you wrote:

    // Straight arrows
    messageSplit.out(0) -> processEven -> messageMerge.in(0)
    messageSplit.out(1) -> processOdd -> messageMerge.in(1)
    

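    In Scala, -> is the tuple-building operator from Predef, so each of those lines built a nested tuple and immediately threw it away instead of adding anything to the graph. The Partition outlets and the Merge inlets were therefore never connected, which is exactly what the error message reports.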
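
To see why the straight arrows compile without complaint, here is a minimal sketch that needs no Akka dependency. The object name `ArrowDemo` and the string values are hypothetical stand-ins for the graph ports, chosen only for illustration; the point is that `a -> b -> c` is plain Scala and evaluates to a nested `Tuple2`.

```scala
object ArrowDemo {
  // Hypothetical stand-ins for the graph elements: plain strings, not Akka types.
  val out0: String = "messageSplit.out(0)"
  val flow: String = "processEven"
  val in0: String  = "messageMerge.in(0)"

  // `a -> b` comes from Predef's ArrowAssoc and simply builds Tuple2(a, b).
  // It is left-associative, so the chain below is ((out0, flow), in0).
  val chained: ((String, String), String) = out0 -> flow -> in0

  def main(args: Array[String]): Unit = {
    // Nothing is "connected" here; we only hold a nested tuple.
    println(chained) // prints ((messageSplit.out(0),processEven),messageMerge.in(0))
  }
}
```

Because `->` is defined for any value, the compiler accepts it between outlets, flows and inlets as well. By contrast, `~>` only becomes available through `import GraphDSL.Implicits._` and is what actually registers the connection with the builder.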