Search code examples
spring-rabbitintegration-testing

Integration Testing RabbitMQ listner - Intermittently failing because the message takes time to be queued


I have written an integration test for the following flow using RabbitMock (found it on github and it seems really cool):

Message gets added to Incoming-message-queue --> incoming message listner picks up the message --> processes it --> puts a new outgoing message to the new queue Outgoing-message-queue --> (only for tests) wrote a listner for this outgoing queue in src/test/resources.

Everything is working (with one important glitch - intermittent timeout) and I am doing asserts as shown below:

List<OutgoingData> receivedMessages = new ArrayList<>();
            assertTimeoutPreemptively(ofMillis(15000L), () -> {
                    while (receivedMessages.isEmpty()) {
                        OutgoingData data = 
receiver.getOutgoingData();
                        if(data != null){
                            receivedMessages.add(data);
                        }

                    }
                }
            );

            assertThat(receivedMessages.get(0)).isNotNull();

 assertThat
(receivedMessages.get(0).getRecipient())
.isEqualTo("enabled@localhost");

The timeout in this test is real issue I am facing.

  1. Because of the timeout, the tests are getting slow.
  2. If I remove the timeout, tests are getting stuck in Jenkins and need to be killed forcefully.
  3. At times, this 15000 milliseconds timeout is also not enough and the tests fail.

I was wondering if there is a better way to deal with such a situation in the integration test.

Looking forward to your inputs.

Thanks a lot, Banyanbat


Solution

  • When I gave it a little more thought and talked to one of my team members about this, it clicked my mind that java 8 futures can be effectively used here.

    I implemented it as follows and it worked like a charm.

    @Test
    public void basic_consume_case()
            InterruptedException, ExecutionException {
        IncomingData incomingData = new IncomingData();
        incomingData.setRecipient("enabled@localhost");
        incomingData.setSender("unblocked@localhost");
        incomingData.setStoreKey("123");
        incomingData.setSubject("Hello");
        incomingData.setText("Hello from Test 1");
        try (AnnotationConfigApplicationContext context = new 
       AnnotationConfigApplicationContext(
                ConnectionFactoryConfiguration.class)) {
    
            sendMessage(incomingData);
    
            Future<OutgoingData> receivedMessageFuture = pollOutgoingQueueForData();
    
            OutgoingData receivedMessage = receivedMessageFuture.get();
    
            assertThat(receivedMessage).isNotNull();
            assertThat(receivedMessage.getRecipient()).isEqualTo("enabled@localhost");
            assertThat(receivedMessage.getContent())
            ...
    
        }
    }
    private void sendMessage(IncomingData incomingData) {
        try {
            rabbitTemplate.convertAndSend("incoming-data-queue", incomingData, m -> {
                m.getMessageProperties().setContentType("application/json");
                return m;
            });
    
        } finally {
        }
    }
    
    private Future<OutgoingData> pollOutgoingQueueForData() throws InterruptedException {
    
        return executor.submit(() -> {
            OutgoingData receivedMessage = null;
            while (receivedMessage == null) {
                receivedMessage = (OutgoingData) 
    rabbitTemplate.receiveAndConvert("outgoing-queue");
            }
            return receivedMessage;
        });
    
    }