scala, akka, akka-stream

Throttle Akka Ask


I am trying to throttle my ask requests to a consumerActor.

val throttler: ActorRef =
Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
  .throttle(10, 1.second, 1, ThrottleMode.Shaping)
  .to(Sink.foreach[Any](msg => consumerActor ! msg))
  .run()

with

aLotOfItems.map { items =>
  val itemsFuture = (throttler ? consumeItems(items)).mapTo[Future[String]]
  itemsFuture.flatMap(x => x)
}.toVector

This does send messages to the consumerActor, but I seem to lose the response: I tried with 2 items and the request just hangs.

I think I need to change the tell in the Sink.foreach to an ask, or to something else that can handle a response.

Solution: I got it to work using the selected answer below. I had to add:

val answer = Source(...) // from the selected answer below
sender ! answer

Solution

  • The problem is that you're expecting responses from throttler, but throttler is not sending replies and is unable to do so because it doesn't have a reference to the original sender.

    If consumerActor replies to the sender of each consumeItems(i) message with a Future[String], one way to do this is to create a Source from aLotOfItems and combine mapAsync with ask to limit the messages sent to the actor. mapAsync(parallelism = 5) caps the number of unanswered asks at five; to cap the rate as well, a throttle stage can be placed before it. The replies from the actor can be accumulated in a Sink. Something like:

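    import akka.pattern.ask
    import akka.stream.scaladsl.{Sink, Source}

    // Requires an implicit akka.util.Timeout (for ask) and a materializer in scope.
    val sink = Sink.seq[String]

    val result =
      Source(aLotOfItems)
        .map(consumeItems(_))
        // Each ask yields a Future[Future[String]]: the reply itself is a Future.
        .mapAsync(parallelism = 5)(item => (consumerActor ? item).mapTo[Future[String]])
        // Flatten the inner Future so the stream carries plain Strings.
        .mapAsync(parallelism = 5)(identity)
        .runWith(sink)

    // result: Future[Seq[String]]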
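
For reference, a self-contained sketch of this approach with a throttle stage added might look like the following. ConsumeItems, ConsumerActor, and ThrottledAsk are hypothetical stand-ins for the asker's consumeItems message and consumerActor, and the sketch assumes Akka 2.6 or later, where an implicit ActorSystem supplies the materializer. It is an illustration under those assumptions, not the asker's actual code.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.stream.ThrottleMode
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical message standing in for the question's consumeItems(...).
final case class ConsumeItems(item: Int)

// Hypothetical consumer: replies to the sender with a Future[String],
// as the question's mapTo[Future[String]] implies.
class ConsumerActor extends Actor {
  import context.dispatcher
  def receive: Receive = {
    case ConsumeItems(i) => sender() ! Future(s"consumed $i")
  }
}

object ThrottledAsk {
  def run(items: List[Int]): Seq[String] = {
    // Assumption: Akka 2.6+, so the implicit system also provides a Materializer.
    implicit val system: ActorSystem = ActorSystem("throttle-demo")
    implicit val timeout: Timeout = 5.seconds // needed by ask (?)
    val consumerActor = system.actorOf(Props(new ConsumerActor))
    try {
      val result: Future[Seq[String]] =
        Source(items)
          // Same throttle settings as in the question: 10 elements per second.
          .throttle(10, 1.second, 1, ThrottleMode.Shaping)
          // ask creates a temporary sender, so each reply reaches the stream.
          .mapAsync(parallelism = 5)(i => (consumerActor ? ConsumeItems(i)).mapTo[Future[String]])
          // Flatten the Future the actor replied with.
          .mapAsync(parallelism = 5)(identity)
          .runWith(Sink.seq[String])
      // Blocking only to keep the demo short.
      Await.result(result, 10.seconds)
    } finally system.terminate()
  }
}
```

Calling ThrottledAsk.run(List(1, 2, 3)) blocks until all replies have arrived. Inside an actor, the Future would normally be passed on to the caller rather than awaited, as in the asker's `sender ! answer`.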