I am studying akka-stream at scala.
I want to get the value sent from the actor with code like this:
case class Message(value: String)
class LibraryService @Inject()(@Named("library-actor") library: ActorRef)(implicit ec: ExecutionContext, ml: Materializer) {
val sink = Sink.actorRef[Message](library, onCompleteMessage = "stream completed", onFailureMessage = (throwable: Throwable) => Fail(throwable.getMessage))
val source = Source.single(Message("test"))
def add(message: String): Unit = {
val runnable = source to sink
val value = runnable.run() // I want get "I receive 'test'" at here.
print(value) // This code prints "Not Used"
}
}
class Library extends Actor {
override def receive: Receive = {
case Message(message) => {
sender ! s"I receive '$message'"
}
}
}
However, in the above code, only "Not Used" is output, and the value sent by the sender is not stored in'value'.
I want to get the value sent by the actor's 'sender'.
If you know anything, please let me know.
Sink.actorRef
materializes as NotUsed
, meaning that it doesn't provide a useful value. All it does is forward the messages to the target actor.
To get a response back, the best approach to this example is probably:
Source.single(Message("test"))
.runWith(Sink.foreachAsync(1) { msg =>
(library ? msg).map(println _)
})
In general, if you're sending messages to an actor from a stream and expecting a response, mapAsync
with an ask should be your first choice (mapAsyncUnordered
for better throughput if and only if you don't care about preserving stream order). If you're doing a mapAsyncUnordered
and immediately passing the results to Sink.foreach
to invoke a side effect, it's probably worth combining the ask with the side effect in a Sink.foreachAsync
and removing the mapAsyncUnordered
.
Editing to add: If you do care about ordering (hence using a mapAsync
over mapAsyncUnordered
), you can still use Sink.foreachAsync
, but be sure to use single parallelism (i.e. Sink.foreachAsync(1)
). On the other hand, the advantage of mapAsync
into Sink.foreach
is that you can have parallelism be greater than one on the mapAsync
and still preserve ordering for the side effect in the sink, e.g.:
/**
* Transforms values from a source in parallel and applies a side
* effect to the transformed values. If value `a` is emitted before
* value `b` by the source, the effects arising from `pureFutF(a)`
* will execute before the effects arising from `pureFutF(b)`.
*
* @param src
* @param parallelism how many calls to pureFutF are in-flight at a
* time
* @param pureFutF asynchronous transformation; if parallelism is
* greater than 1, order is not guaranteed, so ideally the order
* in which transformations are executed is not important
* @param sideEffect
* @return a future which successfully completes when the source has
* completed and all side effects have been executed
*/
def runParallelStreamMap[A, B](src: Source[A, Any], parallelism: Int)
( pureFutF: A => Future[B], sideEffect: B => Unit): Future[Done] = {
src.mapAsync(parallelism)(pureFutF).runWith(Sink.foreach(sideEffect))
}
It would be nice if there was a .contramapAsync
method on a Sink
.