Search code examples
springspring-integration-dsl

Spring Integration DSL IntegrationFlow filter() does not return anything and waiting infinitely to return


import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.boot.autoconfigure.jms.activemq.ActiveMQAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.PollerSpec;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.messaging.Message;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.policy.MaxAttemptsRetryPolicy;
import org.springframework.retry.support.RetryTemplate;


@SpringBootConfiguration
@EnableAutoConfiguration(exclude = {DataSourceAutoConfiguration.class, ActiveMQAutoConfiguration.class})
public class SpringIntegrationTestApplication {

    public static void main2(String[] args) {
        RetryTemplate template = new RetryTemplate();
        RetryPolicy policy = new MaxAttemptsRetryPolicy(200);
        template.setRetryPolicy(policy);
        template.execute(context -> {
            System.out.println("Trying");
            throw new RuntimeException("problem");
        });
    }

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext ctx = SpringApplication.run(SpringIntegrationTestApplication.class, args);

        Cafe cafe = ctx.getBean(Cafe.class);

        Order entirelyGoodOrder = new Order(1);
        entirelyGoodOrder.addItem(DrinkType.LATTE, 3, true);
        entirelyGoodOrder.addItem(DrinkType.ESPRESSO, 3, true);
        entirelyGoodOrder.addItem(DrinkType.MOCHA, 3, true);

        cafe.placeOrder(entirelyGoodOrder);

        System.out.println("Hit 'Enter' to terminate");
        System.in.read();
        ctx.close();
    }

    @MessagingGateway
    public interface Cafe {

        @Gateway(requestChannel = "orders.input")
        void placeOrder(Order order);
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerSpec poller() {
        return Pollers.fixedDelay(1000);
    }

    @Bean
    public IntegrationFlow orders(IntegrationFlow businessLogic, IntegrationFlow errorHandler123, IntegrationFlow publishingGateway) {
        return f -> f
                .log(LoggingHandler.Level.INFO)
                .split(Order.class, Order::getItems)
                .gateway(businessLogic, gatewayEndpointSpec -> {
                    gatewayEndpointSpec.advice(retry());
                })
                .route(Message.class, message -> message.getPayload() instanceof ExceptionalMessage,
                        router -> router
                                .subFlowMapping(true, errorHandler123)
                                .channelMapping(false, "nullChannel"));
    }

    public static RequestHandlerRetryAdvice retry() {
        RetryTemplate template = new RetryTemplate();
        RetryPolicy policy = new MaxAttemptsRetryPolicy(3);
        template.setRetryPolicy(policy);
        RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
        requestHandlerRetryAdvice.setRetryTemplate(template);
        requestHandlerRetryAdvice.setRecoveryCallback(context -> new ExceptionalMessage(context.getLastThrowable()));
        return requestHandlerRetryAdvice;
    }

    @Bean
    public IntegrationFlow businessLogic() {
        return f -> f

                .handle((payload, headers) -> {
                    OrderItem orderItem = (OrderItem) payload;
                    if (orderItem.getDrinkType() == DrinkType.LATTE) {
                        String message = String.format("throwing exception from first channel logic orderId=%s, drinkType=%s", orderItem.getOrder().getNumber(), orderItem.getDrinkType());
                        System.out.println(message);

                        throw new RuntimeException("Broken order first channel");
                    } else {
                        String message = String.format("Processed inside first channel. orderId=%s, drinkType=%s", orderItem.getOrder().getNumber(), orderItem.getDrinkType());
                        System.out.println(message);
                        return orderItem;
                    }
                })
                .channel("publishingChannel");
    }

    @Bean
    public IntegrationFlow publishingGateway() {
        return IntegrationFlows.from("publishingChannel")
                .filter(source -> source.equals(1),  spec -> {
                    spec.requiresReply(true);

                })
                .handle((payload, header) -> {
                    System.out.println("publishingGateway CALLED");

                    OrderItem orderItem = (OrderItem) payload;
                    if (orderItem.getDrinkType() == DrinkType.ESPRESSO) {
                        String message = String.format("throwing exception from publishingGateway channel logic orderId=%s, drinkType=%s", orderItem.getOrder().getNumber(), orderItem.getDrinkType());
                        System.out.println(message);

                        throw new RuntimeException("Broken order publishingGateway channel");
                    } else {
                        String message = String.format("Processed inside publishingGateway channel. orderId=%s, drinkType=%s", orderItem.getOrder().getNumber(), orderItem.getDrinkType());
                        System.out.println(message);
                        return orderItem;
                    }
                })
                .get();
    }

    @Bean
    public IntegrationFlow errorHandler123() {
        return f -> f
                .log(LoggingHandler.Level.ERROR, message -> String.format("Message being dropped. %s", message.getPayload()));
    }
}

Inside publishing gateway after adding a filter without requiresReply(true), pipeline started waiting infinitely.

And if I remove filter it works as expected.

Tried adding spec.requiresReply(true); but it throws error ReplyRequiredException: No reply produced by handler

Message being dropped. ExceptionalMessage[exceptionMessage=org.springframework.messaging.MessagingException: Failed to handle; nested exception is org.springframework.integration.handler.ReplyRequiredException: No reply produced by handler 'publishingGateway.org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and its 'requiresReply' property is set to true.,


Solution

  • By default, the gateway will wait indefinitely for a reply; there will be no reply if you filter out the request.

    Set the replyTimeout property on the gateway; as long as you use the default (direct) channels (so everything runs on the calling thread), you can safely set the timeout to 0.