java reactive-programming spring-webflux project-reactor cancellation

How to stop a publisher in WebFlux when the client aborts the request?


Spring Boot v2.5.1

There is an endpoint that returns the result of a long-running process. How the publisher is created does not matter here
(for simplicity, assume it is Mono.fromCallable( ... long running ... )).

The client makes a request, which triggers the publisher to do the work, but after several seconds the client aborts the request (for example, the connection is lost). The process nevertheless keeps consuming resources to compute a result that will be thrown away.

What is the mechanism for notifying Project Reactor's event loop that work in progress is no longer needed and should be cancelled?

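import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;

@RestController
class EndpointSpin {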
 
  @GetMapping("/spin")
  Mono<Long> spin() {
    AtomicLong counter = new AtomicLong(0);
    Instant stopTime = Instant.now().plus(Duration.of(1, ChronoUnit.HOURS));

    return Mono.fromCallable(() -> {

      while (Instant.now().isBefore(stopTime)) {
        counter.incrementAndGet();

        if (counter.get() % 10_000_000 == 0) {
          System.out.println(counter.get());
        }

        // of course this does not work
        if (Thread.currentThread().isInterrupted()){
           break;
        }
      }

      return counter.get();
    });
  }
}

Solution

  • fromCallable doesn't shield you from blocking computation inside the Callable, which your example demonstrates.

    The primary means of cancellation in Reactive Streams is the cancel() signal propagated from downstream via the Subscription.

    Even with that, the fundamental requirement of avoiding blocking code inside reactive code still holds: if the operators are simple enough (i.e. synchronous), a blocking step could even prevent the propagation of the cancel() signal.
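
    To make the cancel() propagation concrete, here is a minimal sketch using a non-blocking source. The class name CancelPropagationDemo and the 30-second delay are illustrative assumptions, not part of the question; the sketch only assumes reactor-core on the classpath. Disposing the Disposable returned by subscribe() cancels the Subscription, and doOnCancel observes that signal on its way upstream.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.Disposable;
import reactor.core.publisher.Mono;

// Hypothetical demo class, not taken from the question.
public class CancelPropagationDemo {

    // Subscribes to a non-blocking Mono, cancels it, and reports whether
    // the upstream cancel hook ran.
    static boolean cancelReachesUpstream() {
        AtomicBoolean cancelSeen = new AtomicBoolean();

        Disposable subscription = Mono.delay(Duration.ofSeconds(30)) // would emit after 30 s
                .doOnCancel(() -> cancelSeen.set(true))              // runs when cancel() travels upstream
                .subscribe();

        // dispose() cancels the underlying Subscription.
        subscription.dispose();
        return cancelSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("cancel seen upstream: " + cancelReachesUpstream());
    }
}
```

    Because Mono.delay never blocks the subscribing thread, the caller is free to cancel. A blocking loop that runs on the subscribing thread leaves nobody available to deliver that signal.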

    A way to adapt non-reactive code while still getting notified about cancellation is Mono.create: it exposes a MonoSink (via a Consumer<MonoSink>) which can be used to push a value downstream, and which also accepts an onCancel handler.

    You would need to rewrite your code to, e.g., check an AtomicBoolean on each iteration of the loop, and have that AtomicBoolean flipped in the sink's onCancel handler:

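    Mono.create(sink -> {
        // per-subscription flag, flipped when downstream cancels
        AtomicBoolean isCancelled = new AtomicBoolean();
        sink.onCancel(() -> isCancelled.set(true));
        while (...) {
            ...
            if (isCancelled.get()) break;
        }
        // emit the result once the loop is done (ignored if already cancelled)
        sink.success(...);
    });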
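
    A fuller, runnable version of this idea might look like the sketch below. Several things here are assumptions added for illustration and do not come from the question: the class name CancellableSpin, the stoppedByCancel flag (which exists only so the demo can observe the outcome), and the subscribeOn(Schedulers.boundedElastic()) call. The latter moves the loop off the subscribing thread, so that thread stays free to deliver cancel(); whether boundedElastic is the right scheduler for your workload is a separate decision.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Hypothetical demo class, not taken from the question.
public class CancellableSpin {

    // Demo-only flag: set when the loop exits because of cancellation.
    static final AtomicBoolean stoppedByCancel = new AtomicBoolean();

    static Mono<Long> spin(Duration maxDuration) {
        return Mono.<Long>create(sink -> {
            // Per-subscription state lives inside the Consumer<MonoSink>.
            AtomicBoolean cancelled = new AtomicBoolean();
            sink.onCancel(() -> cancelled.set(true));

            Instant stopTime = Instant.now().plus(maxDuration);
            long counter = 0;
            while (Instant.now().isBefore(stopTime)) {
                counter++;
                if (cancelled.get()) {
                    stoppedByCancel.set(true);
                    return; // nobody is listening any more, so emit nothing
                }
            }
            sink.success(counter);
        })
        // Assumption: run the loop on a worker thread instead of the subscribing thread.
        .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) throws InterruptedException {
        Disposable subscription = spin(Duration.ofHours(1)).subscribe();
        Thread.sleep(200);      // let the loop run for a moment
        subscription.dispose(); // simulates the client going away
        Thread.sleep(200);      // give the loop time to notice the flag
        System.out.println("stopped by cancel: " + stoppedByCancel.get());
    }
}
```

    In a WebFlux handler you would simply return spin(...) and let the framework subscribe; the expectation is that the server cancels that subscription when it detects the closed connection, which in turn triggers the onCancel handler.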
    

    Another important point about your example: the AtomicLong is shared state. If you subscribe a second time to the returned Mono, both subscriptions will share the counter and increment and check it in parallel, which is probably not what you want.

    Creating these state variables inside the Consumer<MonoSink> of Mono.create ensures that each subscription gets its own separate state.
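
    The difference between shared and per-subscription state can be seen in a small sketch. The class and method names below (PerSubscriptionState, shared, perSubscription) are hypothetical and chosen only for illustration; the counter is incremented once per subscription instead of in a loop to keep the example short.

```java
import java.util.concurrent.atomic.AtomicLong;

import reactor.core.publisher.Mono;

// Hypothetical demo class, not taken from the question.
public class PerSubscriptionState {

    // State created when the Mono is assembled: all subscribers share one counter.
    static Mono<Long> shared() {
        AtomicLong counter = new AtomicLong();
        return Mono.fromCallable(counter::incrementAndGet);
    }

    // State created inside the Consumer<MonoSink>: a fresh counter per subscription.
    static Mono<Long> perSubscription() {
        return Mono.create(sink -> {
            AtomicLong counter = new AtomicLong();
            sink.success(counter.incrementAndGet());
        });
    }

    public static void main(String[] args) {
        Mono<Long> s = shared();
        System.out.println(s.block() + " " + s.block()); // prints "1 2"

        Mono<Long> p = perSubscription();
        System.out.println(p.block() + " " + p.block()); // prints "1 1"
    }
}
```

    Each block() call is a new subscription, so the second call on the shared variant sees the value left behind by the first.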