Search code examples
Tags: scala, akka-stream, monix

Why is mapping a class A to a class B with Monix or Akka Streams so slow?


I've benchmarked mapping a List[ClassA] to a List[ClassB] with Monix and Akka Streams, but I don't understand why it is so slow.

I've tried different ways to do the mapping, and here are the results from JMH:

[info] Benchmark                                    Mode  Cnt    Score    Error  Units
[info] MappingBenchmark.akkaLoadBalanceMap            ss   20  742,626 ±  4,853  ms/op
[info] MappingBenchmark.akkaMapAsyncFold              ss   20  480,460 ±  8,493  ms/op
[info] MappingBenchmark.akkaMapAsyncFoldAsync         ss   20  331,398 ± 10,490  ms/op
[info] MappingBenchmark.akkaMapFold                   ss   20  713,500 ±  7,394  ms/op
[info] MappingBenchmark.akkaMapFoldAsync              ss   20  313,275 ±  8,716  ms/op
[info] MappingBenchmark.map                           ss   20    0,567 ±  0,175  ms/op
[info] MappingBenchmark.monixBatchedObservables       ss   20  259,736 ±  5,939  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeft         ss   20  456,310 ±  5,225  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeftAsync    ss   20  795,345 ±  5,443  ms/op
[info] MappingBenchmark.monixMapFoldLeft              ss   20  247,172 ±  5,342  ms/op
[info] MappingBenchmark.monixMapFoldLeftAsync         ss   20  478,840 ± 25,249  ms/op
[info] MappingBenchmark.monixTaskGather               ss   20    6,707 ±  2,176  ms/op
[info] MappingBenchmark.parMap                        ss   20    1,257 ±  0,831  ms/op

Here is the code:

package benches

import java.util.concurrent.TimeUnit

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape, UniformFanOutShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source}
import org.openjdk.jmh.annotations._

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

/**
 * JMH benchmarks (the question's version) comparing ways to map the `ClassA` values from `list`
 * to `ClassB`: plain Scala collections, Monix and Akka Streams. Every benchmark is timed as a
 * single shot (`Mode.SingleShotTime`), with 20 warmup and 20 measured iterations in one fork.
 *
 * NOTE(review): every fold-based variant accumulates with `_ :+ _` (or `l :+ o`). Appending to an
 * immutable List copies it, so each append is O(n) and collecting n elements is O(n^2). This is
 * the most likely reason these variants are so much slower than `map`. Prepend with `::`/`+:`
 * instead, and reverse at the end if order matters.
 */
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.SingleShotTime))
@Warmup(iterations = 20)
@Measurement(iterations = 20)
@Fork(value = 1, jvmArgs = Array("-server", "-Xmx8g"))
@Threads(1)
class MappingBenchmark {
  import monix.eval._
  import monix.reactive._
  // The Monix global Scheduler is also the only implicit ExecutionContext in scope here, so the
  // `Future(...)` calls in the Akka benchmarks run on it.
  import monix.execution.Scheduler.Implicits.global

  // A `def`: the 10,000-element input list is rebuilt on every access, so its construction is
  // part of every benchmark's measured time.
  def list: List[ClassA] = (1 to 10000).map(ClassA).toList
  //    val l = (1 to 135368).map(Offre).toList

  // ##### SCALA ##### //

  // Baseline: strict sequential map on the standard List.
  @Benchmark
  def map: List[ClassB] = list.map(o => ClassB(o, o))

  // Parallel-collections map, converted back to a List.
  @Benchmark
  def parMap: List[ClassB] = list.par.map(o => ClassB(o, o)).toList

  // ##### MONIX ##### //

  // One Task per element, combined with Task.gatherUnordered.
  @Benchmark
  def monixTaskGather: List[ClassB] = {
    val task: Task[List[ClassB]] = Task.gatherUnordered(list.map(o => Task(ClassB(o,o))))
    Await.result(task.runAsync, Duration.Inf)
  }

  // Buffers the stream (bufferIntrospective(256)) and gathers each buffer's Tasks in groups of 10.
  // It then re-flattens and folds with the O(n) `_ :+ _` append.
  @Benchmark
  def monixBatchedObservables: List[ClassB] = {
    val task: Task[List[ClassB]] =
      Observable.fromIterable(list)
        .bufferIntrospective(256)
        .flatMap{items =>
          val tasks = items.map(o => Task(ClassB(o,o)))
          val batches = tasks.sliding(10,10).map(b => Task.gatherUnordered(b))
          val aggregate: Task[Iterator[ClassB]] = Task.sequence(batches).map(_.flatten)
          Observable.fromTask(aggregate).flatMap(i => Observable.fromIterator(i))
        }.consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
    Await.result(task.runAsync, Duration.Inf)
  }

  // Synchronous map, then a fold using the O(n) `_ :+ _` append.
  @Benchmark
  def monixMapFoldLeft: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
    Await.result(task.runAsync, Duration.Inf)
  }

  // Synchronous map, then an asynchronous fold step wrapping the O(n) append in a Task.
  @Benchmark
  def monixMapFoldLeftAsync: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(l :+ o)))
    Await.result(task.runAsync, Duration.Inf)
  }

  // mapAsync with parallelism 4, then a fold using the O(n) append.
  @Benchmark
  def monixMapAsyncFoldLeft: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
    Await.result(task.runAsync, Duration.Inf)
  }

  // mapAsync with parallelism 4, then an asynchronous fold using the O(n) append.
  @Benchmark
  def monixMapAsyncFoldLeftAsync: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(l :+ o)))
    Await.result(task.runAsync, Duration.Inf)
  }

  // ##### AKKA-STREAM ##### //

  // Synchronous map stage into Sink.fold using the O(n) append.
  @Benchmark
  def akkaMapFold: List[ClassB] = {
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.fold(List[ClassB]())(_ :+ _))(Keep.right)
    runAkkaGraph(graph)
  }

  // Synchronous map stage into Sink.foldAsync; each step wraps the O(n) append in a Future.
  @Benchmark
  def akkaMapFoldAsync: List[ClassB] = {
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(l :+ o)))(Keep.right)
    runAkkaGraph(graph)
  }

  // mapAsync(4) followed by an async boundary, then Sink.fold using the O(n) append.
  @Benchmark
  def akkaMapAsyncFold: List[ClassB] = {
    def graph: RunnableGraph[Future[List[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.fold(List[ClassB]())(_ :+ _))(Keep.right)
    runAkkaGraph(graph)
  }

  // mapAsync(4) followed by an async boundary, then Sink.foldAsync using the O(n) append.
  @Benchmark
  def akkaMapAsyncFoldAsync: List[ClassB] = {
    def graph: RunnableGraph[Future[List[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(l :+ o)))(Keep.right)
    runAkkaGraph(graph)
  }

  // Splits the source over 4 branches with Balance and maps each branch behind an async boundary.
  // The branches are recombined with Merge and folded with the O(n) append.
  @Benchmark
  def akkaLoadBalanceMap: List[ClassB] = {
    def graph: RunnableGraph[Future[List[ClassB]]] = {
      val sink: Sink[ClassB, Future[List[ClassB]]] = Sink.fold(List[ClassB]())(_ :+ _)
      RunnableGraph.fromGraph[Future[List[ClassB]]](GraphDSL.create(sink) { implicit builder =>
        sink =>
          import GraphDSL.Implicits._
          val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4))
          val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4))
          val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o))
          Source(list) ~> balance
          (1 to 4).foreach{ i =>
            balance ~> mapClassB.async ~> merge
          }
          merge ~> sink
          ClosedShape
      })
    }
    runAkkaGraph(graph)
  }

  // Runs `g` and blocks until its materialized Future completes.
  // NOTE(review): a brand-new ActorSystem and materializer are created on every call, so each
  // akka* score includes ActorSystem startup. Also, `terminate()` is not awaited, and if
  // `Await.result` throws, the `terminate()` line is never reached and the system leaks.
  private def runAkkaGraph(g:RunnableGraph[Future[List[ClassB]]]): List[ClassB] = {
    implicit val actorSystem = ActorSystem("app")
    implicit val actorMaterializer = ActorMaterializer()
    val eventualBs = g.run()
    val res = Await.result(eventualBs, Duration.Inf)
    actorSystem.terminate()
    res
  }
}

/** Input element of the mapping benchmarks: wraps a single Int. */
case class ClassA(a: Int)

/** Output element of the mapping benchmarks: holds two ClassA values. */
case class ClassB(o: ClassA, o2: ClassA)

The benchmark results get even worse when the initial collection is bigger.

I would like to know what my mistake is.

Thanks for sharing your knowledge!

Best regards


Solution

  • I've updated the code, and the benchmark results are much better than before. The difference comes from the List operator: the first version appended (`:+`) instead of prepending (`+:` / `::`). Since List is an immutable singly linked list, appending has to copy every existing element to add a new one. Each append is therefore O(n), and folding n elements this way is O(n²). Out of laziness I used the `_ :+ _` placeholder shorthand, but I should not have.

    package benches
    
    import java.util.concurrent.TimeUnit
    
    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape, UniformFanOutShape}
    import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source}
    import org.openjdk.jmh.annotations._
    
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration.Duration
    import scala.collection.immutable.Seq
    
    /**
     * JMH benchmarks comparing ways of turning the [[ClassA]] values from `list` into [[ClassB]]
     * values: plain Scala collections, Monix (Task / Observable) and Akka Streams.
     *
     * Fold-based variants accumulate by prepending (`::`, O(1) per element) rather than appending
     * with `:+`, which is O(n) on an immutable List and makes a fold O(n^2).
     *
     * The class is a JMH `@State` so that all Akka benchmarks share one ActorSystem and
     * materializer for the whole trial. Creating a system inside each benchmark call would make
     * ActorSystem startup part of every akka* measurement. Scores published for the earlier
     * version of this class were taken with a per-invocation ActorSystem.
     */
    @State(Scope.Benchmark)
    @OutputTimeUnit(TimeUnit.MILLISECONDS)
    @BenchmarkMode(Array(Mode.SingleShotTime))
    @Warmup(iterations = 20)
    @Measurement(iterations = 20)
    @Fork(value = 1, jvmArgs = Array("-server", "-Xmx8g"))
    @Threads(1)
    class MappingBenchmark {
      import monix.eval._
      import monix.reactive._
      // Also the implicit ExecutionContext used by the `Future(...)` calls in the Akka benchmarks.
      import monix.execution.Scheduler.Implicits.global

      // Assigned by `startAkka` and released by `stopAkka`, which JMH calls once per trial.
      // `var` is required because JMH initializes state through @Setup methods.
      private var actorSystem: ActorSystem = _
      private var materializer: ActorMaterializer = _

      /** Starts the ActorSystem and materializer shared by every Akka benchmark of the trial. */
      @Setup(Level.Trial)
      def startAkka(): Unit = {
        actorSystem = ActorSystem("app")
        materializer = ActorMaterializer()(actorSystem)
      }

      /** Terminates the shared ActorSystem and blocks until it has fully shut down. */
      @TearDown(Level.Trial)
      def stopAkka(): Unit = {
        Await.ready(actorSystem.terminate(), Duration.Inf)
      }

      /**
       * Benchmark input. It is a `def`, so the 10,000-element list is rebuilt on every access and
       * its construction is included in every benchmark's time.
       */
      def list: Seq[ClassA] = (1 to 10000).map(ClassA).toList
      //    val l = (1 to 135368).map(Offre).toList

      /**
       * Fold step shared by the fold-based benchmarks: prepends in O(1). The accumulated list is
       * therefore in reverse arrival order. The benchmarks only time the work and do not restore
       * the order.
       */
      val foldClassB: (List[ClassB], ClassB) => List[ClassB] = (acc, b) => b :: acc

      // ##### SCALA ##### //

      /** Baseline: strict sequential `map` on the standard collection. */
      @Benchmark
      def map: Seq[ClassB] = list.map(o => ClassB(o, o))

      /** Parallel-collections `map`, converted back to a List. */
      @Benchmark
      def parMap: Seq[ClassB] = list.par.map(o => ClassB(o, o)).toList

      // ##### MONIX ##### //

      /** Runs `task` on the implicit global Scheduler and blocks for its result. */
      private def runTask(task: Task[Seq[ClassB]]): Seq[ClassB] =
        Await.result(task.runAsync, Duration.Inf)

      /** One Task per element, combined with `Task.gatherUnordered`. */
      @Benchmark
      def monixTaskGather: Seq[ClassB] =
        runTask(Task.gatherUnordered(list.map(o => Task(ClassB(o, o)))))

      /**
       * Buffers the stream with `bufferIntrospective(256)` and gathers each buffer's Tasks in
       * groups of 10. The results are then re-flattened and folded with [[foldClassB]].
       */
      @Benchmark
      def monixBatchedObservables: Seq[ClassB] = {
        val task: Task[Seq[ClassB]] =
          Observable.fromIterable(list)
            .bufferIntrospective(256)
            .flatMap { items =>
              val tasks = items.map(o => Task(ClassB(o, o)))
              val batches = tasks.sliding(10, 10).map(b => Task.gatherUnordered(b))
              val aggregate: Task[Iterator[ClassB]] = Task.sequence(batches).map(_.flatten)
              Observable.fromTask(aggregate).flatMap(i => Observable.fromIterator(i))
            }
            .consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB))
        runTask(task)
      }

      /** Synchronous map, synchronous fold. */
      @Benchmark
      def monixMapFoldLeft: Seq[ClassB] =
        runTask(
          Observable.fromIterable(list)
            .map(o => ClassB(o, o))
            .consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB)))

      /** Synchronous map, with each fold step wrapped in a Task. */
      @Benchmark
      def monixMapFoldLeftAsync: Seq[ClassB] =
        runTask(
          Observable.fromIterable(list)
            .map(o => ClassB(o, o))
            .consumeWith(Consumer.foldLeftAsync(List[ClassB]())(
              (acc: List[ClassB], b: ClassB) => Task(foldClassB(acc, b)))))

      /** `mapAsync` with parallelism 4, synchronous fold. */
      @Benchmark
      def monixMapAsyncFoldLeft: Seq[ClassB] =
        runTask(
          Observable.fromIterable(list)
            .mapAsync(4)(o => Task(ClassB(o, o)))
            .consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB)))

      /** `mapAsync` with parallelism 4, with each fold step wrapped in a Task. */
      @Benchmark
      def monixMapAsyncFoldLeftAsync: Seq[ClassB] =
        runTask(
          Observable.fromIterable(list)
            .mapAsync(4)(o => Task(ClassB(o, o)))
            .consumeWith(Consumer.foldLeftAsync(List[ClassB]())(
              (acc: List[ClassB], b: ClassB) => Task(foldClassB(acc, b)))))

      // ##### AKKA-STREAM ##### //

      /** Materializes `g` on the shared materializer and blocks until its Future completes. */
      private def runAkkaGraph(g: RunnableGraph[Future[Seq[ClassB]]]): Seq[ClassB] =
        Await.result(g.run()(materializer), Duration.Inf)

      /**
       * Builds a closed graph that splits `list` over `parallelism` branches with `Balance` and
       * maps each branch behind an async boundary. The branches are merged with `Merge` and fed
       * into `sink`.
       */
      private def loadBalancedGraph(
          sink: Sink[ClassB, Future[Seq[ClassB]]],
          parallelism: Int = 4): RunnableGraph[Future[Seq[ClassB]]] =
        RunnableGraph.fromGraph[Future[Seq[ClassB]]](GraphDSL.create(sink) { implicit builder =>
          sinkShape =>
            import GraphDSL.Implicits._
            val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](parallelism))
            val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](parallelism))
            val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o, o))
            Source(list) ~> balance
            (1 to parallelism).foreach { _ =>
              balance ~> mapClassB.async ~> merge
            }
            merge ~> sinkShape
            ClosedShape
        })

      /** Synchronous map stage into `Sink.fold`. */
      @Benchmark
      def akkaMapFold: Seq[ClassB] = {
        val graph: RunnableGraph[Future[Seq[ClassB]]] =
          Source(list).map(o => ClassB(o, o)).toMat(Sink.fold(List[ClassB]())(foldClassB))(Keep.right)
        runAkkaGraph(graph)
      }

      /** Synchronous map stage into `Sink.foldAsync`, with each fold step wrapped in a Future. */
      @Benchmark
      def akkaMapFoldAsync: Seq[ClassB] = {
        val graph: RunnableGraph[Future[Seq[ClassB]]] =
          Source(list)
            .map(o => ClassB(o, o))
            .toMat(Sink.foldAsync(List[ClassB]())(
              (acc: List[ClassB], b: ClassB) => Future(foldClassB(acc, b))))(Keep.right)
        runAkkaGraph(graph)
      }

      /** Synchronous map stage into `Sink.seq`. */
      @Benchmark
      def akkaMapSeq: Seq[ClassB] = {
        val graph: RunnableGraph[Future[Seq[ClassB]]] =
          Source(list).map(o => ClassB(o, o)).toMat(Sink.seq[ClassB])(Keep.right)
        runAkkaGraph(graph)
      }

      /** `mapAsync(4)` followed by an async boundary, into `Sink.fold`. */
      @Benchmark
      def akkaMapAsyncFold: Seq[ClassB] = {
        val graph: RunnableGraph[Future[Seq[ClassB]]] =
          Source(list)
            .mapAsync(4)(o => Future(ClassB(o, o)))
            .async
            .toMat(Sink.fold(List[ClassB]())(foldClassB))(Keep.right)
        runAkkaGraph(graph)
      }

      /** `mapAsync(4)` followed by an async boundary, into `Sink.foldAsync`. */
      @Benchmark
      def akkaMapAsyncFoldAsync: Seq[ClassB] = {
        val graph: RunnableGraph[Future[Seq[ClassB]]] =
          Source(list)
            .mapAsync(4)(o => Future(ClassB(o, o)))
            .async
            .toMat(Sink.foldAsync(List[ClassB]())(
              (acc: List[ClassB], b: ClassB) => Future(foldClassB(acc, b))))(Keep.right)
        runAkkaGraph(graph)
      }

      /** `mapAsync(4)` into `Sink.seq`. */
      @Benchmark
      def akkaMapAsyncSeq: Seq[ClassB] = {
        val graph: RunnableGraph[Future[Seq[ClassB]]] =
          Source(list).mapAsync(4)(o => Future(ClassB(o, o))).toMat(Sink.seq[ClassB])(Keep.right)
        runAkkaGraph(graph)
      }

      /** Balance/Merge over 4 async map branches, into `Sink.fold`. */
      @Benchmark
      def akkaLoadBalanceMap: Seq[ClassB] =
        runAkkaGraph(loadBalancedGraph(Sink.fold(List[ClassB]())(foldClassB)))

      /** Balance/Merge over 4 async map branches, into `Sink.seq`. */
      @Benchmark
      def akkaLoadBalanceMapSeq: Seq[ClassB] =
        runAkkaGraph(loadBalancedGraph(Sink.seq[ClassB]))
    }
    
    /** Input element of the mapping benchmarks: wraps a single Int. */
    case class ClassA(a: Int)

    /** Output element of the mapping benchmarks: holds two ClassA values. */
    case class ClassB(o: ClassA, o2: ClassA)
    

    The results with this updated class are:

    [info] Benchmark                                    Mode  Cnt   Score   Error  Units
    [info] MappingBenchmark.akkaLoadBalanceMap            ss   20  19,052 ± 3,779  ms/op
    [info] MappingBenchmark.akkaLoadBalanceMapSeq         ss   20  16,115 ± 3,232  ms/op
    [info] MappingBenchmark.akkaMapAsyncFold              ss   20  20,862 ± 3,127  ms/op
    [info] MappingBenchmark.akkaMapAsyncFoldAsync         ss   20  26,994 ± 4,010  ms/op
    [info] MappingBenchmark.akkaMapAsyncSeq               ss   20  19,399 ± 7,089  ms/op
    [info] MappingBenchmark.akkaMapFold                   ss   20  12,132 ± 4,111  ms/op
    [info] MappingBenchmark.akkaMapFoldAsync              ss   20  22,652 ± 3,802  ms/op
    [info] MappingBenchmark.akkaMapSeq                    ss   20  10,894 ± 3,114  ms/op
    [info] MappingBenchmark.map                           ss   20   0,625 ± 0,193  ms/op
    [info] MappingBenchmark.monixBatchedObservables       ss   20   9,175 ± 4,080  ms/op
    [info] MappingBenchmark.monixMapAsyncFoldLeft         ss   20  11,724 ± 4,458  ms/op
    [info] MappingBenchmark.monixMapAsyncFoldLeftAsync    ss   20  14,174 ± 6,962  ms/op
    [info] MappingBenchmark.monixMapFoldLeft              ss   20   1,057 ± 0,960  ms/op
    [info] MappingBenchmark.monixMapFoldLeftAsync         ss   20   9,638 ± 4,910  ms/op
    [info] MappingBenchmark.monixTaskGather               ss   20   7,065 ± 2,428  ms/op
    [info] MappingBenchmark.parMap                        ss   20   1,392 ± 0,923  ms/op
    

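    It seems that it is still faster to do the mapping with plain Scala collections, when possible, before running a stream. (Note that the akka* timings above were measured with a new ActorSystem created for every benchmark invocation, so they include ActorSystem startup.)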