
Akka Streams onFailuresWithBackoff not restarting flow


I'm attempting to use RestartFlow from the Akka Streams javadsl to restart one of my flow stages whenever a failure occurs in that stage, but it doesn't seem to restart the flow; instead, the message is simply dropped.

I've already seen the question "RestartFlow in Akka Streams not working as expected", but I'm on version 2.5.19, so shouldn't that issue already be fixed?

I've tried both RestartFlow.onFailuresWithBackoff and RestartFlow.withBackoff, but neither worked. I've also experimented with the supervision strategy of the overall actor system, but that seems to just intercept the exception so that it isn't thrown from the flow, and it doesn't seem to offer the backoff and max-retry behavior I want.

The stream:

public Consumer.DrainingControl<Done> stream() {
    return Consumer.committableSource(consumerSettings,
        Subscriptions.topics(config.getString(ConfigKeys.KAFKA_CONFIG_PREFIX +
            ConfigKeys.CONSUMER_TOPIC)))
        .via(RestartFlow.onFailuresWithBackoff(
                Duration.ofSeconds(1), // min backoff
                Duration.ofSeconds(2), // max backoff
                0.2, // adds 20% "noise" to vary the intervals slightly
                10, // limits the number of restarts to 10
                this::dispatchMessageFlow))
        .via(Committer.flow(CommitterSettings.create(system)))
        .toMat(Sink.ignore(), Keep.both())
        .mapMaterializedValue(Consumer::createDrainingControl)
        .run(mat);
}
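To make the four numeric arguments passed to RestartFlow.onFailuresWithBackoff concrete, here is a rough sketch of the delay schedule they would produce, assuming the formula used by Akka's backoff support in the 2.5 line: minBackoff doubled on each restart, capped at maxBackoff, then multiplied by a random factor between 1.0 and 1.0 + randomFactor. BackoffDelaySketch and delay are illustrative names, not Akka API, and the random draw is passed in as a parameter so the results are reproducible.

```java
import java.time.Duration;

public class BackoffDelaySketch {

    /**
     * Illustrative backoff schedule (an assumption, not Akka API):
     * minBackoff * 2^restartCount, capped at maxBackoff, then scaled by a
     * jitter factor in [1.0, 1.0 + randomFactor). `random` must be in [0.0, 1.0)
     * and stands in for a random draw so results are reproducible.
     */
    static Duration delay(int restartCount, Duration minBackoff, Duration maxBackoff,
                          double randomFactor, double random) {
        // Clamp the exponent so 2^restartCount cannot grow to infinity.
        double exponential = minBackoff.toMillis() * Math.pow(2, Math.min(restartCount, 30));
        double capped = Math.min((double) maxBackoff.toMillis(), exponential);
        double jitter = 1.0 + random * randomFactor;
        return Duration.ofMillis(Math.round(capped * jitter));
    }

    public static void main(String[] args) {
        Duration min = Duration.ofSeconds(1);
        Duration max = Duration.ofSeconds(2);
        for (int restart = 0; restart < 4; restart++) {
            // Shortest and (approximately) longest possible wait for this restart.
            long low = delay(restart, min, max, 0.2, 0.0).toMillis();
            long high = delay(restart, min, max, 0.2, 0.999).toMillis();
            System.out.println("restart " + restart + ": " + low + " to " + high + " ms");
        }
    }
}
```

Under this assumed formula, the settings in the question would wait roughly 1.0 to 1.2 s before the first restart and 2.0 to 2.4 s before each later one, because the jitter is applied after the cap. Note that maxRestarts counts restarts of the wrapped flow, not retries of the element that caused the failure.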

And then the flow:

private Flow<ConsumerMessage.CommittableMessage<String, String>,
    ConsumerMessage.Committable, NotUsed> dispatchMessageFlow() {
    return Flow.<ConsumerMessage.CommittableMessage<String, String>>create()
            .mapAsyncUnordered(
                config.getInt(ConfigKeys.PARALLELISM),
                msg ->
                    streamProcessor.process(msg.record().value())
                        .whenComplete((done, e) -> {
                            if (e != null) {
                                throw new RuntimeException(e);
                            } else {
                                if (done.status().isSuccess()){
                                    streamingConsumerLogger.info("Successfully posted message, got response:\n{}",
                                        done.toString());
                                } else {
                                    throw new RuntimeException("HTTP Error!");
                                }
                            }
                        })
                        .thenApply(done -> msg.committableOffset()));
}

I see the exception once, with Akka stating that it is going to restart the graph due to the failure, but nothing after that. As I understand it, I should see it up to 10 more times. The consumer keeps receiving new messages, so it seems the failed message is simply dropped.

java.util.concurrent.CompletionException: java.lang.RuntimeException: HTTP Error!
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:769)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
    at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: HTTP Error!
    at com.company.app.messageforwarder.StreamingConsumerService.lambda$null$0(StreamingConsumerService.java:72)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
    ... 6 more
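The CompletionException at the top of this trace comes from how whenComplete treats an exception thrown inside its callback: the throw does not escape to the caller. Instead, it completes the stage returned by whenComplete exceptionally (wrapped in a CompletionException), and every later stage, here thenApply, inherits that failure. With the default supervision strategy, mapAsyncUnordered then sees a failed CompletionStage and fails, which is what RestartFlow reacts to. The JDK-only sketch below reproduces this; fakeHttpCall is a hypothetical stand-in for streamProcessor.process, and an integer status code stands in for the HTTP response.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

public class WhenCompleteSketch {

    // Hypothetical stand-in for the HTTP call: completes normally with a status code.
    static CompletionStage<Integer> fakeHttpCall(int status) {
        return CompletableFuture.completedFuture(status);
    }

    static CompletionStage<String> process(int status) {
        return fakeHttpCall(status)
                .whenComplete((code, e) -> {
                    if (e == null && code >= 400) {
                        // This throw does not reach the caller: it fails the
                        // stage returned by whenComplete instead.
                        throw new RuntimeException("HTTP Error!");
                    }
                })
                // Skipped when the previous stage failed; the failure is passed on.
                .thenApply(code -> "offset-for-" + code);
    }

    public static void main(String[] args) {
        System.out.println(process(200).toCompletableFuture().join());
        try {
            process(500).toCompletableFuture().join();
        } catch (CompletionException ex) {
            // join() reports the failure wrapped in CompletionException,
            // like the top frame of the stack trace in the question.
            System.out.println("failed: " + ex.getCause().getMessage());
        }
    }
}
```

One side effect of these semantics: in the e != null branch of the original flow, the new RuntimeException(e) is discarded, because when the source stage has already failed, the stage returned by whenComplete completes with that original exception rather than the one thrown in the callback.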

If anyone could please help point me in the right direction I would appreciate it.


Solution

  • It works a bit differently. Long story short: if an error happens, the in-flight message is dropped, but the source/flow is simply restarted, without killing the whole stream. This is described in the RestartFlow.onFailuresWithBackoff documentation:

    The restart process is inherently lossy, since there is no coordination between cancelling and the sending of messages. A termination signal from either end of the wrapped Flow will cause the other end to be terminated, and any in transit messages will be lost. During backoff, this Flow will backpressure.
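Because RestartFlow restarts the flow but does not replay the element that was in flight, one way to get per-message retries with backoff is to retry the asynchronous call itself inside mapAsyncUnordered, so the stream only sees a failure once all attempts are used up. The sketch below uses only the JDK; retryWithBackoff is a hypothetical helper, not part of Akka or Alpakka Kafka, and the attempt count and delays are placeholder values.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class RetrySketch {

    /**
     * Hypothetical helper: runs `attempt` and, on failure, tries again after
     * `delay`, doubling the delay each time (capped at maxDelay) until it
     * succeeds or `attemptsLeft` is used up.
     */
    static <T> CompletionStage<T> retryWithBackoff(Supplier<CompletionStage<T>> attempt,
                                                   int attemptsLeft, Duration delay,
                                                   Duration maxDelay,
                                                   ScheduledExecutorService scheduler) {
        return attempt.get()
                .<CompletionStage<T>>handle((value, error) ->
                        next(value, error, attempt, attemptsLeft, delay, maxDelay, scheduler))
                .thenCompose(stage -> stage);
    }

    private static <T> CompletionStage<T> next(T value, Throwable error,
                                               Supplier<CompletionStage<T>> attempt,
                                               int attemptsLeft, Duration delay,
                                               Duration maxDelay,
                                               ScheduledExecutorService scheduler) {
        CompletableFuture<T> done = new CompletableFuture<>();
        if (error == null) {
            done.complete(value);
            return done;
        }
        if (attemptsLeft <= 1) {
            // Out of attempts: surface the last failure (this is where the stream would fail).
            done.completeExceptionally(error);
            return done;
        }
        // Wait `delay` without blocking a thread, then try again with a longer delay.
        CompletableFuture<Void> timer = new CompletableFuture<>();
        scheduler.schedule(() -> timer.complete(null), delay.toMillis(), TimeUnit.MILLISECONDS);
        Duration doubled = delay.multipliedBy(2);
        Duration nextDelay = doubled.compareTo(maxDelay) > 0 ? maxDelay : doubled;
        return timer.thenCompose(ignored ->
                retryWithBackoff(attempt, attemptsLeft - 1, nextDelay, maxDelay, scheduler));
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger calls = new AtomicInteger();
        // Stand-in for streamProcessor.process(...): fails twice, then succeeds.
        Supplier<CompletionStage<String>> flaky = () -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (calls.incrementAndGet() < 3) {
                f.completeExceptionally(new RuntimeException("HTTP Error!"));
            } else {
                f.complete("ok");
            }
            return f;
        };
        String result = retryWithBackoff(flaky, 5, Duration.ofMillis(10), Duration.ofMillis(40), scheduler)
                .toCompletableFuture().join();
        System.out.println(result + " after " + calls.get() + " calls");
        scheduler.shutdown();
    }
}
```

Inside dispatchMessageFlow, the mapAsyncUnordered lambda could then wrap the call, roughly msg -> retryWithBackoff(() -> streamProcessor.process(msg.record().value()), ...).thenApply(done -> msg.committableOffset()), with the HTTP status check moved inside the supplier so that a non-success response counts as a failed attempt. A message that exhausts all of its attempts still fails the stage, and the lossy restart described in the quoted documentation still applies to it.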