spring-integration

Spring Integration 6.1 breaking change: threads no longer block indefinitely


We're having issues with unwanted parallel execution of threads that used to execute sequentially. I narrowed it down to this breaking change in v6.1:

https://github.com/spring-projects/spring-integration/wiki/Spring-Integration-6.0-to-6.1-Migration-Guide#do-not-block-by-default

I'm unsure how to get our code to work with v6.1. The change currently breaks multiple applications that are in production.

Here's some example code that shows the behavior before and after upgrading to 6.1:

            .routeToRecipients(route -> route
                    .recipientFlow(flow -> flow
                            .log(INFO, logCat, m -> "first flow started")
                            .scatterGather(
                                    scatterer -> scatterer
                                            .applySequence(true)
                                            .recipientFlow(innerFlow -> innerFlow
                                                    .channel(c -> c.executor(executorService))
                                                    .log(INFO, logCat, m -> "subflow started")
                                                    .transform(source -> {
                                                        try {
                                                            Thread.sleep(1000 * 40);
                                                        } catch (InterruptedException e) {
                                                            throw new RuntimeException(e);
                                                        }
                                                        return source;
                                                    })
                                                    .log(INFO, logCat, m -> "subflow ended")
                                                    .bridge()),
                                    gatherer -> gatherer.releaseStrategy(group -> {
                                        log.info("group size {}", group.size());
                                        return group.size() == 1;
                                    }))
                            .log(INFO, logCat, m -> "first flow ended")
                            .nullChannel()
                    )
                    .recipientFlow(flow -> flow
                            .log(INFO, logCat, m -> "second flow started")
                            .nullChannel()
                    )
            )

With Spring Integration 6.0:

13:44:02.357 INFO  [thread-0] ...: first flow started
13:44:02.360 INFO  [thread-0] ...: subflow started
13:44:42.366 INFO  [thread-5] ...: subflow ended
13:44:42.370 INFO  [thread-5] ...: group size 1
13:44:42.371 INFO  [thread-0] ...: first flow ended // good
13:44:42.372 INFO  [thread-0] ...: second flow started // good

With Spring Integration 6.1 (notice also that "first flow ended" is never logged):

12:21:32.389 INFO  [thread-0] ...: first flow started
12:21:32.391 INFO  [thread-0] ...: subflow started
12:22:02.399 INFO  [thread-0] ...: second flow started // bad, starts exactly 30 seconds after the first flow is blocked
12:22:12.399 INFO  [thread-5] ...: subflow ended
12:22:12.402 INFO  [thread-5] ...: group size 1

How can I:

  1. make sure "first flow ended" is always logged?
  2. make sure "second flow started" waits for the first flow to end?

Solution

  • The behavior you demonstrate is correct and expected according to the change you mention. As I noted in that Migration Guide, the default timeout is now 30 seconds. We can see that in the ScatterGatherHandler code:

    private long gatherTimeout = IntegrationContextUtils.DEFAULT_TIMEOUT;
    

    where that constant is defined as:

    /**
     * The default timeout for blocking operations like send and receive messages.
     * @since 6.1
     */
    public static final long DEFAULT_TIMEOUT = 30000L;
    

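    That is smaller than the 40 seconds of your Thread.sleep(1000 * 40). So the scatter-gather stops waiting for the gatherer after 30 seconds and, because there is no reply to produce, your first sub-flow finishes silently without any further action: "first flow ended" is never logged, and the recipient-list router moves on to the second recipient on the same thread. That is why "second flow started" appears 30 seconds after "subflow started" on thread-0.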
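A JDK-only sketch of the mechanism may help. This is not Spring Integration code, and the class and method names are hypothetical. It assumes the scatter-gather behaves like "hand the work to another thread, then wait for the gathered reply with a bounded receive", with times scaled down from seconds to milliseconds.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GatherTimeoutSketch {

    // Hypothetical stand-in for a scatter-gather: run the work on another
    // thread, then wait at most timeoutMillis for the gathered reply.
    // Returns null when the timeout elapses before the work finishes.
    static String scatterGather(ExecutorService executor, long workMillis, long timeoutMillis)
            throws InterruptedException {
        BlockingQueue<String> gatherResult = new ArrayBlockingQueue<>(1);
        executor.execute(() -> {
            try {
                Thread.sleep(workMillis); // plays the role of the 40-second transform
                gatherResult.offer("reply"); // a late reply lands where nobody is waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return gatherResult.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // Scaled down: 400 ms of work vs a 300 ms wait (like 40 s vs the 30 s default).
            String reply = scatterGather(executor, 400, 300);
            if (reply == null) {
                System.out.println("no reply: the rest of the flow does not run");
            } else {
                System.out.println("first flow ended");
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Because the work outlives the wait, poll returns null and nothing downstream runs, which mirrors the missing "first flow ended" line.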

    To fix the problem and restore the previous behavior, configure a gather timeout larger than your blocking sleep:

    scatterGatherSpec -> scatterGatherSpec.gatherTimeout(41000)
    

    Pass it as the third argument of that scatterGather() call.

    You can also set .requiresReply(true) so that the flow fails with an exception, instead of stopping silently, if no reply arrives within that timeout.
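A JDK-only sketch of both options, under the same assumption that the scatter-gather is a bounded wait for a reply produced on another thread. The names are hypothetical and this is not the framework's API. A gather timeout comfortably larger than the work brings the reply back, and a requiresReply-style flag turns a timeout into an exception. In Spring Integration that exception is a ReplyRequiredException; NoReplyException below is only a stand-in.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GatherTimeoutFixSketch {

    // Hypothetical stand-in for the exception raised when a reply is required.
    static class NoReplyException extends RuntimeException {
        NoReplyException(String message) {
            super(message);
        }
    }

    // Waits up to gatherTimeoutMillis for the reply. With requiresReply=false a
    // timeout yields null (silent stop); with requiresReply=true it throws.
    static String scatterGather(ExecutorService executor, long workMillis,
                                long gatherTimeoutMillis, boolean requiresReply)
            throws InterruptedException {
        BlockingQueue<String> gatherResult = new ArrayBlockingQueue<>(1);
        executor.execute(() -> {
            try {
                Thread.sleep(workMillis); // plays the role of the long-running transform
                gatherResult.offer("reply");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        String reply = gatherResult.poll(gatherTimeoutMillis, TimeUnit.MILLISECONDS);
        if (reply == null && requiresReply) {
            throw new NoReplyException("no reply within " + gatherTimeoutMillis + " ms");
        }
        return reply;
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // Fix 1: timeout larger than the work, like gatherTimeout(41000) vs a 40 s sleep.
            System.out.println(scatterGather(executor, 200, 1000, false));
            // Fix 2: with the reply required, a too-short timeout fails loudly.
            try {
                scatterGather(executor, 400, 100, true);
            } catch (NoReplyException e) {
                System.out.println("failed: " + e.getMessage());
            }
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Raising the timeout restores the 6.0-style sequencing as long as the work fits inside it. The required-reply flag does not wait longer; it only makes an overrun visible instead of silently dropping the rest of the flow.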