I have a Spring
application that is triggered with an ApplicationReadyEvent
as:
@EventListener(ApplicationReadyEvent.class)
public void poll() throws InterruptedException {
Flux.just(1, 2, 3)
.doOnNext(System.out::println)
.log()
.subscribeOn(Schedulers.boundedElastic())
.log()
.flatMap(Mono::just)
.subscribe();
System.out.println("Thread exiting: " + Thread.currentThread().getName());
Thread.sleep(Integer.MAX_VALUE);
}
Here are the logs:
{"timeStamp":"2021-04-21T15:16:31.288+05:30","message":"onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)","logger":"reactor.Flux.SubscribeOn.2","thread":"main","level":"INFO"}
{"timeStamp":"2021-04-21T15:16:31.289+05:30","message":"request(256)","logger":"reactor.Flux.SubscribeOn.2","thread":"main","level":"INFO"}
Thread exiting: main
The request does seem to propagate above subscribeOn
. However, if I don't use subscribeOn
, it appears to be working normally as expected.
@EventListener(ApplicationReadyEvent.class)
public void poll() throws InterruptedException {
Flux.just(1, 2, 3)
.doOnNext(System.out::println)
.log()
.flatMap(Mono::just)
.subscribe();
System.out.println("Thread exiting: " + Thread.currentThread().getName());
Thread.sleep(Integer.MAX_VALUE);
}
Logs:
{"timeStamp":"2021-04-21T15:20:35.370+05:30","message":"| onSubscribe([Fuseable] FluxPeekFuseable.PeekFuseableSubscriber)","logger":"reactor.Flux.PeekFuseable.1","thread":"main","level":"INFO"}
{"timeStamp":"2021-04-21T15:20:35.371+05:30","message":"| request(256)","logger":"reactor.Flux.PeekFuseable.1","thread":"main","level":"INFO"}
1
{"timeStamp":"2021-04-21T15:20:35.371+05:30","message":"| onNext(1)","logger":"reactor.Flux.PeekFuseable.1","thread":"main","level":"INFO"}
2
{"timeStamp":"2021-04-21T15:20:35.371+05:30","message":"| onNext(2)","logger":"reactor.Flux.PeekFuseable.1","thread":"main","level":"INFO"}
3
{"timeStamp":"2021-04-21T15:20:35.372+05:30","message":"| onNext(3)","logger":"reactor.Flux.PeekFuseable.1","thread":"main","level":"INFO"}
{"timeStamp":"2021-04-21T15:20:35.372+05:30","message":"| onComplete()","logger":"reactor.Flux.PeekFuseable.1","thread":"main","level":"INFO"}
Thread exiting: main
What is the reason behind such behaviour?
This problem was caused by a badly implemented forwarding ScheduledExecutorService
class that I was using instead of Reactor's default service as:
Schedulers.addExecutorServiceDecorator(
"forwardingScheduledExecutorService",
(scheduler, scheduledExecutorService) ->
new ForwardingScheduledExecutorService(scheduledExecutorService));