java spring rabbitmq spring-integration amqp

Spring Integration and RabbitMQ: no output-channel or replyChannel header available while receiving message


I wrote a simple request/reply message flow. I have to use two independent queues, so I declare an AMQP outbound adapter to send the message and an AMQP inbound adapter to receive the reply.

@Bean
@FindADUsers
public AmqpOutboundEndpoint newFindADUsersOutboundAdapter() {
    return Amqp.outboundAdapter(amqpTemplate())
            .routingKeyExpression("headers[" + ADUsersFindConfig.ROUTING_KEY_HEADER + "]")
            .exchangeName(getExchange())
            .headerMapper(amqpHeaderMapper())
            .get();
}

@Bean
public AmqpInboundChannelAdapter newFindADUsersResponseInboundChannelAdapter(
        ADUsersFindResponseConfig config) {
    return Amqp.inboundAdapter(rabbitConnectionFactory(), findADUsersResponseQueue)
            .headerMapper(amqpHeaderMapper())
            .outputChannel(config.newADUsersFindResponseOutputChannel())
            .get();
}

It should work with @MessagingGateway:

@MessagingGateway
public interface ADUsersFindService {

     String FIND_AD_USERS_CHANNEL = "adUsersFindChannel";

     String FIND_AD_USERS_REPLY_OUTPUT_CHANNEL = "adUsersFindReplyOutputChannel";

     String FIND_AD_USERS_REPLY_CHANNEL = "adUsersFindReplyChannel";

     String CORRELATION_ID_REQUEST_HEADER = "correlation_id";

     String ROUTING_KEY_HEADER = "replyRoutingKey";

     String OBJECT_TYPE_HEADER = "object.type";

     @Gateway(requestChannel = FIND_AD_USERS_CHANNEL, replyChannel = FIND_AD_USERS_REPLY_CHANNEL)
     ADResponse find(ADRequest adRequest, @Header(ROUTING_KEY_HEADER) String routingKey, @Header(OBJECT_TYPE_HEADER) String objectType);
}

And the ADUsersFindResponseConfig class looks like:

 @Configuration
 @Import(JsonConfig.class)
 public class ADUsersFindResponseConfig {

     @Autowired
     public NullChannel nullChannel;

     @Autowired
     private JsonObjectMapper<?, ?> mapper;

     /**
      * @return The output channel for the flow
      */
     @Bean(name = ADUsersFindService.FIND_AD_USERS_REPLY_OUTPUT_CHANNEL)
     public MessageChannel newADUsersFindResponseOutputChannel() {
         return MessageChannels.direct().get();
     }

     /**
      * @return The output channel for gateway
      */
     @Bean(name = ADUsersFindService.FIND_AD_USERS_REPLY_CHANNEL)
     public MessageChannel newADUsersFindResponseChannel() {
         return MessageChannels.direct().get();
     }

     @Bean
     public IntegrationFlow findADUsersResponseFlow() {
         return IntegrationFlows
                 .from(newADUsersFindResponseOutputChannel())
                 .transform(new JsonToObjectTransformer(ADResponse.class, mapper))
                 .channel(newADUsersFindResponseChannel())
                 .get();
     }
 }

Sending the message works properly, but I have a problem receiving the reply. I expect the received message to be passed to the channel FIND_AD_USERS_REPLY_OUTPUT_CHANNEL, then deserialized to an ADResponse object by findADUsersResponseFlow, and then the ADResponse object to be passed to the gateway's replyChannel, FIND_AD_USERS_REPLY_CHANNEL. Finally, the 'find' method should return this object. Unfortunately, when org.springframework.integration.handler.BridgeHandler receives the message, I get an exception:

 org.springframework.messaging.MessagingException: ; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available

The message log looks like:

 11:51:35.697 [SimpleAsyncTaskExecutor-1] INFO  New message - GenericMessage [payload={...somepayload...}, headers={correlation_id=7cbd958e-4b09-4e4c-ba8e-5ba574f3309a, replyRoutingKey=findADUsersResponse.ad, amqp_consumerQueue=findADUsersResponseQueue, history=newFindADUsersResponseInboundChannelAdapter,adUsersFindReplyOutputChannel,adUsersFindReplyChannel,infoLog,infoLoggerChain.channel#0,infoLoggerChain.channel#1, id=37a4735d-6983-d1ad-e0a1-b37dc17e48ef, amqp_consumerTag=amq.ctag-8Qs5YEun1jXYRf85Hu1URA, object.type=USER, timestamp=1469094695697}]

So I'm pretty sure that the message was passed to adUsersFindReplyChannel. Also (in case it matters), both the request message and the reply message have the 'replyTo' header set to null. What am I doing wrong?


Solution

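  • The replyChannel header is a live object and can't be serialized over AMQP, so the reply comes back without it (your log shows no replyChannel header) and the gateway's bridge has nowhere to send the reply.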

    You can use an outbound gateway instead of the pair of adapters and the framework will take care of the headers.

    If you must use adapters for some reason, you need to do two things:

    1. Use the header channel registry to convert the channel object to a String name that the registry can resolve back to the channel when the reply arrives.

    2. Make sure that the header mapper is configured to send/receive the replyChannel header and that your receiving system returns the header in the reply.
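
To make the two steps above concrete, here is a minimal plain-Java sketch of the idea behind a header channel registry. It is a toy model, not Spring Integration's implementation: the class HeaderChannelRegistrySketch, the nested Registry type and its methods channelToName/nameToChannel are hypothetical names chosen for illustration, and the AMQP round trip is simulated by copying a header map. The point is only that a String name can cross the wire and be copied into the reply, while the live channel object cannot.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class HeaderChannelRegistrySketch {

    /** Toy registry (hypothetical): swaps a live channel for a String name and back. */
    static class Registry {
        private final Map<String, Consumer<Object>> channels = new ConcurrentHashMap<>();

        /** Registers the live channel and returns a name that is safe to send as a header. */
        String channelToName(Consumer<Object> channel) {
            String name = UUID.randomUUID().toString();
            channels.put(name, channel);
            return name;
        }

        /** Resolves a name from a reply header; returns null if the name is unknown. */
        Consumer<Object> nameToChannel(String name) {
            return channels.get(name);
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();

        // The "live" reply channel: it exists only inside this JVM.
        StringBuilder received = new StringBuilder();
        Consumer<Object> replyChannel = reply -> received.append(reply);

        // Step 1: before sending, replace the live object with its registered name.
        Map<String, Object> requestHeaders = new HashMap<>();
        requestHeaders.put("replyChannel", registry.channelToName(replyChannel));

        // Step 2: the header mapper sends the String header, and the remote
        // system copies it into the reply (simulated here by copying the map).
        Map<String, Object> replyHeaders = new HashMap<>(requestHeaders);

        // Back on the requesting side: resolve the name and deliver the reply.
        String name = (String) replyHeaders.get("replyChannel");
        registry.nameToChannel(name).accept("ADResponse payload");

        System.out.println(received); // prints "ADResponse payload"
    }
}
```

If the remote system drops the header, or the header mapper filters it out on either side, the lookup in the last step has nothing to resolve. That is the same situation as in the question, where the reply reaches adUsersFindReplyChannel with no replyChannel header.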