scala, akka, akka-stream

java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [] must correspond to the inlets [Map.in] and outlets []


I am getting the dreaded java.lang.IllegalArgumentException: requirement failed: The inlets [] and outlets [] must correspond to the inlets [Map.in] and outlets [] error message, and previous SO answers have not helped me.

This is my graph:

val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){ implicit b =>
  s =>
     import GraphDSL.Implicits._
     val broadcast = b.add(Broadcast[Foo](2))
     val merge = b.add(Merge[Foo](2))
     source ~> func1() ~> broadcast.in
     broadcast.out(0).filter(_.country == "US") ~> usSpecificFilter ~> merge.in(0)
     broadcast.out(1).filter(_.country != "US") ~> internationalFilter ~> merge.in(1)
     merge.out ~> sink
     ClosedShape
})

def func1() : Flow[String, Foo, NotUsed] = {...}

From previous threads I gathered that this error occurs when the graph has "unused flows", meaning something that was added to the builder but never connected in the graph.

But as you can see above, I don't have any unused flows.

Edit: I also tried adding the sink to the builder explicitly:

val graph = RunnableGraph.fromGraph(GraphDSL.create(sink){ implicit b =>
  s =>
     import GraphDSL.Implicits._
     val broadcast = b.add(Broadcast[Foo](2))
     val merge = b.add(Merge[Foo](2))
     val s = b.add(sink)
     source ~> func1() ~> broadcast.in
     broadcast.out(0).filter(_.country == "US") ~> usSpecificFilter ~> merge.in(0)
     broadcast.out(1).filter(_.country != "US") ~> internationalFilter ~> merge.in(1)
     merge.out ~> s.in
     ClosedShape
})

Still does not work!


Solution

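  • GraphDSL.create(sink) already adds sink to the builder and passes its shape to the block as s. Writing merge.out ~> sink (or calling b.add(sink), as in your edit) adds a second copy of the sink, so the inlet of the first copy (the Map.in named in the message) is never connected. In your original graph, change merge.out ~> sink to merge.out ~> s.in: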

    val graph = RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b => s =>
      import GraphDSL.Implicits._
    
      val broadcast = b.add(Broadcast[Foo](2))
      val merge = b.add(Merge[Foo](2))
      source ~> func1() ~> broadcast.in
      broadcast.out(0).filter(_.country == "US") ~> usSpecificFilter ~> merge.in(0)
      broadcast.out(1).filter(_.country != "US") ~> internationalFilter ~> merge.in(1)
      merge.out ~> s.in // <--- changed sink to s.in
      ClosedShape
    })
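
For reference, here is a minimal self-contained sketch of the corrected graph that can be compiled and run. It assumes Akka 2.6 or later (where an implicit ActorSystem is enough to materialize a graph). Foo's fields, the sample source, the body of func1, the pass-through usSpecificFilter and internationalFilter, and the Sink.seq sink are all hypothetical stand-ins for the parts elided in the question, not the original definitions.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object GraphExample {
  // Hypothetical stand-ins for the types and stages elided in the question.
  final case class Foo(country: String, value: String)

  val source: Source[String, NotUsed] = Source(List("US:a", "DE:b", "US:c"))

  // Assumed parsing logic: "COUNTRY:value" becomes Foo(COUNTRY, value).
  def func1(): Flow[String, Foo, NotUsed] =
    Flow[String].map { line =>
      val Array(country, value) = line.split(":", 2)
      Foo(country, value)
    }

  // Pass-through flows standing in for the real filters.
  val usSpecificFilter: Flow[Foo, Foo, NotUsed] = Flow[Foo]
  val internationalFilter: Flow[Foo, Foo, NotUsed] = Flow[Foo]

  val sink: Sink[Foo, Future[Seq[Foo]]] = Sink.seq[Foo]

  val graph: RunnableGraph[Future[Seq[Foo]]] =
    RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit b => s =>
      import GraphDSL.Implicits._

      val broadcast = b.add(Broadcast[Foo](2))
      val merge = b.add(Merge[Foo](2))

      source ~> func1() ~> broadcast.in
      broadcast.out(0).filter(_.country == "US") ~> usSpecificFilter ~> merge.in(0)
      broadcast.out(1).filter(_.country != "US") ~> internationalFilter ~> merge.in(1)
      // Connect to the shape `s` that create(sink) already imported,
      // rather than importing `sink` a second time.
      merge.out ~> s.in
      ClosedShape
    })

  // Runs the graph once and blocks for the collected elements.
  def run(): Seq[Foo] = {
    implicit val system: ActorSystem = ActorSystem("graph-example")
    try Await.result(graph.run(), 5.seconds)
    finally system.terminate()
  }
}
```

Calling GraphExample.run() should return all three Foo elements, since every element passes exactly one of the two complementary filters. Merge does not guarantee ordering between its inputs, so the order of the result may vary.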