scala, akka, actor, akka-stream

How to get a return value when using an actor in akka-stream


I am learning akka-stream in Scala.
I want to get the value that the actor sends back, using code like this:

case class Message(value: String)

class LibraryService @Inject()(@Named("library-actor") library: ActorRef)(implicit ec: ExecutionContext, ml: Materializer) {
  val sink = Sink.actorRef[Message](library, onCompleteMessage = "stream completed", onFailureMessage = (throwable: Throwable) => Fail(throwable.getMessage))
  val source = Source.single(Message("test"))

  def add(message: String): Unit = {
    val runnable = source to sink
    val value = runnable.run() // I want to get "I receive 'test'" here.
    print(value) // This prints "NotUsed"
  }
}

class Library extends Actor {
  override def receive: Receive = {
    case Message(message) => {
      sender ! s"I receive '$message'"
    }
  }
}

However, the code above only prints "NotUsed", and the reply sent by the actor is not stored in 'value'.
I want to get the value that the actor sends back to 'sender'.
If you know how to do this, please let me know.


Solution

  • Sink.actorRef materializes as NotUsed, meaning that it doesn't provide a useful value. All it does is forward the messages to the target actor.

    To get a response back, the best approach for this example is probably an ask inside Sink.foreachAsync:

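    // Requires `import akka.pattern.ask` and an implicit akka.util.Timeout in scope.
    Source.single(Message("test"))
      .runWith(Sink.foreachAsync(1) { msg =>
        (library ? msg).map(println _) // prints the actor's reply
      })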
    

    In general, if you're sending messages to an actor from a stream and expecting a response, mapAsync with an ask should be your first choice (mapAsyncUnordered for better throughput if and only if you don't care about preserving stream order). If you're doing a mapAsyncUnordered and immediately passing the results to Sink.foreach to invoke a side effect, it's probably worth combining the ask with the side effect in a Sink.foreachAsync and removing the mapAsyncUnordered.
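
    If the goal is to get the reply back as a value rather than print it, the mapAsync-with-ask approach described above could be sketched roughly as follows. This is a minimal sketch, not a definitive implementation: it assumes Akka 2.6+ (where an implicit ActorSystem supplies the Materializer), the names AskFromStream and askLibrary are made up for illustration, and the 3-second timeout is an arbitrary assumption.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.stream.scaladsl.{Sink, Source}
import akka.util.Timeout

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

final case class Message(value: String)

// Same behavior as the Library actor in the question.
class Library extends Actor {
  override def receive: Receive = {
    case Message(message) => sender() ! s"I receive '$message'"
  }
}

object AskFromStream {
  // Hypothetical helper: sends one Message through the stream and
  // materializes the actor's reply as a Future[String].
  def askLibrary(library: ActorRef, text: String)(
      implicit system: ActorSystem, timeout: Timeout): Future[String] =
    Source.single(Message(text))
      // mapAsync replaces each element with the actor's reply.
      .mapAsync(parallelism = 1)(msg => (library ? msg).mapTo[String])
      // Sink.head materializes a Future of the first (here, only) element.
      .runWith(Sink.head)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("ask-example")
    // Assumed timeout for each ask; tune it for the real actor.
    implicit val timeout: Timeout = 3.seconds

    val library = system.actorOf(Props(new Library), "library-actor")

    // Blocking here is only for the demo; real code would map or
    // flatMap over the Future instead.
    val value = Await.result(askLibrary(library, "test"), 5.seconds)
    println(value) // prints: I receive 'test'

    system.terminate()
  }
}
```

    The key difference from Sink.actorRef is that the reply travels downstream as a stream element, so any sink that materializes a useful value (Sink.head, Sink.seq, Sink.fold) can hand it back to the caller.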

    Editing to add: If you do care about ordering (hence using a mapAsync over mapAsyncUnordered), you can still use Sink.foreachAsync, but be sure to use single parallelism (i.e. Sink.foreachAsync(1)). On the other hand, the advantage of mapAsync into Sink.foreach is that you can have parallelism be greater than one on the mapAsync and still preserve ordering for the side effect in the sink, e.g.:

    /**
     * Transforms values from a source in parallel and applies a side
     * effect to the transformed values. If value `a` is emitted before
     * value `b` by the source, the effects arising from `pureFutF(a)`
     * will execute before the effects arising from `pureFutF(b)`.
     *
     * Requires an implicit Materializer in scope to run the stream.
     *
     * @param src the source of values to transform
     * @param parallelism how many calls to pureFutF are in-flight at a
     *  time
     * @param pureFutF asynchronous transformation; if parallelism is
     *  greater than 1, the order in which the transformations execute
     *  is not guaranteed, so ideally that order is not important
     * @param sideEffect effect applied to each transformed value, in
     *  source order
     * @return a future which successfully completes when the source has
     *  completed and all side effects have been executed
     */
    def runParallelStreamMap[A, B](src: Source[A, Any], parallelism: Int)
      (pureFutF: A => Future[B], sideEffect: B => Unit): Future[Done] = {

      src.mapAsync(parallelism)(pureFutF).runWith(Sink.foreach(sideEffect))
    }
    

    It would be nice if there was a .contramapAsync method on a Sink.
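
    Something close to that wish can be approximated in user code by prepending a mapAsync flow to an existing sink. The sketch below is only an illustration: contramapAsync, SinkSyntax and ContramapAsyncOps are hypothetical names, not part of Akka Streams, and the example again assumes Akka 2.6+ so that an implicit ActorSystem provides the Materializer.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object SinkSyntax {
  // Hypothetical extension method; Akka Streams does not ship this.
  implicit class ContramapAsyncOps[B, Mat](private val sink: Sink[B, Mat]) extends AnyVal {
    // Builds a Sink[A, Mat] that transforms each A asynchronously and
    // feeds the results, in upstream order, to the wrapped sink.
    // Keep.right preserves the wrapped sink's materialized value.
    def contramapAsync[A](parallelism: Int)(f: A => Future[B]): Sink[A, Mat] =
      Flow[A].mapAsync(parallelism)(f).toMat(sink)(Keep.right)
  }
}

object ContramapAsyncExample {
  import SinkSyntax._

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("contramap-example")
    import system.dispatcher // ExecutionContext for Future(...)

    // A Sink[String, ...] built from a Sink[Int, ...]: each string is
    // mapped to its length with up to 4 transformations in flight.
    val lengths = Sink.seq[Int].contramapAsync[String](parallelism = 4) { s =>
      Future(s.length)
    }

    val result = Await.result(Source(List("a", "bb", "ccc")).runWith(lengths), 5.seconds)
    println(result) // the lengths 1, 2, 3 in source order

    system.terminate()
  }
}
```

    Because the helper is built on mapAsync, it keeps the ordering guarantee discussed above even when parallelism is greater than one; swapping in mapAsyncUnordered would trade that guarantee for throughput.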