Search code examples
scalaakkaakka-stream

Consume a source with two sinks and get the result of one sink


I'd like to consume a Source with two different sinks.

Simplified example:

val source = Source(1 to 20)

val addSink = Sink.fold[Int, Int](0)(_ + _)
val subtractSink = Sink.fold[Int, Int](0)(_ - _)

val graph = GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  val bcast = builder.add(Broadcast[Int](2))

  source ~> bcast.in

  bcast.out(0) ~> addSink
  bcast.out(1) ~> subtrackSink

  ClosedShape
}

RunnableGraph.fromGraph(graph).run()

val result: Future[Int] = ???

I need to be able to retrieve the result of addSink. RunnableGraph.fromGraph(graph).run() gives me NotUsed, but I'd like to get an Int (the result of the first fold Sink). Is it possible?


Solution

  • Pass in both sinks to the graph builder's create method, which gives you access to their respective materialized values:

    val graph = GraphDSL.create(addSink, subtractSink)((_, _)) { implicit builder =>
      (aSink, sSink) =>
      import GraphDSL.Implicits._
    
      val bcast = builder.add(Broadcast[Int](2))
    
      source ~> bcast.in
      bcast.out(0) ~> aSink
      bcast.out(1) ~> sSink
      ClosedShape
    }
    
    val (addResult, subtractResult): (Future[Int], Future[Int]) =
      RunnableGraph.fromGraph(graph).run() 
    

    Alternatively, you can forgo the graph DSL and use alsoToMat:

    val result: Future[Int] =
      Source(1 to 20)
        .alsoToMat(addSink)(Keep.right)
        .toMat(subtractSink)(Keep.left)
        .run()
    

    The above gives you the materialized value of addSink. If you want to get the materialized value of both addSink and subtractSink, use Keep.both:

    val (addResult, subtractResult): (Future[Int], Future[Int]) =
      Source(1 to 20)
        .alsoToMat(addSink)(Keep.right)
        .toMat(subtractSink)(Keep.both) // <--
        .run()