Java RabbitMQ consumer.nextMessage always gets same message


We are using RabbitMQ from Java with Spring Boot in a distributed service architecture. One service receives an HTTP request and forwards it to an unknown queue for processing. At the same time, it has to wait for a response on another queue before it can complete the HTTP request. (It is a preview request whose work is done by a renderer.)

There can be more than one instance of ServiceA (the HTTP interface) and ServiceB (the renderer), so with every preview message we also send a unique ID to be used as the routing key.

I'm having trouble with the BlockingQueueConsumer. Whenever I call consumer.nextMessage(), I get the same message over and over again. This is doubly odd: first, the message should have been acked and removed from the queue; second, the consumer shouldn't receive it at all, because the unique ID we used is no longer bound to the queue. nextMessage() even returns before the renderer service has finished and sent its done message back.

Here's the simplified setup:

general

All services use a global DirectExchange for all messages

@Bean
public DirectExchange globalDirectExchange() {
  return new DirectExchange(EXCHANGE_NAME, false, true);
}

ServiceA (handles the HTTP request):

 private Content requestPreviewByKey(RenderMessage renderMessage, String previewKey) {
    String renderDoneRoutingKey = UUID.randomUUID().toString();
    renderMessage.setPreviewDoneKey(renderDoneRoutingKey);
    Binding binding = BindingBuilder.bind(previewDoneQueue).to(globalDirectExchange)
        .with(renderDoneRoutingKey);
    try {
      amqpAdmin.declareBinding(binding);
      rabbitProducer.sendPreviewRequestToKey(renderMessage, previewKey);
      return getContentBlocking();
    } catch (Exception e) {
      logErrorIfDebug(type, e);
      throw new ApiException(BaseErrorCode.COMMUNICATION_ERROR, "Could not render preview");
    } finally {
      amqpAdmin.removeBinding(binding);
    }
  }


  private Content getContentBlocking() {
    BlockingQueueConsumer blockingQueueConsumer = new BlockingQueueConsumer(rabbitMqConfig.connectionFactory(), new DefaultMessagePropertiesConverter(), new ActiveObjectCounter<>(), AcknowledgeMode.AUTO, true, 1, PREVIEW_DONE_QUEUE);
    try {
      blockingQueueConsumer.start();
      Message message = blockingQueueConsumer.nextMessage(waitForPreviewMs);
      // nextMessage(timeout) returns null if no message arrived in time
      if (message != null) {
        String result = new String(message.getBody());
        return JsonUtils.stringToObject(result, Content.class);
      }
      throw new ApiException("Could not render preview");
    } catch (Exception e) {
      logError(e);
      throw new ApiException("Could not render preview");
    } finally {
      blockingQueueConsumer.stop();
    }
  }

Service B

I'll spare you most of the code. My log says everything is going well, and as soon as it's done the service sends the correct message to the UUID key that was sent with the initial render request.

public void sendPreviewDoneMessage(Content content, String previewDoneKey) {
    String message = JsonUtils.objectToString(content);
    rabbitTemplate.convertAndSend(globalDirectExchange, previewDoneKey, message);
}

The whole thing works... once. The real issue seems to be the consumer setup. Why do I keep getting the same (first) message from the queue when I call nextMessage()? Doesn't creating and removing a binding ensure that only messages sent with that routing key are received by that instance? And doesn't nextMessage() acknowledge the message and remove it from the queue?

Thanks a lot for bearing with me, and even more for any helpful answer!


Solution

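  • BlockingQueueConsumer is not designed to be used directly; it is a component of the SimpleMessageListenerContainer, which will take care of acking the message after it has been consumed by a listener (the container calls commitIfNecessary). Since nextMessage() on its own does not ack, the message most likely stays unacknowledged and is returned to the queue when the consumer is stopped, which would explain why it is delivered again.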

    There may be other unexpected side effects of using this consumer directly.

    I strongly advise using the listener container to consume messages.

    If you just want to receive messages on demand, use a RabbitTemplate receive() or receiveAndConvert() method instead.
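
One way to combine the listener-container advice with the blocking HTTP request from the question is to let a single listener hand each reply to the request thread that is waiting for it. The following is only a minimal sketch of that hand-off in plain Java, without any Spring or RabbitMQ classes. PendingReplies and its methods are hypothetical names, and the sketch assumes that each reply carries the unique key that was sent with the render request (for example in the payload or in a header), so that the listener can pass it to onReply.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: hands each reply to the request waiting for its key.
class PendingReplies {

    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Call before sending the render request, so a fast reply cannot be missed.
    CompletableFuture<String> register(String replyKey) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(replyKey, future);
        return future;
    }

    // Call from the listener for every message on the "done" queue.
    // Returns false if nobody is waiting, e.g. the request already timed out.
    boolean onReply(String replyKey, String body) {
        CompletableFuture<String> future = pending.remove(replyKey);
        return future != null && future.complete(body);
    }

    // Blocks the request thread until its reply arrives or the timeout expires.
    // Returns null if no reply arrived in time or the thread was interrupted.
    String await(String replyKey, CompletableFuture<String> future, long timeoutMs) {
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pending.remove(replyKey); // always clean up, also on timeout
        }
    }
}
```

In ServiceA, requestPreviewByKey would call register before sending the request and await afterwards, while a listener managed by the container would call onReply for every message on the done queue. With this arrangement the container does the acking, and a reply that arrives after its request has timed out is simply dropped instead of being picked up by the next request.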