scala akka akka-stream

Akka combining Sinks without access to Flows


I am using an API that accepts a single Akka Sink and fills it with data:

def fillSink(sink: Sink[String, _])

Is there a way, without delving into the depths of Akka, to handle the output with two sinks instead of one?

For example:

val mySink1: Sink[String, _] = ...
val mySink2: Sink[String, _] = ...
// something that combines mySink1 and mySink2 into bothSinks
fillSink(bothSinks)

If I had access to the Flow used by the fillSink method, I could use flow.alsoTo(mySink1).to(mySink2), but the flow is not exposed.

My only workaround at the moment is to pass a single Sink that handles the strings and passes them on to two StringBuilders in place of mySink1/mySink2, but that feels like it defeats the point of Akka. Without spending a couple of days learning Akka, I can't tell whether there is a way to split the output between two sinks.

Thanks!


Solution

  • The combine Sink operator, which combines two or more Sinks using a provided Int => Graph[UniformFanOutShape[T, U], NotUsed] function, might be what you're seeking:

    def combine[T, U](first: Sink[U, _], second: Sink[U, _], rest: Sink[U, _]*)(strategy: Int => Graph[UniformFanOutShape[T, U], NotUsed]): Sink[T, NotUsed]
    

    A trivialized example:

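    import akka.actor.ActorSystem
    import akka.stream.scaladsl.{Broadcast, Sink, Source}

    // Akka 2.6+: the implicit ActorSystem also provides the materializer
    implicit val system: ActorSystem = ActorSystem("example")

    val doubleSink = Sink.foreach[Int](i => println(s"Doubler: ${i*2}"))
    val tripleSink = Sink.foreach[Int](i => println(s"Tripler: ${i*3}"))

    // Broadcast sends every element to each of the combined sinks
    val combinedSink = Sink.combine(doubleSink, tripleSink)(Broadcast[Int](_))

    Source(List(1, 2, 3)).runWith(combinedSink)

    // Doubler: 2
    // Tripler: 3
    // Doubler: 4
    // Tripler: 6
    // Doubler: 6
    // Tripler: 9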
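
Applied to the question's setup, the same idea might look like the sketch below. The body of fillSink here is a hypothetical stand-in for the real API (the question only shows its signature), the sink names are taken from the question, and Akka 2.6+ is assumed so that the implicit ActorSystem supplies the materializer. The second variant uses Flow[String].alsoTo(...).to(...), which also yields a single Sink and mirrors the alsoTo idea from the question without needing the API's internal flow.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Broadcast, Flow, Sink, Source}

// Assumes Akka 2.6+, where the implicit ActorSystem provides the materializer
implicit val system: ActorSystem = ActorSystem("combine-sinks")

// Hypothetical stand-in for the API in the question; only its signature is known
def fillSink(sink: Sink[String, _]): Unit =
  Source(List("a", "b", "c")).runWith(sink)

val mySink1: Sink[String, _] = Sink.foreach[String](s => println(s"sink1: $s"))
val mySink2: Sink[String, _] = Sink.foreach[String](s => println(s"sink2: $s"))

// Variant 1: Sink.combine with Broadcast sends every element to both sinks
val bothSinks: Sink[String, _] =
  Sink.combine(mySink1, mySink2)(Broadcast[String](_))

// Variant 2: build a Sink from an empty Flow; alsoTo attaches mySink1,
// and to(mySink2) closes the flow into a single Sink
val bothSinksViaFlow: Sink[String, _] =
  Flow[String].alsoTo(mySink1).to(mySink2)

fillSink(bothSinks)

// The actor system is left running here; a real program would call
// system.terminate() once the stream has completed.
```

Both variants materialize NotUsed, so the combined sink gives no completion signal of its own. If the caller needs one, that would have to come from the individual sinks or from the API that runs the stream.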