I'm getting `already connected` when using Akka stream partitions


I'm trying out the Akka Streams API, and I don't understand why it throws a `java.lang.IllegalArgumentException`.

    val graph = RunnableGraph.fromGraph(
      GraphDSL.create(source, sink)
      ((source, sink) => Seq(source, sink)) {
        implicit b => (source, sink) =>
          import akka.stream.scaladsl.GraphDSL.Implicits._
          val partition = b.add(Partition[(KinesisRecord)](2, flow => {
            1
          }))

          source ~> partition.in

          partition.out(0) ~> sink
          partition.out(1) ~> sink

          ClosedShape
      })

That is my current code. The error is as follows:

[info] - should consume *** FAILED ***
[info] java.lang.IllegalArgumentException: [Map.in] is already connected
[info] at akka.stream.scaladsl.GraphDSL$Builder.addEdge(Graph.scala:1567)
[info] at akka.stream.scaladsl.GraphDSL$Implicits$CombinerBase.$tilde$greater(Graph.scala:1730)
[info] at akka.stream.scaladsl.GraphDSL$Implicits$CombinerBase.$tilde$greater$(Graph.scala:1729)
[info] at akka.stream.scaladsl.GraphDSL$Implicits$PortOpsImpl.$tilde$greater(Graph.scala:1784)
[info] at akka.stream.scaladsl.GraphApply.create(GraphApply.scala:46)
[info] at akka.stream.scaladsl.GraphApply.create$(GraphApply.scala:41)
[info] at akka.stream.scaladsl.GraphDSL$.create(Graph.scala:1529)

My source emits `KinesisRecord` elements. However, if I change `outputPorts` to 1 and remove the following line, it works:

    partition.out(1) ~> sink

I don't know if I'm missing something or if it's just a bug.


Solution

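  • I reproduced the error in my environment. It occurs because `sink` has a single inlet, and in the GraphDSL each inlet can be connected only once: `partition.out(1) ~> sink` tries to wire a second outlet into the inlet that `partition.out(0) ~> sink` has already claimed. The fix is to combine both partition outputs with a `Merge` stage and connect the merged outlet to the sink. My example uses an `Int` source rather than a Kinesis source; substituting your own element type should work.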

    import akka.actor.ActorSystem
    import akka.stream.ClosedShape
    import akka.stream.scaladsl.{GraphDSL, Merge, Partition, RunnableGraph, Sink, Source}
    
    import scala.concurrent.duration._
    
    object AkkaStreamWithKinesis extends App {
      implicit val system = ActorSystem("AkkaStreamWithKinesisSystem")
    
      val source = Source(1 to 1000).throttle(5, 1.second)
      val sink = Sink.foreach[Int](println(_))
    
      val graph = RunnableGraph.fromGraph(
        GraphDSL.create(source, sink)
        ((source, sink) => Seq(source, sink)) {
          implicit builder =>
            (source, sink) =>
              import akka.stream.scaladsl.GraphDSL.Implicits._
              // The partitioner returns the output port index for each element; here every element goes to port 1.
              val partition = builder.add(Partition[Int](2, _ => 1))
              val merge = builder.add(Merge[Int](2))
    
              source ~> partition.in
              partition.out(0) ~> merge.in(0)
              partition.out(1) ~> merge.in(1)
              merge.out ~> sink
    
              ClosedShape
        }).run()
    }
    

    output:

    09:15:36.443 [AkkaStreamWithKinesisSystem-akka.actor.default-dispatcher-6] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
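
The partitioner in the answer above always returns 1, so only one branch ever carries data. As a minimal sketch of the same Partition-then-Merge wiring with both branches in use, the example below routes even numbers to port 0 and odd numbers to port 1, labels each branch, and merges them back into one outlet. The names `PartitionMergeSketch`, `route`, and `labelled` are hypothetical, and the code assumes Akka 2.6 or later, where an implicit `ActorSystem` is enough to run a stream.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

object PartitionMergeSketch {
  // Hypothetical routing rule: even numbers go to port 0, odd numbers to port 1.
  def route(n: Int): Int = if (n % 2 == 0) 0 else 1

  // A reusable Flow: partition, transform each branch, then merge into one outlet.
  // Every inlet and outlet inside the graph is connected exactly once.
  val labelled: Flow[Int, String, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      val partition = b.add(Partition[Int](2, route))
      val merge     = b.add(Merge[String](2))

      partition.out(0).map(n => s"even:$n") ~> merge.in(0)
      partition.out(1).map(n => s"odd:$n")  ~> merge.in(1)

      FlowShape(partition.in, merge.out)
    })

  // Runs the flow over 1 to 6 and collects the merged elements.
  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("PartitionMergeSketch")
    try Await.result(Source(1 to 6).via(labelled).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```

`Merge` preserves the order of elements within each input but makes no promise about interleaving across inputs, so the relative order of even and odd elements in the result should not be relied on. If each branch really needs its own destination, connecting each partition output to a separate sink instance avoids the `Merge` altogether.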