Search code examples
scalaakka-stream

Akka Streams - fan out with filter


I'm attempting to apply a separate a filter on 2 output streams as a result of a fanout. I emit 3 objects of Type Test defined as :

case class Test(test: String, tester: Double)

Running the below src does not produce any results :

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source, Zip}

object TestFanOut extends App {
  implicit val actorSystem = ActorSystem()

  val filter1 = Flow[Test].filter(f => f.test.equalsIgnoreCase("2"))
  val filter2 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))
  val output = Sink.foreach[(Test , Test)](println)
  val input = Source.repeat(Test("1" , 23)).take(3)

  case class Test(test: String, tester: Double)

  val graph = RunnableGraph.fromGraph(
    GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val broadcast = builder.add(Broadcast[Test](2))
      val zip = builder.add(Zip[Test , Test])

      input ~> broadcast

      broadcast.out(0) ~> filter1 ~> zip.in0
      broadcast.out(1) ~> filter2 ~> zip.in1

      zip.out ~> output

      ClosedShape

    }
  )

  graph.run()
}

However, changing the filters to :

  val filter1 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))
  val filter2 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))

produces :

(Test(1,23.0),Test(1,23.0))
(Test(1,23.0),Test(1,23.0))
(Test(1,23.0),Test(1,23.0))

It seems my filtering logic is not correctly applied ?

When I define the filters as :

  val filter1 = Flow[Test].filter(f => f.test.equalsIgnoreCase("2"))
  val filter2 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))

I expect output of :

(Test(1,23.0),Test(1,23.0))
(Test(1,23.0))

As this this matches one of the fan out streams.


Solution

  • Zip requires output from both Flows. In this case, it won't be possible because of the filter.

    Instead, you could use Merge,

    val output = Sink.foreach[Test](println)
    
    val graph = RunnableGraph.fromGraph(
      GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
        import GraphDSL.Implicits._
    
        val broadcast = builder.add(Broadcast[Test](2))
        val merge = builder.add(Merge[Test](2))
    
        input ~> broadcast
    
        broadcast.out(0) ~> filter1 ~> merge
        broadcast.out(1) ~> filter2 ~> merge
    
        merge ~> output
    
        ClosedShape
    
      }
    ) 
    

    This would print:

    Test(1,23.0)
    Test(1,23.0)
    Test(1,23.0)
    

    On the other hand, if you want zipped content, you could refactor using Option,

    val filter1 = Flow[Option[Test]].map(f => f.filter(_.test.equalsIgnoreCase("2")))
    val filter2 = Flow[Option[Test]].map(f => f.filter(_.test.equalsIgnoreCase("1")))
    val output = Sink.foreach[(Option[Test] , Option[Test])](println)
    val input = Source.repeat(Option(Test("1" , 23))).take(3)
    
    val graph = RunnableGraph.fromGraph(
      GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
        import GraphDSL.Implicits._
    
        val broadcast = builder.add(Broadcast[Option[Test]](2))
        val zip = builder.add(Zip[Option[Test] , Option[Test]])
    
        input ~> broadcast
    
        broadcast.out(0) ~> filter1 ~> zip.in0
        broadcast.out(1) ~> filter2 ~> zip.in1
    
        zip.out ~> output
    
        ClosedShape
    
      }
    )
    

    This would print:

    (None,Some(Test(1,23.0)))
    (None,Some(Test(1,23.0)))
    (None,Some(Test(1,23.0)))