I'm attempting to apply a separate a filter on 2 output streams as a result of a fanout. I emit 3 objects of Type Test defined as :
case class Test(test: String, tester: Double)
Running the below src does not produce any results :
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source, Zip}
object TestFanOut extends App {
implicit val actorSystem = ActorSystem()
val filter1 = Flow[Test].filter(f => f.test.equalsIgnoreCase("2"))
val filter2 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))
val output = Sink.foreach[(Test , Test)](println)
val input = Source.repeat(Test("1" , 23)).take(3)
case class Test(test: String, tester: Double)
val graph = RunnableGraph.fromGraph(
GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Test](2))
val zip = builder.add(Zip[Test , Test])
input ~> broadcast
broadcast.out(0) ~> filter1 ~> zip.in0
broadcast.out(1) ~> filter2 ~> zip.in1
zip.out ~> output
ClosedShape
}
)
graph.run()
}
However, changing the filters to :
val filter1 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))
val filter2 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))
produces :
(Test(1,23.0),Test(1,23.0))
(Test(1,23.0),Test(1,23.0))
(Test(1,23.0),Test(1,23.0))
It seems my filtering logic is not correctly applied ?
When I define the filters as :
val filter1 = Flow[Test].filter(f => f.test.equalsIgnoreCase("2"))
val filter2 = Flow[Test].filter(f => f.test.equalsIgnoreCase("1"))
I expect output of :
(Test(1,23.0),Test(1,23.0))
(Test(1,23.0))
As this this matches one of the fan out streams.
Zip
requires output from both Flow
s. In this case, it won't be possible because of the filter.
Instead, you could use Merge
,
val output = Sink.foreach[Test](println)
val graph = RunnableGraph.fromGraph(
GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Test](2))
val merge = builder.add(Merge[Test](2))
input ~> broadcast
broadcast.out(0) ~> filter1 ~> merge
broadcast.out(1) ~> filter2 ~> merge
merge ~> output
ClosedShape
}
)
This would print:
Test(1,23.0)
Test(1,23.0)
Test(1,23.0)
On the other hand, if you want zipped content, you could refactor using Option
,
val filter1 = Flow[Option[Test]].map(f => f.filter(_.test.equalsIgnoreCase("2")))
val filter2 = Flow[Option[Test]].map(f => f.filter(_.test.equalsIgnoreCase("1")))
val output = Sink.foreach[(Option[Test] , Option[Test])](println)
val input = Source.repeat(Option(Test("1" , 23))).take(3)
val graph = RunnableGraph.fromGraph(
GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Option[Test]](2))
val zip = builder.add(Zip[Option[Test] , Option[Test]])
input ~> broadcast
broadcast.out(0) ~> filter1 ~> zip.in0
broadcast.out(1) ~> filter2 ~> zip.in1
zip.out ~> output
ClosedShape
}
)
This would print:
(None,Some(Test(1,23.0)))
(None,Some(Test(1,23.0)))
(None,Some(Test(1,23.0)))