Search code examples
javaspringasynchronousintegration-testingspring-rabbit

RabbitListener does not pick up every message sent with AsyncRabbitTemplate


I am using a Spring-Boot project on Spring-Boot Version 1.5.4, with spring-boot-starter-amqp, spring-boot-starter-web-services and spring-ws-support v. 2.4.0. So far, I have successfully created a @RabbitListener Component, which does exactly what it is supposed to do, when a message is sent to the broker via rabbitTemplate.sendAndReceive(uri, message). I tried to see what would happen if I used AsyncRabbitTemplate for this, as it is possible that the message processing might take a while, and I don't want to lock my application while waiting for a response.

The problem is: the first message I put in the queue is not even being picked up by the listener. The callback just acknowledges a success with the published message instead of the returned message.

Listener:

@RabbitListener(queues = KEY_MESSAGING_QUEUE)
public Message processMessage(@Payload byte[] payload, @Headers Map<String, Object> headers) {
    try {
        byte[] resultBody = messageProcessor.processMessage(payload, headers);
        MessageBuilder builder = MessageBuilder.withBody(resultBody);
        if (resultBody.length == 0) {
            builder.setHeader(HEADER_NAME_ERROR_MESSAGE, "Error occurred during processing.");
        }
        return builder.build();
    } catch (Exception ex) {
        return MessageBuilder.withBody(EMPTY_BODY)
                .setHeader(HEADER_NAME_ERROR_MESSAGE, ex.getMessage())
                .setHeader(HEADER_NAME_STACK_TRACE, ex.getStackTrace())
                .build();
    }
}

When I am executing my Tests, one test fails, and the second test succeeds. The class is annotated with @RunWith(SpringJUnit4ClassRunner.class) and @SpringBootTest(classes = { Application.class, Test.TestConfiguration.class }) and has a @ClassRule of BrokerRunning.isRunningWintEmptyQueues(QUEUE_NAME)

TestConfiguration (inner class):

public static class TestConfiguration {

    @Bean // referenced in the tests as art
    public AsyncRabbitTemplate asyncRabbitTemplate(ConnectionFactory connectionFactory, RabbitTemplate rabbitTemplate) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(QUEUE_NAME);
        return new AsyncRabbitTemplate(rabbitTemplate, container);
    }

    @Bean
    public MessageListener messageListener() {
        return new MessageListener();
    }
}

Tests:

@Test
public void shouldListenAndReplyToQueue() throws Exception {
    doReturn(RESULT_BODY)
            .when(innerMock)
            .processMessage(any(byte[].class), anyMapOf(String.class, Object.class));
    Message msg = MessageBuilder
            .withBody(MESSAGE_BODY)
            .setHeader("header", "value")
            .setHeader("auth", "entication")
            .build();

    RabbitMessageFuture pendingReply = art.sendAndReceive(QUEUE_NAME, msg);
    pendingReply.addCallback(new ListenableFutureCallback<Message>() {

        @Override
        public void onSuccess(Message result) { }

        @Override
        public void onFailure(Throwable ex) {
            throw new RuntimeException(ex);
        }
    });

    while (!pendingReply.isDone()) {}
    result = pendingReply.get();
    // assertions omitted
}

Test 2:

@Test
public void shouldReturnExceptionToCaller() throws Exception {
    doThrow(new SSLSenderInstantiationException("I am a message", new Exception()))
            .when(innerMock)
            .processMessage(any(byte[].class), anyMapOf(String.class, Object.class));
    Message msg = MessageBuilder
            .withBody(MESSAGE_BODY)
            .setHeader("header", "value")
            .setHeader("auth", "entication")
            .build();

    RabbitMessageFuture pendingReply = art.sendAndReceive(QUEUE_NAME, msg);
    pendingReply.addCallback(/*same as above*/);

    while (!pendingReply.isDone()) {}
    result = pendingReply.get();
    //assertions omitted
}

When I run both tests together, the test that is executed first fails, while the second call succeeds. When I run both tests separately, both fail. When I add an @Before-Method, which uses the AsyncRabbitTemplate art to put any message into the queue, both tests MAY pass, or the second test MAY not pass, so in addition to being unexpected, the behaviour is inconsistent as well.

The interesting thing is, that the callback passed to the method reports a success before the listener is invoked, and reports the sent message as result.

The only class missing from this is the general configuration class, which is annotated with @EnableRabbit and has this content:

@Bean
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrentConsumers(10);
    return factory;
}

Other Things I have tried:

  • specifically create AsyncRabbitTemplate myself, start and stop it manually before and after every message process -> both tests succeeded
  • increase / decrease receive timeout -> no effect
  • remove and change the callback -> no effect
  • explicitly created the queue again with an injected RabbitAdmin -> no effect
  • extracted the callback to a constant -> tests didn't even start correctly
  • As stated above, I used RabbitTemplate directly, which worked exactly as intended

If anyone has any ideas what is missing, I'd be very happy to hear.


Solution

  • You can't use the same queue for requests and replies...

    @Bean // referenced in the tests as art
    public AsyncRabbitTemplate asyncRabbitTemplate(ConnectionFactory connectionFactory, RabbitTemplate rabbitTemplate) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(QUEUE_NAME);
        return new AsyncRabbitTemplate(rabbitTemplate, container);
    }
    

    Will listen for replies on QUEUE_NAME, so...

    RabbitMessageFuture pendingReply = art.sendAndReceive(QUEUE_NAME, msg);
    

    ...simply sends a message to itself. It looks like you intended...

    RabbitMessageFuture pendingReply = art.sendAndReceive(KEY_MESSAGING_QUEUE, msg);