Search code examples
rabbitmqspring-rabbit

RabbitMQ request/response "RabbitTemplate is not configured as listener"


I'm testing request/response pattern with Spring-AMQP rabbitmq implementation and I can't make it work...

I've got configured following artifacts:

test_exchange with greeting queue. Routing key = greeting
reply_exchange with replies queue. Routing key = replies

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = 
            new CachingConnectionFactory("....IP of broker...");
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    return connectionFactory;
}

@Bean
public Queue greeting() {
    return new Queue("greeting");
}

@Bean
public Queue replies() {
    return new Queue("replies");
}

MessageListener receiver() {
    return new MessageListenerAdapter(new RabbitMqReceiver(), "onMessage");
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, Queue replies) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setExchange("test_exchange");
    template.setRoutingKey("greeting");     
    template.setReplyAddress("reply_exchange"+"/"+replies.getName());
    template.setReplyTimeout(60000);
    return template;
}

@Bean
public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory, 
        RabbitTemplate rabbitTemplate, Queue replies) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setMessageListener(rabbitTemplate);
    container.setQueues(replies);
    return container;
}

@Bean
public SimpleMessageListenerContainer serviceListenerContainer(
        ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueues(greeting());
    container.setMessageListener(receiver());
    return container;
}

I was following example at github, but it crashes with:

Caused by: java.lang.IllegalStateException: RabbitTemplate is not configured as MessageListener - cannot use a 'replyAddress': reply_exchange/replies

Documentation says:

Starting with version 1.5, the RabbitTemplate will detect if it has been configured as a MessageListener to receive replies. If not, attempts to send and receive messages with a reply address will fail with an IllegalStateException (because the replies will never be received).

This is excellent, but how RabbitTemplate does that? How does it detect if it's configured as MessageListener?

thanks in advance

PS: Send code:

public void send() {
    Message message = MessageBuilder.withBody("Payload".getBytes())
            .setContentType("text/plain")
            .build();
    Message reply = this.template.sendAndReceive(message);
    System.out.println("Reply from server is: "+new String(reply.getBody()));
}

Solution

  • When the reply container starts, it detects that the template is ListenerContainerAware and calls expectedQueueNames() to retrieve the reply queues (or null if the replyAddress has the form exch/rk); if a non-null result is returned, the container checks that the queue is correct; if exch/rk is the reply address, you would get this

    logger.debug("Cannot verify reply queue because it has the form 'exchange/routingKey'");
    

    This method unconditionally sets the isListener boolean which avoids that exeption. So it seems like the container hasn't started before you sent your message - are you sending before the context is fully initialized?

    Note that since RabbitMQ implemented direct reply-to, it is generally not necessary any more to use a reply container (unless you want HA reply queues or need an explicit reply queue for some other reason). Direct reply-to removed the performance problem that drove us to implement the reply container mechanism.