spring-integration, spring-jms, spring-dsl

Spring Integration 4 asynchronous request/response


I am trying to write a simple message flow using Spring Integration v4's DSL APIs which would look like this:

       -> in.ch -> Processing -> JmsGatewayOut -> JMS_OUT_QUEUE
Gateway
       <- out.ch <- Processing <- JmsGatewayIn <- JMS_IN_QUEUE

Because the request/response is asynchronous, when I inject a message via the initial Gateway, the message goes all the way to JMS_OUT_QUEUE. Outside this message flow, a reply message is put back into JMS_IN_QUEUE, where it is picked up by JmsGatewayIn. At this point, the message is processed and placed into out.ch (I know the response gets to out.ch because I have a logger interceptor there which logs the message being placed there), but the Gateway never receives the response.

Instead of a response reaching my Gateway, the system outside of this message flow, which picked up the message from JMS_OUT_QUEUE and placed the response in JMS_IN_QUEUE, receives a javax.jms.MessageFormatException: MQJMS1061: Unable to deserialize object on its own JmsOutboundGateway (judging from the logs, I think it is failing to deserialize a JMS reply object).

I have clearly not got something configured correctly but I don't know exactly what. Does anyone know what I am missing?

Working with spring-integration-core-4.0.3.RELEASE, spring-integration-jms-4.0.3.RELEASE, spring-integration-java-dsl-1.0.0.M2, spring-jms-4.0.6.RELEASE.

My Gateway is configured as follows:

@MessagingGateway
public interface WsGateway {

    @Gateway(requestChannel = "in.ch", replyChannel = "out.ch", 
        replyTimeout = 45000)
    AResponse process(ARequest request);
}

My Integration flow is configured as follows:

@Configuration
@EnableIntegration
@IntegrationComponentScan
@ComponentScan
public class IntegrationConfig {

    @Bean(name = "in.ch")
    public DirectChannel inCh() {
        return new DirectChannel();
    }

    @Bean(name = "out.ch")
    public DirectChannel outCh() {
        return new DirectChannel();
    }   

    @Autowired
    private MQQueueConnectionFactory mqConnectionFactory;

    @Bean
    public IntegrationFlow requestFlow() {

        return IntegrationFlows.from("in.ch")
                .handle("processor", "processARequest")
                .handle(Jms.outboundGateway(mqConnectionFactory)
                        .requestDestination("JMS_OUT_QUEUE")
                        .correlationKey("JMSCorrelationID"))
                .get();
    }

    @Bean
    public IntegrationFlow responseFlow() {

        return IntegrationFlows.from(Jms.inboundGateway(mqConnectionFactory)
                .destination("JMS_IN_QUEUE"))
                .handle("processor", "processAResponse")
                .channel("out.ch")
                .get();
    }
}

Thanks for any help on this, PM.


Solution

  • First of all, your configuration has several problems:

    1. Since you start the flow from WsGateway#process, you should wait for the reply there. The gateway's request/reply capability is based on a TemporaryReplyChannel, which is placed in the message headers as a non-serializable value.

    2. As long as you wait for the reply on that gateway, there is no reason to provide a replyChannel unless you are going to do some publish-subscribe logic on the reply.

    3. When you send a message to a JMS queue, keep in mind that the consumer may be a separate, remote application, and that application may know nothing about your out.ch.

    4. The JMS request/reply capability is indeed based on JMSCorrelationID, but that alone isn't enough. The other piece is the JMS ReplyTo header (JMSReplyTo). Hence, if you are going to send a reply from the consumer, you should simply rely on the JmsGatewayIn to handle it.

    So I'd change your code to this:

    @MessagingGateway
    public interface WsGateway {
    
        @Gateway(requestChannel = "in.ch", replyTimeout = 45000)
        AResponse process(ARequest request);
    }
    
    @Configuration
    @EnableIntegration
    @IntegrationComponentScan
    @ComponentScan
    public class IntegrationConfig {
    
        @Bean(name = "in.ch")
        public DirectChannel inCh() {
            return new DirectChannel();
        }
    
        @Autowired
        private MQQueueConnectionFactory mqConnectionFactory;
    
        @Bean
        public IntegrationFlow requestFlow() {
            return IntegrationFlows.from("in.ch")
                    .handle("processor", "processARequest")
                    .handle(Jms.outboundGateway(mqConnectionFactory)
                            .requestDestination("JMS_OUT_QUEUE")
                            .replyDestination("JMS_IN_QUEUE"))
                    .handle("processor", "processAResponse")
                    .get();
        }
    
    }
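    To make point 4 more concrete, here is a minimal in-memory sketch of the request/reply pattern in plain Java. It does not use JMS or Spring; the class RequestReplySketch, the Msg type and the method names are made up for illustration, and the two BlockingQueues merely stand in for JMS_OUT_QUEUE and JMS_IN_QUEUE. The requester attaches a correlation id and a reply-to destination, and the consumer needs only those two values to route its answer back:

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory simulation only: no JMS or Spring classes are used here.
class RequestReplySketch {

    // Hypothetical stand-in for a JMS message with the two headers that matter.
    static final class Msg {
        final String body;
        final String correlationId;       // plays the role of JMSCorrelationID
        final BlockingQueue<Msg> replyTo; // plays the role of JMSReplyTo

        Msg(String body, String correlationId, BlockingQueue<Msg> replyTo) {
            this.body = body;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    // "Remote" consumer: answers on the reply-to queue and echoes the correlation id.
    static void serveOne(BlockingQueue<Msg> requests) throws InterruptedException {
        Msg request = requests.take();
        request.replyTo.put(new Msg("reply to " + request.body, request.correlationId, null));
    }

    // Requester: sends a request, then waits for the reply with the matching id.
    // Returns null if no matching reply arrives within timeoutMs.
    static String requestReply(BlockingQueue<Msg> out, BlockingQueue<Msg> in,
                               String body, long timeoutMs) throws InterruptedException {
        String id = UUID.randomUUID().toString();
        out.put(new Msg(body, id, in));
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return null;
            }
            Msg reply = in.poll(remaining, TimeUnit.NANOSECONDS);
            if (reply == null) {
                return null;
            }
            if (id.equals(reply.correlationId)) {
                return reply.body;
            }
            // Replies with a different correlation id are simply dropped in this sketch.
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<Msg> out = new LinkedBlockingQueue<>();
        BlockingQueue<Msg> in = new LinkedBlockingQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                serveOne(out);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        System.out.println(requestReply(out, in, "ARequest", 1000));
        consumer.join();
    }
}
```

    Notice that the consumer never refers to anything like out.ch: everything it needs travels with the message, and everything that travels with the message is plain, serializable data.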
    

    Let me know if this works for you, or try to explain why you use two-way gateways for one-way cases. Maybe Jms.outboundAdapter() and Jms.inboundAdapter() would suit you better?

    UPDATE

    How to use <header-channels-to-string> from Java DSL:

    .enrichHeaders(e -> e.headerChannelsToString())
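    As a rough illustration of the idea behind that option (this is not the actual Spring Integration implementation), the sketch below swaps a live reply channel stored in the headers for a String key kept in a registry, so the headers can be serialized, and resolves the key again when the reply arrives. ChannelRegistrySketch and its methods are hypothetical names, and a Consumer<Object> stands in for a channel:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Conceptual sketch only: these are NOT Spring Integration classes.
class ChannelRegistrySketch {

    // Maps a serializable String key to the live (non-serializable) reply channel.
    private final Map<String, Consumer<Object>> channels = new ConcurrentHashMap<>();

    // Returns a copy of the headers in which a live "replyChannel" is replaced by a String key.
    Map<String, Object> channelsToString(Map<String, Object> headers) {
        Map<String, Object> copy = new HashMap<>(headers);
        Object reply = copy.get("replyChannel");
        if (reply instanceof Consumer) {
            String key = UUID.randomUUID().toString();
            @SuppressWarnings("unchecked")
            Consumer<Object> channel = (Consumer<Object>) reply;
            channels.put(key, channel);
            copy.put("replyChannel", key);
        }
        return copy;
    }

    // Resolves the String key back to the live channel and delivers the reply.
    // Returns false if the header holds no known key. Removing the entry on first
    // use is a simplification chosen for this sketch.
    boolean sendReply(Map<String, Object> headers, Object payload) {
        Object key = headers.get("replyChannel");
        Consumer<Object> channel = (key instanceof String) ? channels.remove(key) : null;
        if (channel == null) {
            return false;
        }
        channel.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        ChannelRegistrySketch registry = new ChannelRegistrySketch();
        StringBuilder received = new StringBuilder();
        Consumer<Object> replyChannel = received::append;

        Map<String, Object> headers = new HashMap<>();
        headers.put("replyChannel", replyChannel);

        // The copy now carries only a String, which is safe to send over the wire.
        Map<String, Object> wireHeaders = registry.channelsToString(headers);
        registry.sendReply(wireHeaders, "AResponse");
        System.out.println(received);
    }
}
```

    For this to help across JMS, the String header also has to make the round trip, i.e. the remote side must copy it onto the reply message. Whether that happens depends on your header mapping and on the consumer, so treat it as something to verify rather than a given.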