spring-boot reactive-programming azureservicebus project-reactor

How to close the Spring Context on Fatal Exceptions in reactive chains (e.g. OutOfMemory)


Inside a Spring application, we use the ServiceBusReceiverAsyncClient to consume messages from Azure Service Bus and process them in a reactive chain, something like:

public class MyApplication {

  // ...

  public static void main(String[] args) {
    SpringApplication.run(MyApplication.class, args);
  }

}

public class ApplicationLifecycleManager {

  // ...

  @EventListener(ApplicationReadyEvent.class)
  public void onAfterStartUp() {
    receiverAsyncClient
      .receiveMessages()
      .flatMap(...) // do some processing
      .subscribe();
  }

}

We've run into the problem that when the reactive chain hits an OutOfMemoryError, we get the log line WARN [parallel-5] reactor.core.Exceptions - throwIfFatal detected a jvm fatal exception, which is thrown and logged below: followed by the stack trace. We don't seem to have any way to act on it, and the Spring application and unrelated chains keep running.

In our case, we need to make sure that the Spring Context closes and the Application is restarted, so that we can resume processing ServiceBus messages.

We tried to catch the fatal exception manually, but nothing seems to work: fatal exceptions seem to bypass all onError operators and hooks, and surrounding try/catch blocks don't catch the error either. We would be fine with calling System.exit(), but we can't find anywhere to catch the exception.


Solution

  • I forwarded the question to GitHub and got this response.

    To quote Darius:

    Long story short, we can't handle Errors like Exceptions and propagate them via Reactor's mechanisms. You have to resort to traditional JVM control structures if you want to catch any Error. That is not recommended though and you should allow your JVM to exit as there might be no more memory left to even try to do any cleanup. External control is applicable - e.g. sizing your heap properly, responding to health checks (e.g. using Spring Boot Actuator's endpoint) and restarting the JVM using your orchestrator of choice (e.g. Kubernetes) if there's no timely response to the health checks.
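
As a rough illustration of the "traditional JVM control structures" mentioned in the quote, the sketch below wraps a unit of work in a plain try/catch for VirtualMachineError (the superclass of OutOfMemoryError) and hands off to an exit action. FatalErrorGuard and its run method are hypothetical names, not part of Reactor or Spring, and this is not necessarily what the Reactor maintainers had in mind. In line with the quote's warning, the catch block attempts no logging or cleanup.

```java
import java.util.function.IntConsumer;

/** Hypothetical helper for illustration; not part of Reactor or Spring. */
final class FatalErrorGuard {

    private FatalErrorGuard() {}

    /**
     * Runs the task. If a VirtualMachineError (e.g. OutOfMemoryError) escapes,
     * the exit action is invoked with status 1 and the error is rethrown.
     */
    static void run(Runnable task, IntConsumer exitAction) {
        try {
            task.run();
        } catch (VirtualMachineError fatal) {
            // Deliberately no logging or cleanup: there may be no memory left for it.
            exitAction.accept(1);
            throw fatal; // only reached if the exit action returns
        }
    }

    public static void main(String[] args) {
        try {
            // The error is thrown manually here, so this exercises only the handler.
            run(() -> { throw new OutOfMemoryError("Test error"); },
                status -> System.out.println("would exit with status " + status));
        } catch (OutOfMemoryError rethrown) {
            System.out.println("rethrown: " + rethrown.getMessage());
        }
    }
}
```

In a real service the exit action might be Runtime.getRuntime()::halt. The demo uses a printing stand-in so it can run without terminating the JVM. Whether such a guard ever sees an error raised deep inside a reactive chain depends on where the error surfaces, which is an assumption this sketch does not verify.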

    We resolved the issue with the JVM flag -XX:+ExitOnOutOfMemoryError, which makes the JVM exit on the first OutOfMemoryError so that K8s can restart the pod.
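
Since the fix depends entirely on the flag actually reaching the JVM, a startup check can help catch a deployment that dropped it. The following is a minimal sketch, assuming the flag is passed on the command line and therefore shows up in RuntimeMXBean.getInputArguments(); OomFlagCheck and isEnabled are hypothetical names, and the "last occurrence wins" handling is an assumption about how repeated flags are resolved.

```java
import java.lang.management.ManagementFactory;
import java.util.List;

/** Hypothetical startup check for illustration. */
final class OomFlagCheck {

    static final String ENABLE = "-XX:+ExitOnOutOfMemoryError";
    static final String DISABLE = "-XX:-ExitOnOutOfMemoryError";

    private OomFlagCheck() {}

    /** True if the enabling flag is present and not followed by the disabling one. */
    static boolean isEnabled(List<String> jvmArguments) {
        return jvmArguments.lastIndexOf(ENABLE) > jvmArguments.lastIndexOf(DISABLE);
    }

    public static void main(String[] args) {
        // Arguments the JVM was started with (not the application arguments).
        List<String> jvmArguments = ManagementFactory.getRuntimeMXBean().getInputArguments();
        if (!isEnabled(jvmArguments)) {
            System.err.println("Warning: JVM started without " + ENABLE);
        }
    }
}
```

In a Spring Boot application the same check could run from an ApplicationReadyEvent listener like the one in the question. Flags supplied by other means (for example an options file) may not be visible this way.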

    As a side note, which may be obvious: manually throwing an OutOfMemoryError, e.g. throw new OutOfMemoryError("Test error"), does not have the same effect as an error actually raised by the JVM, so the flag cannot be tested that way.

    Local testing scenario with the -XX:+ExitOnOutOfMemoryError -Xmx512m options:

    Flux.range(0, 10)
            .delayElements(java.time.Duration.ofMillis(300))
            .flatMap(i -> {
              if (i == 5) {
                log.warn("Filling up memory...");
                List<byte[]> storage = new ArrayList<>();
                while (true) {
                  log.warn("Adding another array to storage");
                  // Each array with size ~10MB
                  byte[] b = new byte[10 * 1024 * 1024];
                  storage.add(b);
                }
              }
              return Flux.just(i);
            })
            .doOnError(err -> System.out.println("### An error occurred: " + err.getMessage())) // Never executed
            .doOnNext((i) -> System.out.println("Got " + i))
            .doOnComplete(() -> System.out.println("Completed"))
            .collectList()
            .subscribe();
    

    Logs:

    Got 1
    Got 2
    Got 3
    Got 4
    [2024-04-30 13:26:24.167] WARN [parallel-3] Filling up memory...
    [2024-04-30 13:26:24.167] WARN [parallel-3] Adding another array to storage
    [2024-04-30 13:26:24.173] WARN [parallel-3] Adding another array to storage
    ...
    [2024-04-30 13:26:24.456] WARN [parallel-3] Adding another array to storage
    Terminating due to java.lang.OutOfMemoryError: Java heap space