I'm attempting to use RestartFlow
in Akka Streams javadsl to restart one of my flow stages if any failures occur during the stage, but it doesn't seem to be restarting the flow and instead just drops the message.
I've already seen this: RestartFlow in Akka Streams not working as expected, but I'm on version 2.5.19 so it should be fixed?
I've tried both RestartFlow.onFailuresWithBackoff
and RestartFlow.withBackoff
but neither of those worked. I've also tried playing with the overall Actor system supervisor strategy, but that seems to just intercept the exception so that it isn't thrown from the flow and plus doesn't seem to offer the backoff and max retry strategy that I want.
The stream:
public Consumer.DrainingControl<Done> stream() {
return Consumer.committableSource(consumerSettings,
Subscriptions.topics(config.getString(ConfigKeys.KAFKA_CONFIG_PREFIX +
ConfigKeys.CONSUMER_TOPIC)))
.via(RestartFlow.onFailuresWithBackoff(
Duration.ofSeconds(1), // min backoff
Duration.ofSeconds(2), // max backoff,
0.2, // adds 20% "noise" to vary the intervals slightly
10, // limits the amount of restarts to 10
this::dispatchMessageFlow))
.via(Committer.flow(CommitterSettings.create(system)))
.toMat(Sink.ignore(), Keep.both())
.mapMaterializedValue(Consumer::createDrainingControl)
.run(mat);
}
And then the flow:
private Flow<ConsumerMessage.CommittableMessage<String, String>,
ConsumerMessage.Committable, NotUsed> dispatchMessageFlow() {
return Flow.<ConsumerMessage.CommittableMessage<String, String>>create()
.mapAsyncUnordered(
config.getInt(ConfigKeys.PARALLELISM),
msg ->
streamProcessor.process(msg.record().value())
.whenComplete((done, e) -> {
if (e != null) {
throw new RuntimeException(e);
} else {
if (done.status().isSuccess()){
streamingConsumerLogger.info("Successfully posted message, got response:\n{}",
done.toString());
} else {
throw new RuntimeException("HTTP Error!");
}
}
})
.thenApply(done -> msg.committableOffset()));
}
I see the exception once, with akka stating that it is going to restart the graph due to failure, but nothing else after that. According to my understanding I should be seeing it 10 more times. The consumer continues to listen to new messages so it seems like the message is just dropped.
java.util.concurrent.CompletionException: java.lang.RuntimeException: HTTP Error!
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:769)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: HTTP Error!
at com.company.app.messageforwarder.StreamingConsumerService.lambda$null$0(StreamingConsumerService.java:72)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
... 6 more
If anyone could please help point me in the right direction I would appreciate it.
it works a bit different way. long story short - if error happens, the message is dropped, but the source/flow would be just restarted, without killing the whole stream. it's described in the RestartFlow.onFailuresWithBackoff documentation:
The restart process is inherently lossy, since there is no coordination between cancelling and the sending of messages. A termination signal from either end of the wrapped Flow will cause the other end to be terminated, and any in transit messages will be lost. During backoff, this Flow will backpressure.