Spring Integration Flows: Looking for strategies for avoiding java.lang.StackOverflowError due to recursive flows


Using Spring Integration 5.5.18, I have a flow that works well for small inputs but throws java.lang.StackOverflowError for larger ones. The flow recurses by sending each message back through a channel and ends the recursion with a filter. Are there any recommendations for making the flow less prone to java.lang.StackOverflowError? (I do know about requesting a larger thread stack for the JVM.) Is there a way to express this iteratively with a DSL declaration rather than recursively? Or does it require a MessageHandler that performs the iteration in Java, outside the Spring Integration Java DSL?

An example flow that exhibits the problem could be:

package org.example.filter;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.*;
import org.springframework.messaging.Message;

import java.util.function.Consumer;

@Configuration
public class OverflowFlows {

    private static final String CURRENT_ITERATION = "currentIteration";
    private static final String ACCUMULATOR       = "accumulator";
    private static final String RECURSION_CHANNEL = "recurseToBodyValue";

    @Bean
    public IntegrationFlow callRecursiveFlow() {
        return IntegrationFlows
                .from("createsOverflow")
                .enrichHeaders(h -> h.headerFunction(CURRENT_ITERATION, m -> 0))
                .enrichHeaders(h -> h.headerFunction(ACCUMULATOR, m -> 0))
                .gateway(callRecurseToBodyValueFlow)
                .transform("headers.accumulator")
                .get();
    }

    private final IntegrationFlow callRecurseToBodyValueFlow =
            flow -> flow.channel(RECURSION_CHANNEL);

    @Bean IntegrationFlow recurseToBodyValueFlow() {
        return IntegrationFlows
                .from(RECURSION_CHANNEL)
                .enrichHeaders(h -> h.headerExpression(CURRENT_ITERATION,
                                                     "headers.currentIteration + 1",
                                                     true))
                .filter("headers.currentIteration < payload", DISCARD_RETURNS_CURRENT_MESSAGE)
                .enrichHeaders(h -> h.headerExpression(ACCUMULATOR,
                                                       "headers.accumulator + headers.currentIteration",
                                                       true))
                .channel(RECURSION_CHANNEL)
                .get();
    }

    public static final Consumer<FilterEndpointSpec> DISCARD_RETURNS_CURRENT_MESSAGE = discardReturnsCurrentMessage();

    public static Consumer<FilterEndpointSpec> discardReturnsCurrentMessage() {
        return filterSpec -> filterSpec.discardFlow(IntegrationFlowDefinition::bridge);
    }
}

It is used together with the following declarations for the MessageChannel and the MessagingGateway:

package org.example.filter;

import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;

@Component
public class OverflowMessageChannels {

    @Bean(name = "createsOverflow")
    public MessageChannel createsOverflow() {
        return MessageChannels.direct().get();
    }

}

package org.example.filter;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;

@MessagingGateway(name = "overflowGateway", defaultReplyTimeout = "20")
public interface OverflowGateway {
    @Gateway(requestChannel = "createsOverflow")
    Integer createsOverflow(Integer value);
}

In the unit test below, the 1000-iteration case causes a stack overflow.

Any pointers on strategies will be appreciated.

import org.example.filter.OverflowGateway;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.stream.IntStream;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = org.example.filter.ApiConfiguration.class)
public class OverflowTest {

    @Autowired
    OverflowGateway overflowGateway;

    @Test
    public void call_with_one_returns_one() {
        assertThat(overflowGateway.createsOverflow(1), equalTo(sumOfRange(1)));
    }

    @Test
    public void call_with_20_returns_20() {
        assertThat(overflowGateway.createsOverflow(20), equalTo(sumOfRange(20)));
    }

    @Test
    public void call_with_1000_returns_1000() {
        // causes java.lang.StackOverflowError
        assertThat(overflowGateway.createsOverflow(1000), equalTo(sumOfRange(1000)));
    }

    private static int sumOfRange(final int topOfRange) {
        return IntStream.range(1, topOfRange).sum();
    }

}

Solution

  • I think this has nothing to do with Spring Integration. You can get a StackOverflowError just as easily with plain Java; see "What is the maximum depth of the java call stack?". Yes, Spring Integration adds some overhead through its messaging model, with channels and the endpoints between them, but logically it doesn't matter: if your logic is recursive within a single thread, you will eventually hit a StackOverflowError as you add more steps to that loop.

    You can break the recursion, though, by making RECURSION_CHANNEL an ExecutorChannel instance. That way every iteration of the loop is handed off to an executor thread, so the sending call returns immediately and the call stack no longer grows with each iteration.
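
A minimal sketch of that change, assuming the flows from the question stay as they are. The class name RecursionChannelConfiguration and the choice of a single-thread executor are illustrative assumptions, not part of the original code. Declaring a bean named "recurseToBodyValue" means the existing .from(RECURSION_CHANNEL) and .channel(RECURSION_CHANNEL) calls should pick up this channel instead of an implicitly created DirectChannel.

```java
package org.example.filter;

import java.util.concurrent.Executors;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.MessageChannel;

// Hypothetical configuration class; only the bean name has to match the question's code.
@Configuration
public class RecursionChannelConfiguration {

    // The bean name equals the value of RECURSION_CHANNEL ("recurseToBodyValue").
    // Each send to this channel is dispatched on the executor, so the sender
    // returns at once instead of nesting another handler call on its own stack.
    @Bean(name = "recurseToBodyValue")
    public MessageChannel recurseToBodyValue() {
        // Assumption: one worker thread is enough, since the iterations run strictly one after another.
        return MessageChannels.executor(Executors.newSingleThreadExecutor()).get();
    }
}
```

Because the reply is now produced on another thread, the gateway's defaultReplyTimeout of "20" may be too short for large inputs and might need to be raised. An executor created this way is also not shut down by Spring; a ThreadPoolTaskExecutor bean would be one way to let the container manage its lifecycle.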
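
The same effect can be seen without Spring at all. The following plain-Java sketch is an illustration only: the class HandOffDemo and its method names are made up for this example. It mirrors the question's loop (increment the counter, stop once it reaches the limit, otherwise add it to the accumulator), once as same-thread recursion and once by submitting each iteration to an executor, which is roughly what an ExecutorChannel does for each message.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class HandOffDemo {

    // Same-thread recursion: every iteration adds a stack frame, so a large
    // enough limit ends in StackOverflowError (the exact depth depends on the JVM).
    static long sumRecursive(int iteration, int limit, long accumulator) {
        int next = iteration + 1;
        if (next >= limit) {
            return accumulator;
        }
        return sumRecursive(next, limit, accumulator + next);
    }

    // Hand-off: each iteration submits the next one as a new task and returns,
    // so the stack unwinds between iterations regardless of the limit.
    static void step(ExecutorService executor, int iteration, int limit,
                     long accumulator, CompletableFuture<Long> reply) {
        int next = iteration + 1;
        if (next >= limit) {
            reply.complete(accumulator); // plays the role of the gateway reply
            return;
        }
        executor.execute(() -> step(executor, next, limit, accumulator + next, reply));
    }

    static long sumViaExecutor(int limit) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<Long> reply = new CompletableFuture<>();
            step(executor, 0, limit, 0L, reply);
            return reply.join(); // the caller blocks here, like the gateway waiting for its reply
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumViaExecutor(1_000));
        System.out.println(sumViaExecutor(1_000_000));
    }
}
```

The trade-off is that the caller now waits on another thread for the result, which is why the reply timeout matters in the Spring Integration version.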