I am trying to throttle my ask
requests to a consumerActor.
val throttler: ActorRef =
Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
.throttle(10, 1.second, 1, ThrottleMode.Shaping)
.to(Sink.foreach[Any](msg => consumerActor ! msg))
.run()
with
aLotOfItems.map(items =>
val itemsFuture = (throttler ? consumeItems(items)).mapTo[Future[String]]
itemsFuture flatMap {x => x}
}).toVector
This does send msgs to the consumerActor but I seem to lose the response as I tried with 2 items but the request just hangs.
I think I need to change the tell
in the Sink.foreach
to an ask or something that can handle a response
Solution: Got it to work using the selected answer below. I had to add
val answer = Source(...) (from the selected answer below)
sender ! answer
The problem is that you're expecting responses from throttler
, but throttler
is not sending replies and is unable to do so because it doesn't have a reference to the original sender.
If consumerActor
replies to the sender of each consumeItems(i)
message with a Future[String]
, then one way to achieve what you're trying to do is to create a Source
from aLotOfItems
and use a combination of mapAsync
and ask
to throttle messages to the actor. The replies from the actor can be accumulated in a Sink
. Something like:
val sink = Sink.seq[String]
val result =
Source(aLotOfItems)
.map(consumeItems(_))
.mapAsync(parallelism = 5)(item => (consumerActor ? item).mapTo[Future[String]])
.mapAsync(parallelism = 5)(identity)
.runWith(sink)
// Future[Seq[String]]