spring spring-integration reactive-programming project-reactor enterprise-integration

Spring Integration backpressure error when splitting a large stream


The goal is to stream a large json.gz file (4 GB compressed, around 12 GB uncompressed, 12 million rows) from a web server directly into the database without downloading it locally. Since the Spring Integration outbound gateway doesn't support the gzip format, I'm doing it myself with okhttp, which automatically decompresses the response:

body = response.body().byteStream(); // thanks okhttp
reader = new InputStreamReader(body, StandardCharsets.UTF_8);
br = new BufferedReader(reader, bufferSize);

Flux<String> flux = Flux.fromStream(br.lines())
    .onBackpressureBuffer(10000, x -> log.error("Buffer overrun!"))
    .doAfterTerminate(() -> closeQuietly(closeables))
    .doOnError(t -> log.error(...));
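
For comparison, here is a minimal JDK-only sketch of the same lazy line reading for the case where the HTTP client does not decompress the body for you. It is an assumption-laden illustration, not the code from this flow: the class name GzipLines, the helper gzip(), and the in-memory sample data are all hypothetical, and java.util.zip.GZIPInputStream stands in for okhttp's decompression.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipLines {

    /** Lazily reads lines from a gzip-compressed stream; closing the Stream closes the reader. */
    public static Stream<String> lines(InputStream gzipped, int bufferSize) throws IOException {
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(new GZIPInputStream(gzipped), StandardCharsets.UTF_8),
                bufferSize);
        return reader.lines().onClose(() -> {
            try {
                reader.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }

    /** Demo helper only (hypothetical): gzip a string in memory. */
    public static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] data = gzip("{\"id\":1}\n{\"id\":2}\n{\"id\":3}\n");
        // try-with-resources triggers onClose, which closes the underlying reader
        try (Stream<String> rows = lines(new ByteArrayInputStream(data), 8192)) {
            rows.forEach(System.out::println);
        }
    }
}
```

Because the stream is only decompressed as lines are pulled, memory use stays bounded by the reader's buffer rather than by the file size.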

In the integration flow:

.handle(new MessageTransformingHandler(new GzipToFluxTransformer(...)))
.split()
.log(LoggingHandler.Level.DEBUG, CLASS_NAME, Message::getHeaders)
.channel(repositoryInputChannel())

But it fails with:

2017-12-08 22:48:47.846 [task-scheduler-7] [ERROR] c.n.d.y.s.GzipToFluxTransformer - Buffer overrun!
2017-12-08 22:48:48.337 [task-scheduler-7] [ERROR] o.s.i.h.LoggingHandler - org.springframework.messaging.MessageHandlingException: 
error occurred in message handler [org.springframework.integration.splitter.DefaultMessageSplitter#1]; 
nested exception is reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...), 
failedMessage=...}]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:153)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
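
As a rough analogy for what "Buffer overrun!" and the "bounded queue" in the exception mean, the sketch below pushes items into a bounded JDK queue that nobody drains. It is not Reactor's implementation; the class BoundedBufferOverrun and its method are hypothetical, and ArrayBlockingQueue merely stands in for the bounded buffer.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class BoundedBufferOverrun {

    /** Offers every item to a bounded queue with no consumer; returns how many were rejected. */
    public static int pushWithoutConsumer(int capacity, int items) {
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        int rejected = 0;
        for (int i = 0; i < items; i++) {
            // offer() returns false instead of blocking when the queue is full
            if (!buffer.offer(i)) {
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // Capacity 3, 5 items, nothing consuming: the last 2 do not fit
        System.out.println("rejected = " + pushWithoutConsumer(3, 5)); // rejected = 2
    }
}
```

A producer that emits without regard to downstream demand overruns any bounded buffer eventually; enlarging the buffer only delays the point at which it happens.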

The output channel is hooked up at runtime to an unbounded queue polled by a bridge, so that the queue can be replaced by a DirectChannel for testing.

@Bean(name = "${...}")
public PollableChannel streamingOutputChannel() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow srcToSinkBridge() {
    return IntegrationFlows.from(streamingOutputChannel())
        .bridge(e -> e.poller(Pollers.fixedDelay(500)))
        .channel(repositoryInputChannel())
        .get();
}

I have a couple of doubts here.

  1. I'm not sure that the dynamic binding using SpEL in the bean name is working, and I don't know how to verify it.
  2. Since the queue is unbounded, all I can think of is that the polling is not quick enough. However, the exception suggests that the splitter is having a problem keeping up.

Solution

  • The problem is the log statement! It uses a wire tap, which changes the output channel of the splitter to a DirectChannel, and that defeats this check in AbstractMessageSplitter:

    boolean reactive = getOutputChannel() instanceof ReactiveStreamsSubscribableChannel;
    

    Quoting the doc:

    Starting with version 5.0, ... if Splitter’s output channel is an instance of a ReactiveStreamsSubscribableChannel, the AbstractMessageSplitter produces a Flux result instead of an Iterator and the output channel is subscribed to this Flux for back-pressure based splitting on downstream flow demand.

    The working code is below. Simply moving the log statement from immediately after the splitter to the end of the flow fixed the backpressure issue:

    IntegrationFlows.from(inputChannel)
        .filter(Message.class, msg -> msg.getHeaders().containsKey(FILE_TYPE_HEADER))
        .handle(new GzipToFluxTransformer(...))
        .transform((Flux<String> payload) -> payload
                .onBackpressureBuffer(getOnBackpressureBufferSize(),
                        s -> log.error("Buffer overrun!")))
        .split()
        .channel(c -> c.flux(outputChannel))
        .log(LoggingHandler.Level.DEBUG, CLASS_NAME, Message::getHeaders)
        .get();
    

    I've opened issue 2302 on the Spring Integration GitHub repository.
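
To picture what "back-pressure based splitting on downstream flow demand" means, here is a small JDK-only sketch built on java.util.concurrent.Flow. It is not the Spring Integration or Reactor implementation; IteratorPublisher and CollectingSubscriber are hypothetical names, and the sketch is single-threaded and assumes the subscriber never calls request() from inside onNext().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

public class DemandDrivenSplit {

    /** Emits elements of an iterator, but never more than the subscriber has requested. */
    public static final class IteratorPublisher implements Flow.Publisher<String> {
        private final Iterator<String> source;

        public IteratorPublisher(Iterator<String> source) {
            this.source = source;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    long emitted = 0;
                    // Pull from the source only while there is outstanding demand
                    while (!done && emitted < n && source.hasNext()) {
                        subscriber.onNext(source.next());
                        emitted++;
                    }
                    if (!done && !source.hasNext()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Collects items and exposes the subscription so the caller controls demand. */
    public static final class CollectingSubscriber implements Flow.Subscriber<String> {
        public final List<String> received = new ArrayList<>();
        public Flow.Subscription subscription;
        public boolean completed;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            this.subscription = s;
        }

        @Override
        public void onNext(String item) {
            received.add(item);
        }

        @Override
        public void onError(Throwable t) {
            throw new IllegalStateException(t);
        }

        @Override
        public void onComplete() {
            completed = true;
        }
    }

    public static void main(String[] args) {
        CollectingSubscriber downstream = new CollectingSubscriber();
        new IteratorPublisher(List.of("row1", "row2", "row3").iterator()).subscribe(downstream);

        downstream.subscription.request(2);
        System.out.println(downstream.received); // [row1, row2]

        downstream.subscription.request(2);
        System.out.println(downstream.received + " completed=" + downstream.completed);
        // [row1, row2, row3] completed=true
    }
}
```

Nothing is read from the source until the subscriber asks for it, which is the property the splitter loses when its output channel is not a ReactiveStreamsSubscribableChannel.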