Why is mapping class A to class B with Monix or Akka Streams so slow?

I've benchmarked mapping a List[ClassA] to a List[ClassB] with Monix and Akka Streams, but I don't understand why it is so slow.

I've tried different ways to do the mapping; here are the JMH results:

[info] Benchmark                                    Mode  Cnt    Score    Error  Units
[info] MappingBenchmark.akkaLoadBalanceMap            ss   20  742,626 ±  4,853  ms/op
[info] MappingBenchmark.akkaMapAsyncFold              ss   20  480,460 ±  8,493  ms/op
[info] MappingBenchmark.akkaMapAsyncFoldAsync         ss   20  331,398 ± 10,490  ms/op
[info] MappingBenchmark.akkaMapFold                   ss   20  713,500 ±  7,394  ms/op
[info] MappingBenchmark.akkaMapFoldAsync              ss   20  313,275 ±  8,716  ms/op
[info] MappingBenchmark.map                           ss   20    0,567 ±  0,175  ms/op
[info] MappingBenchmark.monixBatchedObservables       ss   20  259,736 ±  5,939  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeft         ss   20  456,310 ±  5,225  ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeftAsync    ss   20  795,345 ±  5,443  ms/op
[info] MappingBenchmark.monixMapFoldLeft              ss   20  247,172 ±  5,342  ms/op
[info] MappingBenchmark.monixMapFoldLeftAsync         ss   20  478,840 ± 25,249  ms/op
[info] MappingBenchmark.monixTaskGather               ss   20    6,707 ±  2,176  ms/op
[info] MappingBenchmark.parMap                        ss   20    1,257 ±  0,831  ms/op

Here is the code:

package benches

import java.util.concurrent.TimeUnit

import akka.NotUsed
import{ActorMaterializer, ClosedShape, UniformFanInShape, UniformFanOutShape}
import{Balance, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source}
import org.openjdk.jmh.annotations._

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration

/** JMH benchmark comparing ways to map a `List[ClassA]` to a `List[ClassB]`: plain Scala
  * collections, Monix `Task`/`Observable` pipelines, and Akka Streams graphs.
  *
  * NOTE(review): this listing appears to have been damaged when it was pasted. Several
  * pieces are missing, so the code does not compile as shown:
  *  - the receivers of several `.map` calls;
  *  - lines that held only `}`;
  *  - the calls that actually run the Akka graphs;
  *  - presumably, the JMH `@Benchmark` method annotations.
  * The comments below describe only what is visible.
  */
@Warmup(iterations = 20)
@Measurement(iterations = 20)
// Benchmarks run in a single forked JVM started with these flags.
@Fork(value = 1, jvmArgs = Array("-server", "-Xmx8g"))
class MappingBenchmark {
  import monix.eval._
  import monix.reactive._

  // Input: 10 000 ClassA values. Because `list` is a `def`, the list is rebuilt on every
  // reference, so its construction is part of every benchmark that uses it.
  def list: List[ClassA] = (1 to 10000).map(ClassA).toList
  //    val l = (1 to 135368).map(Offre).toList

  // ##### SCALA ##### //

  // Baseline: sequential collection map.
  // NOTE(review): the receiver before `=>` is missing (presumably `list.map(o`).
  def map: List[ClassB] = => ClassB(o, o))

  // Baseline: judging by the name, a parallel-collection map converted back to a List.
  // NOTE(review): the receiver before `=>` is missing (presumably `list.par.map(o`).
  def parMap: List[ClassB] = => ClassB(o, o)).toList

  // ##### MONIX ##### //

  // Creates one Task per element and gathers them into a single Task. Per the Monix docs,
  // `gatherUnordered` does not guarantee that results keep the input order.
  // NOTE(review): the receiver before `=>` (presumably `list.map(o`) and the closing `}` are
  // missing. `runAsync` needs an implicit monix Scheduler, which is not visible here.
  def monixTaskGather: List[ClassB] = {
    val task: Task[List[ClassB]] = Task.gatherUnordered( => Task(ClassB(o,o))))
    Await.result(task.runAsync, Duration.Inf)

  // How it works:
  //  1. Creates one Task per element.
  //  2. Groups those Tasks into batches of 10 (`sliding(10, 10)`) and gathers each batch.
  //  3. Sequences the batches and re-emits the flattened results through an Observable.
  //  4. Folds that Observable into a List.
  // NOTE(review): missing from this paste: the Observable expression that `.flatMap` is
  // applied to, the receiver before `=> Task(ClassB(o,o)))`, and the closing `}`.
  // PERF: `_ :+ _` appends to an immutable List, which copies the whole list for every
  // element, so the fold is O(n^2). Prepending (`+:`) and reversing once at the end is O(n).
  def monixBatchedObservables: List[ClassB] = {
    val task: Task[List[ClassB]] =
        .flatMap{items =>
          val tasks = => Task(ClassB(o,o)))
          val batches = tasks.sliding(10,10).map(b => Task.gatherUnordered(b))
          val aggregate: Task[Iterator[ClassB]] = Task.sequence(batches).map(_.flatten)
          Observable.fromTask(aggregate).flatMap(i => Observable.fromIterator(i))
        }.consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
    Await.result(task.runAsync, Duration.Inf)

  // Streams `list` through an Observable, maps each element synchronously, and folds the
  // results into a List. The `_ :+ _` append makes the fold O(n^2) (see PERF note above).
  def monixMapFoldLeft: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
    Await.result(task.runAsync, Duration.Inf)

  // Same as monixMapFoldLeft, but each fold step is wrapped in its own Task. That adds
  // per-element Task overhead on top of the O(n) List append.
  def monixMapFoldLeftAsync: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(l :+ o)))
    Await.result(task.runAsync, Duration.Inf)

  // Maps each element inside a Task using `mapAsync(4)`, i.e. parallelism 4. Then folds
  // synchronously with the O(n) List append.
  def monixMapAsyncFoldLeft: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
    Await.result(task.runAsync, Duration.Inf)

  // Combines `mapAsync(4)` with a fold in which every step is also a Task appending to the List.
  def monixMapAsyncFoldLeftAsync: List[ClassB] = {
    val task: Task[List[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(l :+ o)))
    Await.result(task.runAsync, Duration.Inf)

  // ##### AKKA-STREAM ##### //

  // Source -> map -> Sink.fold, appending with `_ :+ _` (O(n^2) overall).
  // NOTE(review): the statement that runs `graph` (presumably `runAkkaGraph(graph)`) and the
  // closing `}` are missing from every Akka method in this listing.
  def akkaMapFold: List[ClassB] = {
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.fold(List[ClassB]())(_ :+ _))(Keep.right)

  // Same as akkaMapFold, but each fold step returns a Future.
  // NOTE(review): `Future(...)` needs an implicit ExecutionContext, not visible here.
  def akkaMapFoldAsync: List[ClassB] = {
    val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(l :+ o)))(Keep.right)

  // Maps through one Future per element with `mapAsync(4)`, which per the Akka docs keeps
  // upstream order. `.async` inserts an asynchronous boundary before the folding sink.
  def akkaMapAsyncFold: List[ClassB] = {
    def graph: RunnableGraph[Future[List[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.fold(List[ClassB]())(_ :+ _))(Keep.right)

  // Same as akkaMapAsyncFold, but the fold step is also a Future.
  def akkaMapAsyncFoldAsync: List[ClassB] = {
    def graph: RunnableGraph[Future[List[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(l :+ o)))(Keep.right)

  // Fan-out/fan-in graph: `Balance(4)` spreads elements over four asynchronous copies of
  // `mapClassB`, and `Merge(4)` joins them into the folding sink. Per the Akka docs, Merge
  // does not preserve input order.
  def akkaLoadBalanceMap: List[ClassB] = {
    def graph: RunnableGraph[Future[List[ClassB]]] = {
      val sink: Sink[ClassB, Future[List[ClassB]]] = Sink.fold(List[ClassB]())(_ :+ _)
      RunnableGraph.fromGraph[Future[List[ClassB]]](GraphDSL.create(sink) { implicit builder =>
        sink =>
          import GraphDSL.Implicits._
          val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4))
          val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4))
          val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o))
          Source(list) ~> balance
          // Wires one asynchronous mapping branch per Balance output. The index `i` is unused.
          (1 to 4).foreach{ i =>
            balance ~> mapClassB.async ~> merge
          // NOTE(review): the closing braces are missing, and so, presumably, is the
          // `ClosedShape` result (ClosedShape is imported at the top of the file).
          merge ~> sink

  // Runs `g` on a freshly created ActorSystem and blocks until its materialized Future completes.
  // NOTE(review): not visible here are the right-hand side of `eventualBs` (presumably
  // `g.run()`), any `actorSystem.terminate()`, and the returned value. A new ActorSystem per
  // call puts system start-up inside every measurement; systems that are never terminated
  // would keep their threads alive — confirm.
  private def runAkkaGraph(g:RunnableGraph[Future[List[ClassB]]]): List[ClassB] = {
    implicit val actorSystem = ActorSystem("app")
    implicit val actorMaterializer = ActorMaterializer()
    val eventualBs =
    val res = Await.result(eventualBs, Duration.Inf)

/** Source element of the mapping benchmark: wraps a single `Int`. */
case class ClassA(a: Int)

/** Target element of the mapping benchmark: holds two [[ClassA]] values. */
case class ClassB(
  o: ClassA,
  o2: ClassA
)

The benchmark results get even worse as the initial collection grows.

I would like to know what my mistake is.

Thanks for sharing your knowledge!

Best regards


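  • I've updated the code, and the benchmark results are much better than before. The difference comes from the List operator: the first version appended (`:+`) instead of prepending (`+:`). Since List is an immutable singly linked list, every append has to copy the entire list to add one element, so folding n elements costs O(n²). Out of laziness I had used the `_ :+ _` placeholder shorthand, but I shouldn't have. (Note that prepending builds the result in reverse order; call `.reverse` at the end if the order matters.)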

    package benches
    import java.util.concurrent.TimeUnit
    import akka.NotUsed
    import{ActorMaterializer, ClosedShape, UniformFanInShape, UniformFanOutShape}
    import{Balance, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source}
    import org.openjdk.jmh.annotations._
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration.Duration
    import scala.collection.immutable.Seq
    /** Updated JMH benchmark comparing ways to map `ClassA` values to `ClassB` values with plain
      * Scala collections, Monix and Akka Streams. Fold-based variants now prepend instead of
      * append, and `Sink.seq` variants were added.
      *
      * NOTE(review): this listing appears to have been damaged when it was pasted. Several
      * pieces are missing, so the code does not compile as shown:
      *  - the receivers of several `.map` calls;
      *  - lines that held only `}`;
      *  - the calls that run the Akka graphs;
      *  - presumably, the JMH `@Benchmark` method annotations.
      */
    @Warmup(iterations = 20)
    @Measurement(iterations = 20)
    @Fork(value = 1, jvmArgs = Array("-server", "-Xmx8g"))
    class MappingBenchmark {
      import monix.eval._
      import monix.reactive._
      // Input: 10 000 ClassA values, built as a List but declared as `Seq` (the file imports
      // `scala.collection.immutable.Seq`). As a `def`, it is rebuilt on every reference.
      def list: Seq[ClassA] = (1 to 10000).map(ClassA).toList
      //    val l = (1 to 135368).map(Offre).toList
      // ##### SCALA ##### //
      // Shared fold step: prepends (O(1) on List) instead of appending. The accumulated List
      // therefore ends up in reverse input order.
      def foldClassB = (l:List[ClassB], o:ClassB) => o +: l
      // NOTE(review): the receivers before `=>` in `map`/`parMap` are missing (presumably
      // `list.map(o` and `list.par.map(o`).
      def map: Seq[ClassB] = => ClassB(o, o))
      def parMap: Seq[ClassB] = => ClassB(o, o)).toList
      // ##### MONIX ##### //
      // One Task per element, gathered into one Task. Per the Monix docs, result order is not
      // guaranteed. `runAsync` needs an implicit Scheduler, not visible here.
      def monixTaskGather: Seq[ClassB] = {
        val task: Task[Seq[ClassB]] = Task.gatherUnordered( => Task(ClassB(o,o))))
        Await.result(task.runAsync, Duration.Inf)
      // Gathers the per-element Tasks in batches of 10 and re-emits the flattened results.
      // NOTE(review): the source Observable before `.flatMap` and the closing `}.consumeWith(...)`
      // line are missing from this paste.
      def monixBatchedObservables: Seq[ClassB] = {
        val task: Task[Seq[ClassB]] =
            .flatMap{items =>
              val tasks = => Task(ClassB(o,o)))
              val batches = tasks.sliding(10,10).map(b => Task.gatherUnordered(b))
              val aggregate: Task[Iterator[ClassB]] = Task.sequence(batches).map(_.flatten)
              Observable.fromTask(aggregate).flatMap(i => Observable.fromIterator(i))
        Await.result(task.runAsync, Duration.Inf)
      // Synchronous map, then a fold with the prepending `foldClassB` step.
      def monixMapFoldLeft: Seq[ClassB] = {
        val task: Task[Seq[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB))
        Await.result(task.runAsync, Duration.Inf)
      // Synchronous map, then a fold where each prepend step is wrapped in a Task.
      def monixMapFoldLeftAsync: Seq[ClassB] = {
        val task: Task[Seq[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(o +: l)))
        Await.result(task.runAsync, Duration.Inf)
      // Maps each element inside a Task with parallelism 4, then folds synchronously.
      def monixMapAsyncFoldLeft: Seq[ClassB] = {
        val task: Task[Seq[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB))
        Await.result(task.runAsync, Duration.Inf)
      // Task-based map (parallelism 4) combined with a Task-per-step fold.
      def monixMapAsyncFoldLeftAsync: Seq[ClassB] = {
        val task: Task[Seq[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(o +: l)))
        Await.result(task.runAsync, Duration.Inf)
      // ##### AKKA-STREAM ##### //
      // NOTE(review): in every Akka method below, the statement that runs `graph` (presumably
      // `runAkkaGraph(graph)`) and the closing `}` are missing from this paste. `Future(...)`
      // needs an implicit ExecutionContext, not visible here.
      // Source -> map -> Sink.fold with the prepending step (result is in reverse order).
      def akkaMapFold: Seq[ClassB] = {
        val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.fold(List[ClassB]())(foldClassB))(Keep.right)
      // Same as akkaMapFold, but each fold step returns a Future.
      def akkaMapFoldAsync: Seq[ClassB] = {
        val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(o +: l)))(Keep.right)
      // Source -> map -> Sink.seq. Per the Akka docs, Sink.seq collects elements in arrival order.
      def akkaMapSeq: Seq[ClassB] = {
        val graph = Source(list).map(o => ClassB(o,o)).toMat(Sink.seq)(Keep.right)
      // One Future per element via mapAsync(4), an async boundary, then the prepending fold.
      def akkaMapAsyncFold: Seq[ClassB] = {
        def graph: RunnableGraph[Future[Seq[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.fold(List[ClassB]())(foldClassB))(Keep.right)
      // Same as akkaMapAsyncFold, but each fold step is a Future.
      def akkaMapAsyncFoldAsync: Seq[ClassB] = {
        def graph: RunnableGraph[Future[Seq[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(o +: l)))(Keep.right)
      // mapAsync(4) straight into Sink.seq. Unlike akkaMapAsyncFold, there is no `.async` boundary here.
      def akkaMapAsyncSeq: Seq[ClassB] = {
        val graph = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).toMat(Sink.seq)(Keep.right)
      // Balance(4) -> four asynchronous `mapClassB` branches -> Merge(4) -> prepending fold.
      // Per the Akka docs, Merge does not preserve input order.
      def akkaLoadBalanceMap: Seq[ClassB] = {
        def graph: RunnableGraph[Future[Seq[ClassB]]] = {
          val sink: Sink[ClassB, Future[Seq[ClassB]]] = Sink.fold(List[ClassB]())(foldClassB)
          RunnableGraph.fromGraph[Future[Seq[ClassB]]](GraphDSL.create(sink) { implicit builder =>
            sink =>
              import GraphDSL.Implicits._
              val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4))
              val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4))
              val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o))
              Source(list) ~> balance
              // One asynchronous branch per Balance output. The index `i` is unused.
              (1 to 4).foreach{ i =>
                balance ~> mapClassB.async ~> merge
              // NOTE(review): closing braces and, presumably, the `ClosedShape` result are missing.
              merge ~> sink
      // Same load-balanced graph as akkaLoadBalanceMap, but collected with Sink.seq.
      def akkaLoadBalanceMapSeq: Seq[ClassB] = {
        def graph: RunnableGraph[Future[Seq[ClassB]]] = {
          val sink: Sink[ClassB, Future[Seq[ClassB]]] = Sink.seq
          RunnableGraph.fromGraph[Future[Seq[ClassB]]](GraphDSL.create(sink) { implicit builder =>
            sink =>
              import GraphDSL.Implicits._
              val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4))
              val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4))
              val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o))
              Source(list) ~> balance
              (1 to 4).foreach{ i =>
                balance ~> mapClassB.async ~> merge
              merge ~> sink
      // Runs `g` on a freshly created ActorSystem and blocks for the result.
      // NOTE(review): the right-hand side of `eventualBs` (presumably `g.run()`), any
      // `actorSystem.terminate()`, and the returned value are not visible. Creating an
      // ActorSystem per call puts its start-up cost inside every measurement — confirm.
      private def runAkkaGraph(g:RunnableGraph[Future[Seq[ClassB]]]): Seq[ClassB] = {
        implicit val actorSystem = ActorSystem("app")
        implicit val actorMaterializer = ActorMaterializer()
        val eventualBs =
        val res = Await.result(eventualBs, Duration.Inf)
    /** Input element of the benchmark: a single `Int` payload. */
    case class ClassA(a: Int)

    /** Output element of the benchmark: pairs two [[ClassA]] values. */
    case class ClassB(
      o: ClassA,
      o2: ClassA
    )

    The results with the updated class are:

    [info] Benchmark                                    Mode  Cnt   Score   Error  Units
    [info] MappingBenchmark.akkaLoadBalanceMap            ss   20  19,052 ± 3,779  ms/op
    [info] MappingBenchmark.akkaLoadBalanceMapSeq         ss   20  16,115 ± 3,232  ms/op
    [info] MappingBenchmark.akkaMapAsyncFold              ss   20  20,862 ± 3,127  ms/op
    [info] MappingBenchmark.akkaMapAsyncFoldAsync         ss   20  26,994 ± 4,010  ms/op
    [info] MappingBenchmark.akkaMapAsyncSeq               ss   20  19,399 ± 7,089  ms/op
    [info] MappingBenchmark.akkaMapFold                   ss   20  12,132 ± 4,111  ms/op
    [info] MappingBenchmark.akkaMapFoldAsync              ss   20  22,652 ± 3,802  ms/op
    [info] MappingBenchmark.akkaMapSeq                    ss   20  10,894 ± 3,114  ms/op
    [info] MappingBenchmark.map                           ss   20   0,625 ± 0,193  ms/op
    [info] MappingBenchmark.monixBatchedObservables       ss   20   9,175 ± 4,080  ms/op
    [info] MappingBenchmark.monixMapAsyncFoldLeft         ss   20  11,724 ± 4,458  ms/op
    [info] MappingBenchmark.monixMapAsyncFoldLeftAsync    ss   20  14,174 ± 6,962  ms/op
    [info] MappingBenchmark.monixMapFoldLeft              ss   20   1,057 ± 0,960  ms/op
    [info] MappingBenchmark.monixMapFoldLeftAsync         ss   20   9,638 ± 4,910  ms/op
    [info] MappingBenchmark.monixTaskGather               ss   20   7,065 ± 2,428  ms/op
    [info] MappingBenchmark.parMap                        ss   20   1,392 ± 0,923  ms/op

    It seems it is still faster to do the mapping with plain Scala collections, when possible, before running a stream.