Search code examples
javaproject-reactorserver-sent-eventsspring-webclientreactor-netty

Reading Server-Sent Events using reactive Spring WebClient and feeding them to a slow/restricted consumer produces an OverflowException


I have the following test:

import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.scheduler.Schedulers;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;

class MainTest {
    private TestServer server;

    @BeforeEach
    void init() throws Exception {
        server = new TestServer();
    }

    @AfterEach
    void cleanup() throws Exception {
        server.shutdown();
    }

    @Test
    void test() {
        WebClient client = WebClient.builder().build();

        client.get()
                .uri("http://localhost:9999/")
                .retrieve()
                .bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
                })
                .map(ServerSentEvent::data)
                .publishOn(Schedulers.single())
                .blockLast();
    }

    private static class TestServer {
        private final Server server;

        private TestServer() throws Exception {
            server = new Server(9999);
            server.setHandler(new AbstractHandler() {
                @Override
                public void handle(String target, Request request, HttpServletRequest httpServletRequest,
                        HttpServletResponse httpServletResponse) throws IOException, ServletException {
                    PrintWriter writer = httpServletResponse.getWriter();
                    for (int i = 0; i < 1_000_000; i++) {
                        writer.println("id:" + i);
                        writer.println("data:" + i);
                        writer.println();
                    }

                    request.setHandled(true);
                }
            });
            server.start();
        }

        void shutdown() throws Exception {
            server.stop();
        }
    }
}

It starts a server that emits one million SSE (server-sent event) messages, and tries to read them all using Spring's reactive WebClient. The only non-trivial thing is that before consuming the messages, I inserted .publishOn(Schedulers.single()) as it is recommended for slow consumers (and in my real project I actually have a relatively slow consumer, namely KafkaSender, that already uses Schedulers.single() by default).

The corresponding snippet of the POM follows:

<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-webflux</artifactId>
        <version>5.2.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor.netty</groupId>
        <artifactId>reactor-netty</artifactId>
        <version>0.9.4.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-api</artifactId>
        <version>5.6.2</version>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.eclipse.jetty</groupId>
        <artifactId>jetty-server</artifactId>
        <version>9.4.31.v20200723</version>
        <scope>test</scope>
    </dependency>
</dependencies>

On my machine, this test fails immediately:

reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests

    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:231)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ Body from GET http://localhost:9999/ [DefaultClientResponse]
Stack trace:
        at reactor.core.Exceptions.failWithOverflow(Exceptions.java:231)
        at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.emit(FluxBufferPredicate.java:304)
        at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNextNewBuffer(FluxBufferPredicate.java:260)
        at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.tryOnNext(FluxBufferPredicate.java:214)
        at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNext(FluxBufferPredicate.java:186)
        at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:845)
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213)
        at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103)
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213)
        at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.emit(FluxBufferPredicate.java:300)
        at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNextNewBuffer(FluxBufferPredicate.java:260)
        at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.tryOnNext(FluxBufferPredicate.java:214)
        at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNext(FluxBufferPredicate.java:186)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:405)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:649)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:249)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
        at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
        at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
        at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
        at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:588)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:316)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:303)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:417)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:271)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:834)
    Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
        at reactor.core.publisher.Flux.blockLast(Flux.java:2482)
        at MainTest.test(MainTest.java:42)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:686)
        at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
        at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
        at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
        at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
        at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
        at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
        at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
        at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
        at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:212)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:208)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
        at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:71)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
        at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
        at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
        at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
        at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
        at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
        at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:248)
        at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$5(DefaultLauncher.java:211)
        at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:226)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:199)
        at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:132)
        at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
        at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
        at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:128)

If I remove the publishOn(...) line, it works fine.

On the one hand, I understand that a fast producer can overwhelm a slow/restricted consumer. On the other hand, I'm using a reactive flow, so I expect that the web client does not read anything if there is no demand.

Is it that I do something wrong here? Or it is an erroneous behavior (by the libraries)?

If I'm doing something wrong, how can this be fixed? .onBackpressureBuffer() just before the .publishOn(...) seems to allow the program run without errors, but is it a correct solution? What if it runs out of memory? Is there any way to just read slower?

The full project code is here: https://github.com/rpuch/webclient-sse-with-slow-consumer


Solution

  • When I change reactor-core version from 3.3.2 to 3.3.3, the test passes: it heats the processor for about 2 minutes (well, a million messages, you know), but it finishes successfully.

    So it looks like the issue was caused by a bug in reactor-core, probably this one https://github.com/reactor/reactor-core/issues/1937 (closed in 3.3.3).