
Sink fold for akka stream Source.actorRef buffer and OverflowStrategy


Here is a code snippet from the Akka documentation:

val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)

val (ref, future) = Source.actorRef(3, OverflowStrategy.fail)
  .toMat(sinkUnderTest)(Keep.both).run()

ref ! 1
ref ! 2
ref ! 3
ref ! akka.actor.Status.Success("done")

val result = Await.result(future, 3.seconds)
assert(result == "123")

This snippet works. However, if I use ref to send one more message, such as ref ! 4, I get an exception: akka.stream.BufferOverflowException: Buffer overflow (max capacity was: 3)

I assumed that a buffer size of 3 would be enough. My reasoning is that the fold function has the shape (acc, ele) => acc: it takes the accumulator and one element and returns the new accumulator, so only one element should need to be held at a time.
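For reference, the fold shape described above can be sketched with plain Scala collections, with no Akka involved. The names FoldShape and concat are made up for this illustration. The fold keeps a single accumulator no matter how many elements arrive, so the overflow presumably comes from the buffer inside Source.actorRef (messages arriving before the stream has pulled them) rather than from the fold itself.

```scala
object FoldShape {
  // Same shape as Sink.fold("")(_ + _): (accumulator, element) => new accumulator.
  // Only the accumulator is kept between steps; elements are consumed one at a time.
  def concat(elements: Seq[Int]): String =
    elements.map(_.toString).foldLeft("")((acc, ele) => acc + ele)
}
```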

So I changed the code to wait 3 seconds before sending the fourth message, and it works again.

  val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)

  private val (ref, future): (ActorRef, Future[String]) = Source.actorRef(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run()

  ref ! 1
  ref ! 2
  ref ! 3
  Thread.sleep(3000)
  ref ! 4
  ref ! akka.actor.Status.Success("done")

  val result = Await.result(future, 10.seconds)

  println(result)

My question is: is there a way to tell Akka Streams to slow the sender down, or to wait until the sink is ready for more elements? I also tried OverflowStrategy.backpressure, but Source.actorRef rejected it with the message "Backpressure overflowStrategy not supported".

Any other options?


Solution

  • You should look into Source.queue as a way to inject elements into the stream from outside in a backpressure-aware fashion.

    Source.queue materializes to a queue object that you can offer elements to. Each call to offer returns a Future that completes when the stream is ready to accept the message.

    Example below:

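      // Assumes an implicit materializer and an implicit ExecutionContext are in scope.
      val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)

      // Materializes both the queue (to push elements in) and the Future holding the fold result.
      val (queue, future): (SourceQueueWithComplete[Int], Future[String]) =
        Source.queue(3, OverflowStrategy.backpressure).toMat(sinkUnderTest)(Keep.both).run()

      // Each offer returns a Future[QueueOfferResult]; Future.sequence combines them into one Future.
      Future.sequence(Seq(
        queue.offer(1),
        queue.offer(2),
        queue.offer(3),
        queue.offer(4)
      ))

      // Signal that no more elements will be offered, so the fold can complete.
      queue.complete()

      val result = Await.result(future, 10.seconds)

      println(result)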
    

    More info in the docs.
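
As a variation on the example above, here is a minimal sketch that waits for each offer's Future before making the next offer, which is one way to let the stream pace the producer. It assumes the Akka 2.5-style classic API (ActorSystem plus ActorMaterializer). The object name SequentialOffers, the method run, and the element range 1 to 5 are made up for this illustration and are not part of the original answer.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SequentialOffers {
  // Hypothetical helper: runs the stream once and returns the folded string.
  def run(): String = {
    implicit val system: ActorSystem = ActorSystem("sequential-offers")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext for flatMap/map/onComplete

    try {
      val sinkUnderTest =
        Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)

      val (queue, future) =
        Source.queue[Int](3, OverflowStrategy.backpressure)
          .toMat(sinkUnderTest)(Keep.both)
          .run()

      // Chain the offers: element n+1 is offered only after the Future
      // returned for element n has completed.
      val offered: Future[Unit] =
        (1 to 5).foldLeft(Future.successful(())) { (previous, element) =>
          previous.flatMap(_ => queue.offer(element)).map(_ => ())
        }

      // Complete the queue once the last offer has been resolved
      // (or once the chain has failed), so the fold can finish.
      offered.onComplete(_ => queue.complete())

      Await.result(future, 10.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Calling SequentialOffers.run() returns the concatenation of the offered elements. Chaining with flatMap keeps at most one offer outstanding at a time, so the producer never runs ahead of what the queue has accepted.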