
Spring Integration Reactor configuration


I'm running an application that processes tasks using Spring Integration.

I'd like it to process multiple tasks concurrently, but every attempt has failed so far.

My configuration is:

ReactorConfiguration.java

@Configuration
@EnableAutoConfiguration
public class ReactorConfiguration {


    @Bean
    Environment reactorEnv() {
        return new Environment();
    }

    @Bean
    Reactor createReactor(Environment env) {
        return Reactors.reactor()
            .env(env)
            .dispatcher(Environment.THREAD_POOL)
            .get();
    }
}

TaskProcessor.java

@MessagingGateway(reactorEnvironment = "reactorEnv")
public interface TaskProcessor {
    @Gateway(requestChannel = "routeTaskByType", replyChannel = "")
    Promise<Result> processTask(Task task);
}

IntegrationConfiguration.java (simplified)

@Bean
public IntegrationFlow routeFlow() {
    return IntegrationFlows.from(MessageChannels.executor("routeTaskByType", Executors.newFixedThreadPool(10)))
        .handle(Task.class, (payload, headers) -> {
            logger.info("Task submitted! " + payload);
            payload.setRunning(true);
            // try/catch around the sleep omitted for brevity
            Thread.sleep(999999);
            return payload;
        })
        .route(/*...*/)
        .get();
}

My testing code can be simplified like this:

Task task1 = new Task();
Task task2 = new Task();

Promise<Result> resultPromise1 = taskProcessor.processTask(task1).flush();
Promise<Result> resultPromise2 = taskProcessor.processTask(task2).flush();


while( !task1.isRunning() || !task2.isRunning() ){
    logger.info("Task1: {}, Task2: {}", task1, task2);
    Thread.sleep(1000);
}

logger.info("Yes! your tasks are running in parallel!");

Unfortunately, the last log line never gets executed!

Any ideas?

Thanks a lot


Solution

  • Well, I've reproduced it with just a simple Reactor test case:

    @Test
    public void testParallelPromises() throws InterruptedException {
        Environment environment = new Environment();
        final AtomicBoolean first = new AtomicBoolean(true);
        for (int i = 0; i < 10; i++) {
            final Promise<String> promise = Promises.task(environment, () -> {
                        if (!first.getAndSet(false)) {
                            try {
                                Thread.sleep(1000);
                            }
                            catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        return "foo";
                    }
            );
            String result = promise.await(10, TimeUnit.SECONDS);
            System.out.println(result);
            assertNotNull(result);
        }
    }
    

    (This is with Reactor 2.0.6.)

    The cause is this method:

    public static <T> Promise<T> task(Environment env, Supplier<T> supplier) {
        return task(env, env.getDefaultDispatcher(), supplier);
    }
    

    where the default dispatcher is a RingBufferDispatcher, which extends SingleThreadDispatcher.

    Since @MessagingGateway is based on a request/reply scenario, we wait for the reply on that RingBufferDispatcher's single thread. Because your handler never returns a reply (Thread.sleep(999999);), the RingBuffer isn't able to accept the next event.

    Your dispatcher(Environment.THREAD_POOL) doesn't help here because it applies only to that Reactor bean and doesn't affect the Environment's default dispatcher. Consider setting the reactor.dispatchers.default = threadPoolExecutor property instead, as in this file: https://github.com/reactor/reactor/blob/2.0.x/reactor-net/src/test/resources/META-INF/reactor/reactor-environment.properties#L46.

    And yes, please upgrade to the latest Reactor.
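
To see the blocking effect described in this answer without pulling in Reactor, here is a minimal sketch that uses only java.util.concurrent. It is not Reactor code: a single-thread executor stands in for the single-threaded default dispatcher, and a two-thread pool stands in for a thread-pool dispatcher. The class and method names (DispatcherBlockingDemo, bothTasksStart) are made up for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; plain executors stand in for Reactor dispatchers.
class DispatcherBlockingDemo {

    // Submits two tasks that each signal "started" and then block until released.
    // Returns true only if both tasks started within the timeout.
    static boolean bothTasksStart(ExecutorService executor) {
        CountDownLatch started = new CountDownLatch(2);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blockingTask = () -> {
            started.countDown();
            try {
                // Stands in for the long Thread.sleep(...) in the handler.
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            executor.submit(blockingTask);
            executor.submit(blockingTask);
            return started.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Unblock the tasks and let the executor wind down.
            release.countDown();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // One worker thread: the second task waits in the queue behind the first.
        System.out.println("single thread: "
                + bothTasksStart(Executors.newSingleThreadExecutor()));
        // Two worker threads: both tasks start at once.
        System.out.println("pool of two:   "
                + bothTasksStart(Executors.newFixedThreadPool(2)));
    }
}
```

With one worker thread the second task cannot start until the first one returns, which is the same shape as the hang in the question. With two workers, both tasks are running at the same time.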