scala, akka-stream

Using a single actor for in and out ports of a Flow


I'm starting to explore the akka-stream API and I'm looking to test it out. I have a specific use case for which I can't seem to find an appropriate streaming API.

The idea is similar to the following with a bit of a twist:

Source(1 to 10).mapAsync(parallelism = 1)(num => actorRef ? Request(num)).to(Sink.foreach(println))

In this case the ask will return a single message through the future. I'd like to return more than one message for each request sent to the actor. For example:

  1. ActorRef gets sent Request(2)
  2. ActorRef sends N messages out of the flow.
  3. ActorRef gets sent Request(3)
  4. ActorRef sends M messages out of the flow.

I can add extra logic to the Actor to aggregate/buffer these extra messages into an Iterable of some sort, but I'm wondering if I'm missing something in the API that handles a case like this.
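The aggregate-then-flatten idea described above can be sketched roughly as follows. This is only an illustration under assumptions: it targets Akka 2.6+ (where an implicit ActorSystem supplies the materializer), and the `Response` message, the `Repeater` actor, and the `AskThenFlatten` object are hypothetical names that are not part of the original question.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical protocol: one Request yields one Response carrying many values.
final case class Request(num: Int)
final case class Response(values: List[Int])

// Hypothetical actor: replies with `num` copies of `num` in a single message.
class Repeater extends Actor {
  def receive: Receive = {
    case Request(n) => sender() ! Response(List.fill(n)(n))
  }
}

object AskThenFlatten {
  def run(): Seq[Int] = {
    // Assumes Akka 2.6+, where the implicit system provides the materializer.
    implicit val system: ActorSystem = ActorSystem("ask-then-flatten")
    implicit val timeout: Timeout = Timeout(3.seconds)
    try {
      val actorRef = system.actorOf(Props(new Repeater))
      val result = Source(1 to 3)
        // One ask per element; parallelism = 1 keeps requests sequential.
        .mapAsync(parallelism = 1)(num => (actorRef ? Request(num)).mapTo[Response])
        // Flatten each aggregated reply back into individual stream elements.
        .mapConcat(_.values)
        .runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run().mkString(", ")) // prints "1, 2, 2, 3, 3, 3"
}
```

The trade-off is that the actor has to buffer the whole reply before anything flows downstream, so this only suits replies that are small and finite.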

The closest I've come while looking through the docs is combining Sink.actorRefWithAck and Source.queue via Flow.fromSinkAndSourceMat, and passing the materialized SourceQueue to the actorRef before acking the onInitMessage. This gives control over both upstream and downstream back pressure while handling one-to-many messages through the flow, but it seems counterproductive when the goal is a Graph that is easy to follow.


Solution

  • I believe you're looking for flatMapConcat.

    Source(1 to 10).flatMapConcat(num => 
      // Dummy example - emits each incoming element followed by its next 5 successors
      Source(num to (num + 5))
    ).to(Sink.foreach(println))
    

    This way you let Akka do all the Reactive Streams heavy lifting, and no bare actor pollutes your pipeline.

    See the flatMapConcat entry in the Akka Streams operator documentation.
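To see how flatMapConcat orders its output, here is a small runnable sketch of the same idea. It assumes Akka 2.6+ (the implicit ActorSystem supplies the materializer); the `FlatMapConcatDemo` object and the smaller ranges are made up for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FlatMapConcatDemo {
  def run(): Seq[Int] = {
    // Assumes Akka 2.6+, where the implicit system provides the materializer.
    implicit val system: ActorSystem = ActorSystem("flat-map-concat-demo")
    try {
      val result = Source(1 to 2)
        // Each element becomes a sub-stream; sub-streams are consumed one
        // after another, so their outputs never interleave.
        .flatMapConcat(num => Source(num to (num + 2)))
        .runWith(Sink.seq)
      Await.result(result, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(run().mkString(", ")) // prints "1, 2, 3, 2, 3, 4"
}
```

Because the next sub-stream is only started once the previous one completes, this matches the request/N messages/request/M messages sequence from the question, provided each sub-source is finite.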