Akka streams dynamic filters


I'm trying to build a dynamic list of filters because I need to filter on hundreds of items and apply a function to each one. I don't want to define an outlet explicitly for each filter, so I have defined the filters dynamically:

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}


object DynamicFilters extends App {

  implicit val actorSystem = ActorSystem()

  case class Person(name: String, age: Double)
  val filterNames = List("1" , "2" , "3");
  val printSink = Sink.foreach[Person](println)
  val input = Source(List(Person("1", 30),Person("1", 20),Person("1", 20),Person("1", 30),Person("2", 2)))

  val graph = RunnableGraph.fromGraph(
    GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val broadcast = builder.add(Broadcast[Person](filterNames.size))
      val merge = builder.add(Merge[Person](filterNames.size))

      input ~> broadcast

      for (index <- filterNames.indices) {
        println("Adding filter")
        val fi = Flow[Person].filter(f => f.name.equalsIgnoreCase(filterNames(index)))
        broadcast.out(index) ~> fi ~> merge
      }
      merge ~> printSink

      ClosedShape

    }
  )

  graph.run()
}

This solution seems 'hacky'. Is there an alternative way in Akka Streams to filter on many items within a graph without defining a custom outlet for each one?


Solution

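  • A single filter stage is enough. A minimal sketch, using the Source and NotUsed types already imported in the question:

    def filterAll[A](stream: Source[A, NotUsed])(filters: List[A => Boolean]): Source[A, NotUsed] =
      stream.filter(a => filters.forall(p => p(a)))

    The trick is to apply all the predicates in one filter using forall, which keeps an element only if every predicate accepts it. The broadcast-and-merge graph in the question keeps an element when any one of the filters accepts it, so exists is the counterpart of forall for that behaviour.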

    If the predicates are CPU-intensive, they will tie up the threads of the pool that runs the stream. Akka's default pool is not intended for that kind of work; it is there to serve Akka itself.
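
To make the answer above concrete, here is a minimal sketch that reuses the Person class and sample data from the question. It keeps an element when any of the name predicates accepts it (exists rather than forall), which seems to be what the original broadcast-and-merge graph does. The second variant is only one possible way to keep CPU-heavy predicates off Akka's own threads: it evaluates them on a separate fixed thread pool through mapAsync. The object name, the helper matchesAny, the pool size of 4 and the 5 second timeout are all assumptions made for illustration, and the implicit materializer from the ActorSystem assumes Akka 2.6 or later.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical object name, for illustration only.
object SingleFilterSketch {

  final case class Person(name: String, age: Double)

  val filterNames: List[String] = List("1", "2", "3")

  // One predicate per name, built dynamically as in the question's loop.
  val predicates: List[Person => Boolean] =
    filterNames.map(n => (p: Person) => p.name.equalsIgnoreCase(n))

  // Accept an element if any predicate accepts it.
  // Use predicates.forall instead if every predicate must hold.
  def matchesAny(p: Person): Boolean = predicates.exists(pred => pred(p))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("single-filter-sketch")

    // Assumption: a dedicated pool of 4 threads for the predicate work.
    val pool = Executors.newFixedThreadPool(4)
    val cpuBound: ExecutionContext = ExecutionContext.fromExecutor(pool)

    val people = List(
      Person("1", 30), Person("1", 20), Person("1", 20), Person("1", 30), Person("2", 2))

    try {
      // Variant 1: one filter stage, no Broadcast or Merge needed.
      val kept = Await.result(
        Source(people).filter(matchesAny).runWith(Sink.seq),
        5.seconds)

      // Variant 2: evaluate the predicate on the dedicated pool,
      // then keep only the elements whose result was true.
      val keptOffloaded = Await.result(
        Source(people)
          .mapAsync(4)(p => Future(p -> matchesAny(p))(cpuBound))
          .collect { case (p, true) => p }
          .runWith(Sink.seq),
        5.seconds)

      println(kept)
      println(keptOffloaded)
    } finally {
      pool.shutdown()
      system.terminate()
    }
  }
}
```

mapAsync preserves the order of the elements, so both variants should emit the matching elements in their original order. If the list of names really runs into the hundreds and the match is a plain equality check, a Set of lower-cased names looked up inside the single filter would likely be cheaper than scanning a list of predicates.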