I'm currently working with Akka Streams (in Java) for a personal project and I'm having a hard time understanding how to send elements to a Source.
The idea is to use a WebSocket to push content into the user's web browser. I've managed to use Akka Streams to create a request-response system, following the Akka HTTP documentation, but this is not what I want to do.
Looking into the Akka Streams documentation, I saw that there are Source.queue and Source.actorRef. But I don't understand how to put an element into the Source. Source.queue and Source.actorRef return a Source, which doesn't have the method offer (for Source.queue) or tell (for Source.actorRef).
My question is: how do I get the ActorRef for the Source created by Source.actorRef, or the SourceQueueWithComplete for a Source created with Source.queue, to be able to send elements to my Source?
I searched the various Akka documentation but found no method to do that. And the majority of the code I found on the Internet is written in Scala, which doesn't seem to have the same problem.
The actor and queue from Source.actorRef and Source.queue, respectively, are the materialized values of those sources, meaning that they can be obtained only by running (materializing) the stream. For example:
final ActorRef actor =
    Source.actorRef(Integer.MAX_VALUE, OverflowStrategy.fail())
        .to(Sink.foreach(m -> System.out.println(m)))
        .run(materializer);

actor.tell("do something", ActorRef.noSender());
It's no different in Scala:
implicit val materializer = ActorMaterializer()

val actor =
  Source.actorRef(Int.MaxValue, OverflowStrategy.fail)
    .to(Sink.foreach(println))
    .run()

actor ! "do something"
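
The same idea should carry over to Source.queue: the SourceQueueWithComplete is the materialized value, so it only becomes available once the stream is run. Below is a minimal Java sketch under a few assumptions that are not in the question: an Akka 2.5-style API with ActorMaterializer, a buffer size of 16, a backpressure overflow strategy, and a Sink.seq that collects the elements instead of a WebSocket. The class name QueueSourceExample and the helper pushThroughQueue are hypothetical names chosen for illustration.

```java
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SourceQueueWithComplete;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

public class QueueSourceExample {

    // Hypothetical helper: offers each element to a Source.queue and
    // returns everything the sink received once the queue is completed.
    static List<String> pushThroughQueue(List<String> elements) throws Exception {
        ActorSystem system = ActorSystem.create("queue-example");
        try {
            Materializer materializer = ActorMaterializer.create(system);

            // Keep.both() keeps the queue (from the source) and the
            // CompletionStage of collected elements (from the sink).
            Pair<SourceQueueWithComplete<String>, CompletionStage<List<String>>> materialized =
                Source.<String>queue(16, OverflowStrategy.backpressure())
                    .toMat(Sink.<String>seq(), Keep.both())
                    .run(materializer);

            SourceQueueWithComplete<String> queue = materialized.first();

            // With the backpressure strategy, wait for each offer to be
            // acknowledged before offering the next element.
            for (String element : elements) {
                queue.offer(element).toCompletableFuture().get(3, TimeUnit.SECONDS);
            }

            // Completing the queue completes the stream, which in turn
            // completes the sink's CompletionStage.
            queue.complete();
            return materialized.second().toCompletableFuture().get(3, TimeUnit.SECONDS);
        } finally {
            system.terminate();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(pushThroughQueue(Arrays.asList("do something", "do something else")));
    }
}
```

If only the queue is needed, Keep.left() in place of Keep.both() makes run return the SourceQueueWithComplete directly, much as the actorRef example above returns the ActorRef.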