I've benchmarked the mapping of a List[ClassA] to List[ClassB] with monix and akka-streams but I don't understand why it is so slow.
I've tried different way to map and here is the result with JMH:
[info] Benchmark Mode Cnt Score Error Units
[info] MappingBenchmark.akkaLoadBalanceMap ss 20 742,626 â–’ 4,853 ms/op
[info] MappingBenchmark.akkaMapAsyncFold ss 20 480,460 â–’ 8,493 ms/op
[info] MappingBenchmark.akkaMapAsyncFoldAsync ss 20 331,398 â–’ 10,490 ms/op
[info] MappingBenchmark.akkaMapFold ss 20 713,500 â–’ 7,394 ms/op
[info] MappingBenchmark.akkaMapFoldAsync ss 20 313,275 â–’ 8,716 ms/op
[info] MappingBenchmark.map ss 20 0,567 â–’ 0,175 ms/op
[info] MappingBenchmark.monixBatchedObservables ss 20 259,736 â–’ 5,939 ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeft ss 20 456,310 â–’ 5,225 ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeftAsync ss 20 795,345 â–’ 5,443 ms/op
[info] MappingBenchmark.monixMapFoldLeft ss 20 247,172 â–’ 5,342 ms/op
[info] MappingBenchmark.monixMapFoldLeftAsync ss 20 478,840 â–’ 25,249 ms/op
[info] MappingBenchmark.monixTaskGather ss 20 6,707 â–’ 2,176 ms/op
[info] MappingBenchmark.parMap ss 20 1,257 â–’ 0,831 ms/op
Here is the code:
package benches
import java.util.concurrent.TimeUnit
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape, UniformFanOutShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source}
import org.openjdk.jmh.annotations._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.SingleShotTime))
@Warmup(iterations = 20)
@Measurement(iterations = 20)
@Fork(value = 1, jvmArgs = Array("-server", "-Xmx8g"))
@Threads(1)
class MappingBenchmark {
import monix.eval._
import monix.reactive._
import monix.execution.Scheduler.Implicits.global
def list: List[ClassA] = (1 to 10000).map(ClassA).toList
// val l = (1 to 135368).map(Offre).toList
// ##### SCALA ##### //
@Benchmark
def map: List[ClassB] = list.map(o => ClassB(o, o))
@Benchmark
def parMap: List[ClassB] = list.par.map(o => ClassB(o, o)).toList
// ##### MONIX ##### //
@Benchmark
def monixTaskGather: List[ClassB] = {
val task: Task[List[ClassB]] = Task.gatherUnordered(list.map(o => Task(ClassB(o,o))))
Await.result(task.runAsync, Duration.Inf)
}
@Benchmark
def monixBatchedObservables: List[ClassB] = {
val task: Task[List[ClassB]] =
Observable.fromIterable(list)
.bufferIntrospective(256)
.flatMap{items =>
val tasks = items.map(o => Task(ClassB(o,o)))
val batches = tasks.sliding(10,10).map(b => Task.gatherUnordered(b))
val aggregate: Task[Iterator[ClassB]] = Task.sequence(batches).map(_.flatten)
Observable.fromTask(aggregate).flatMap(i => Observable.fromIterator(i))
}.consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
Await.result(task.runAsync, Duration.Inf)
}
@Benchmark
def monixMapFoldLeft: List[ClassB] = {
val task: Task[List[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
Await.result(task.runAsync, Duration.Inf)
}
@Benchmark
def monixMapFoldLeftAsync: List[ClassB] = {
val task: Task[List[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(l :+ o)))
Await.result(task.runAsync, Duration.Inf)
}
@Benchmark
def monixMapAsyncFoldLeft: List[ClassB] = {
val task: Task[List[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeft(List[ClassB]())(_ :+ _))
Await.result(task.runAsync, Duration.Inf)
}
@Benchmark
def monixMapAsyncFoldLeftAsync: List[ClassB] = {
val task: Task[List[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(l :+ o)))
Await.result(task.runAsync, Duration.Inf)
}
// ##### AKKA-STREAM ##### //
@Benchmark
def akkaMapFold: List[ClassB] = {
val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.fold(List[ClassB]())(_ :+ _))(Keep.right)
runAkkaGraph(graph)
}
@Benchmark
def akkaMapFoldAsync: List[ClassB] = {
val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(l :+ o)))(Keep.right)
runAkkaGraph(graph)
}
@Benchmark
def akkaMapAsyncFold: List[ClassB] = {
def graph: RunnableGraph[Future[List[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.fold(List[ClassB]())(_ :+ _))(Keep.right)
runAkkaGraph(graph)
}
@Benchmark
def akkaMapAsyncFoldAsync: List[ClassB] = {
def graph: RunnableGraph[Future[List[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(l :+ o)))(Keep.right)
runAkkaGraph(graph)
}
@Benchmark
def akkaLoadBalanceMap: List[ClassB] = {
def graph: RunnableGraph[Future[List[ClassB]]] = {
val sink: Sink[ClassB, Future[List[ClassB]]] = Sink.fold(List[ClassB]())(_ :+ _)
RunnableGraph.fromGraph[Future[List[ClassB]]](GraphDSL.create(sink) { implicit builder =>
sink =>
import GraphDSL.Implicits._
val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4))
val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4))
val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o))
Source(list) ~> balance
(1 to 4).foreach{ i =>
balance ~> mapClassB.async ~> merge
}
merge ~> sink
ClosedShape
})
}
runAkkaGraph(graph)
}
private def runAkkaGraph(g:RunnableGraph[Future[List[ClassB]]]): List[ClassB] = {
implicit val actorSystem = ActorSystem("app")
implicit val actorMaterializer = ActorMaterializer()
val eventualBs = g.run()
val res = Await.result(eventualBs, Duration.Inf)
actorSystem.terminate()
res
}
}
case class ClassA(a:Int)
case class ClassB(o:ClassA, o2:ClassA)
The bench result is getting even worse when the initial collection is bigger.
I would like to know what my mistake is.
Thanks for sharing your knowledge!
Best regards
I've updated the code and the bench is really better than before. The difference is related to the List operator. In fact, the first version was using append instead of preprend. Since List is a linked list, it had to iterate over the elements in order to add a new one. By being lazy, I wanted to use _ operator but I should have not.
package benches
import java.util.concurrent.TimeUnit
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape, UniformFanInShape, UniformFanOutShape}
import akka.stream.scaladsl.{Balance, Flow, GraphDSL, Keep, Merge, RunnableGraph, Sink, Source}
import org.openjdk.jmh.annotations._
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.collection.immutable.Seq
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.SingleShotTime))
@Warmup(iterations = 20)
@Measurement(iterations = 20)
@Fork(value = 1, jvmArgs = Array("-server", "-Xmx8g"))
@Threads(1)
class MappingBenchmark {
import monix.eval._
import monix.reactive._
import monix.execution.Scheduler.Implicits.global
def list: Seq[ClassA] = (1 to 10000).map(ClassA).toList
// val l = (1 to 135368).map(Offre).toList
// ##### SCALA ##### //
def foldClassB = (l:List[ClassB], o:ClassB) => o +: l
@Benchmark
def map: Seq[ClassB] = list.map(o => ClassB(o, o))
@Benchmark
def parMap: Seq[ClassB] = list.par.map(o => ClassB(o, o)).toList
// ##### MONIX ##### //
@Benchmark
def monixTaskGather: Seq[ClassB] = {
val task: Task[Seq[ClassB]] = Task.gatherUnordered(list.map(o => Task(ClassB(o,o))))
Await.result(task.runAsync, Duration.Inf)
}
@Benchmark
def monixBatchedObservables: Seq[ClassB] = {
val task: Task[Seq[ClassB]] =
Observable.fromIterable(list)
.bufferIntrospective(256)
.flatMap{items =>
val tasks = items.map(o => Task(ClassB(o,o)))
val batches = tasks.sliding(10,10).map(b => Task.gatherUnordered(b))
val aggregate: Task[Iterator[ClassB]] = Task.sequence(batches).map(_.flatten)
Observable.fromTask(aggregate).flatMap(i => Observable.fromIterator(i))
}.consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB))
Await.result(task.runAsync, Duration.Inf)
}
@Benchmark
def monixMapFoldLeft: Seq[ClassB] = {
val task: Task[Seq[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB))
Await.result(task.runAsync, Duration.Inf)
}
@Benchmark
def monixMapFoldLeftAsync: Seq[ClassB] = {
val task: Task[Seq[ClassB]] = Observable.fromIterable(list).map(o => ClassB(o, o)).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(o +: l)))
Await.result(task.runAsync, Duration.Inf)
}
@Benchmark
def monixMapAsyncFoldLeft: Seq[ClassB] = {
val task: Task[Seq[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeft(List[ClassB]())(foldClassB))
Await.result(task.runAsync, Duration.Inf)
}
@Benchmark
def monixMapAsyncFoldLeftAsync: Seq[ClassB] = {
val task: Task[Seq[ClassB]] = Observable.fromIterable(list).mapAsync(4)(o => Task(ClassB(o, o))).consumeWith(Consumer.foldLeftAsync(List[ClassB]())((l, o) => Task(o +: l)))
Await.result(task.runAsync, Duration.Inf)
}
// ##### AKKA-STREAM ##### //
@Benchmark
def akkaMapFold: Seq[ClassB] = {
val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.fold(List[ClassB]())(foldClassB))(Keep.right)
runAkkaGraph(graph)
}
@Benchmark
def akkaMapFoldAsync: Seq[ClassB] = {
val graph: RunnableGraph[Future[List[ClassB]]] = Source(list).map(o => ClassB(o,o)).toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(o +: l)))(Keep.right)
runAkkaGraph(graph)
}
@Benchmark
def akkaMapSeq: Seq[ClassB] = {
val graph = Source(list).map(o => ClassB(o,o)).toMat(Sink.seq)(Keep.right)
runAkkaGraph(graph)
}
@Benchmark
def akkaMapAsyncFold: Seq[ClassB] = {
def graph: RunnableGraph[Future[Seq[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.fold(List[ClassB]())(foldClassB))(Keep.right)
runAkkaGraph(graph)
}
@Benchmark
def akkaMapAsyncFoldAsync: Seq[ClassB] = {
def graph: RunnableGraph[Future[Seq[ClassB]]] = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).async.toMat(Sink.foldAsync(List[ClassB]())((l, o) => Future(o +: l)))(Keep.right)
runAkkaGraph(graph)
}
@Benchmark
def akkaMapAsyncSeq: Seq[ClassB] = {
val graph = Source(list).mapAsync(4)(o => Future(ClassB(o,o))).toMat(Sink.seq)(Keep.right)
runAkkaGraph(graph)
}
@Benchmark
def akkaLoadBalanceMap: Seq[ClassB] = {
def graph: RunnableGraph[Future[Seq[ClassB]]] = {
val sink: Sink[ClassB, Future[Seq[ClassB]]] = Sink.fold(List[ClassB]())(foldClassB)
RunnableGraph.fromGraph[Future[Seq[ClassB]]](GraphDSL.create(sink) { implicit builder =>
sink =>
import GraphDSL.Implicits._
val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4))
val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4))
val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o))
Source(list) ~> balance
(1 to 4).foreach{ i =>
balance ~> mapClassB.async ~> merge
}
merge ~> sink
ClosedShape
})
}
runAkkaGraph(graph)
}
@Benchmark
def akkaLoadBalanceMapSeq: Seq[ClassB] = {
def graph: RunnableGraph[Future[Seq[ClassB]]] = {
val sink: Sink[ClassB, Future[Seq[ClassB]]] = Sink.seq
RunnableGraph.fromGraph[Future[Seq[ClassB]]](GraphDSL.create(sink) { implicit builder =>
sink =>
import GraphDSL.Implicits._
val balance: UniformFanOutShape[ClassA, ClassA] = builder.add(Balance[ClassA](4))
val merge: UniformFanInShape[ClassB, ClassB] = builder.add(Merge[ClassB](4))
val mapClassB: Flow[ClassA, ClassB, NotUsed] = Flow[ClassA].map(o => ClassB(o,o))
Source(list) ~> balance
(1 to 4).foreach{ i =>
balance ~> mapClassB.async ~> merge
}
merge ~> sink
ClosedShape
})
}
runAkkaGraph(graph)
}
private def runAkkaGraph(g:RunnableGraph[Future[Seq[ClassB]]]): Seq[ClassB] = {
implicit val actorSystem = ActorSystem("app")
implicit val actorMaterializer = ActorMaterializer()
val eventualBs = g.run()
val res = Await.result(eventualBs, Duration.Inf)
actorSystem.terminate()
res
}
}
case class ClassA(a:Int)
case class ClassB(o:ClassA, o2:ClassA)
The result with this updated class is :
[info] Benchmark Mode Cnt Score Error Units
[info] MappingBenchmark.akkaLoadBalanceMap ss 20 19,052 â–’ 3,779 ms/op
[info] MappingBenchmark.akkaLoadBalanceMapSeq ss 20 16,115 â–’ 3,232 ms/op
[info] MappingBenchmark.akkaMapAsyncFold ss 20 20,862 â–’ 3,127 ms/op
[info] MappingBenchmark.akkaMapAsyncFoldAsync ss 20 26,994 â–’ 4,010 ms/op
[info] MappingBenchmark.akkaMapAsyncSeq ss 20 19,399 â–’ 7,089 ms/op
[info] MappingBenchmark.akkaMapFold ss 20 12,132 â–’ 4,111 ms/op
[info] MappingBenchmark.akkaMapFoldAsync ss 20 22,652 â–’ 3,802 ms/op
[info] MappingBenchmark.akkaMapSeq ss 20 10,894 â–’ 3,114 ms/op
[info] MappingBenchmark.map ss 20 0,625 â–’ 0,193 ms/op
[info] MappingBenchmark.monixBatchedObservables ss 20 9,175 â–’ 4,080 ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeft ss 20 11,724 â–’ 4,458 ms/op
[info] MappingBenchmark.monixMapAsyncFoldLeftAsync ss 20 14,174 â–’ 6,962 ms/op
[info] MappingBenchmark.monixMapFoldLeft ss 20 1,057 â–’ 0,960 ms/op
[info] MappingBenchmark.monixMapFoldLeftAsync ss 20 9,638 â–’ 4,910 ms/op
[info] MappingBenchmark.monixTaskGather ss 20 7,065 â–’ 2,428 ms/op
[info] MappingBenchmark.parMap ss 20 1,392 â–’ 0,923 ms/op
it seems that it is still faster to map with scala if we can before running a stream.