Using the example provided in the Play! Framework documentation, I've created a modified version that emits some delayed events, so that I can observe them arriving at the same rate on the client side:
public Result playExampleDelayed() {
    Source<ByteString, ?> source = Source.<ByteString> actorRef(5, OverflowStrategy.dropNew())
            .mapMaterializedValue(sourceActor -> {
                for (int i = 0; i < 10; ++i) {
                    Thread.sleep(1000);
                    sourceActor.tell(ByteString.fromString("tick " + i), null);
                }
                sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
                return null;
            });
    return ok().chunked(source);
}
However, using curl, we get all the events in a single step, when the source is completed.
Using a different source type I can get the intended behavior:
public Result tick() {
    Source<ByteString, ?> source = Source.<ByteString> tick(Duration.create(0, TimeUnit.SECONDS),
            Duration.create(1, TimeUnit.SECONDS), ByteString.fromString("tick"));
    return ok().chunked(source);
}
In this case, I get one chunk per second in the console.
According to the Akka documentation, I'd expect the first example to work. What am I doing wrong?
You're using Thread.sleep in your mapMaterializedValue call. That callback is a synchronous call that happens right after you run() your stream. Blocking in there (e.g. with a Thread.sleep) blocks the whole materialization. Therefore, all the messages are picked up by the actor only at the very end of the loop execution, which is why the client sees them arrive in a single step.
Bottom line: always steer away from using Thread.sleep when using Akka.
By contrast, Source.tick uses a scheduler (asynchronous, non-blocking), and is therefore a far more performant, robust and elegant solution.
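To make the scheduler idea concrete, here is a minimal sketch in plain java.util.concurrent. It is only an analogy: it does not use Akka's scheduler or Source.tick, and the names SchedulerTickDemo and scheduleTicks are made up for illustration. The setup method returns immediately and the ticks are produced later on the scheduler's thread, which is the opposite of sleeping inside a setup callback.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: a plain-JDK analogy, not Akka's API.
public class SchedulerTickDemo {

    /**
     * Registers a periodic task and returns right away.
     * The first `count` runs append "tick i" to `sink`; later runs do nothing.
     * The returned latch reaches zero once all `count` ticks have been emitted.
     */
    static CountDownLatch scheduleTicks(ScheduledExecutorService scheduler,
                                        int count, long periodMillis, List<String> sink) {
        CountDownLatch done = new CountDownLatch(count);
        AtomicInteger next = new AtomicInteger();
        scheduler.scheduleAtFixedRate(() -> {
            int i = next.getAndIncrement();
            if (i < count) {
                sink.add("tick " + i);
                done.countDown();
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        return done;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<String> sink = new CopyOnWriteArrayList<>();

        // No Thread.sleep here: the calling thread is not held up by the ticks.
        CountDownLatch done = scheduleTicks(scheduler, 3, 1000, sink);
        System.out.println("setup returned, ticks will follow");

        done.await();            // wait only so the demo can print the result
        scheduler.shutdownNow(); // stop the periodic task
        System.out.println(sink);
    }
}
```

The caller only waits on the latch because the demo needs to print the result. In a streaming setup the equivalent of scheduleTicks would return and let the consumer start reading while the ticks are still being produced.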