Search code examples
javaspring-bootrabbitmqspring-amqpspring-rabbit

Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: No correlation header in reply


I'm trying to migrate the spring boot parent and I had the displeasure of starting to receive an error message when trying to consume the messages. The error is the title, and this occurred when trying to update the spring parent.

obs. Service queues are not request reply queues

Previous:

  • org.springframework.boot:spring-boot-starter-web:jar:2.4.3
  • org.springframework.boot:spring-boot-starter-amqp:jar:2.4.3

After with problem:

  • org.springframework.boot:spring-boot-starter-web:jar:2.7.5
  • org.springframework.boot:spring-boot-starter-amqp:jar:2.7.5

Error:

2022-12-26 11:28:12.175 [notificationsContainer-1] WARN  o.s.a.r.l.ConditionalRejectingErrorHandler - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1771) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1661) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1576) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1564) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1555) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1499) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:992) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:939) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:84) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1316) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1222) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: No correlation header in reply
    at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:2663) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:273) ~[spring-rabbit-2.4.5.jar:2.4.5]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1657) ~[spring-rabbit-2.4.5.jar:2.4.5]
    ... 10 common frames omitted

Here's how I'm configuring it.

@Configuration
@RequiredArgsConstructor
public class MessageQueueConfiguration {
    private static final String BROKER_NOTIFICATIONS = "notifications";
    private static final String BROKER_USERS = "users";
    private static final String BROKER_ADMIN = "admin";
    private static final String BROKER_REPORTS = "reports";

    private final MessageQueueProperties messageQueueProperties;
    private final BrokerProperties brokerProperties;
    private final ObjectMapper objectMapper;

    @Bean
    public AmqpAdmin brokerUsersAmqpAdmin(@Qualifier("brokerUsersConnectionFactory") ConnectionFactory brokerUsersConnectionFactory) {
        return getAmqpAdmin(brokerUsersConnectionFactory);
    }

    @Bean
    public AmqpAdmin brokerAdminAmqpAdmin(@Qualifier("brokerAdminConnectionFactory") ConnectionFactory brokerAdminConnectionFactory) {
        return getAmqpAdmin(brokerAdminConnectionFactory);
    }

    @Bean
    public AmqpAdmin brokerNotificationsAmqpAdmin(@Qualifier("brokerNotificationsConnectionFactory") ConnectionFactory brokerNotificationsConnectionFactory) {
        return getAmqpAdmin(brokerNotificationsConnectionFactory);
    }

    @Bean
    public AmqpAdmin brokerReportsAmqpAdmin(@Qualifier("brokerReportsConnectionFactory") ConnectionFactory brokerReportsConnectionFactory) {
        return getAmqpAdmin(brokerReportsConnectionFactory);
    }

    @Bean
    public MessageListenerAdapter listenerMessageQueueAdapter(ChannelAwareMessageListener messageBrokerReceiver) {
        return new MessageListenerAdapter(messageBrokerReceiver);
    }

    @Bean
    public CachingConnectionFactory brokerNotificationsConnectionFactory() {
        return createCachingConnectionFactory(BROKER_NOTIFICATIONS);
    }

    @Bean
    public CachingConnectionFactory brokerUsersConnectionFactory() {
        return createCachingConnectionFactory(BROKER_USERS);
    }

    @Bean
    public CachingConnectionFactory brokerAdminConnectionFactory() {
        return createCachingConnectionFactory(BROKER_ADMIN);
    }

    @Bean
    public CachingConnectionFactory brokerReportsConnectionFactory() {
        return createCachingConnectionFactory(BROKER_REPORTS);
    }

    @Bean
    public SimpleMessageListenerContainer notificationsContainer(@Qualifier("brokerNotificationsConnectionFactory") ConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
        return createMessageListenerContainer(connectionFactory, messageListenerAdapter);
    }

    @Bean
    public SimpleMessageListenerContainer usersContainer(@Qualifier("brokerUsersConnectionFactory") ConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
        return createMessageListenerContainer(connectionFactory, messageListenerAdapter);
    }

    @Bean
    public SimpleMessageListenerContainer adminContainer(@Qualifier("brokerAdminConnectionFactory") ConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
        return createMessageListenerContainer(connectionFactory, messageListenerAdapter);
    }

    @Bean
    public SimpleMessageListenerContainer reportsContainer(@Qualifier("brokerReportsConnectionFactory") ConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
        return createMessageListenerContainer(connectionFactory, messageListenerAdapter);
    }

    private SimpleMessageListenerContainer createMessageListenerContainer(ConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();

        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(
                messageQueueProperties.getQueue1(),
                messageQueueProperties.getQueue2(),
                messageQueueProperties.getQueue3(),
                messageQueueProperties.getQueue4(),
                messageQueueProperties.getQueue5(),
                messageQueueProperties.getQueue6(),
                messageQueueProperties.getQueue7()
        );
        container.setMessageListener(messageListenerAdapter);
        container.setMaxConcurrentConsumers(50);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMissingQueuesFatal(false);
        container.setFailedDeclarationRetryInterval(60000);

        return container;
    }

    private CachingConnectionFactory createCachingConnectionFactory(String brokerName) {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(
                brokerProperties.getBrokers().get(brokerName).getHost(),
                brokerProperties.getBrokers().get(brokerName).getPort());
        cachingConnectionFactory.setUsername(brokerProperties.getBrokers().get(brokerName).getUsername());
        cachingConnectionFactory.setPassword(brokerProperties.getBrokers().get(brokerName).getPassword());
        cachingConnectionFactory.setConnectionTimeout(brokerProperties.getBrokers().get(brokerName).getTimeout());
        return cachingConnectionFactory;
    }

    private Queue queue1() {
        Map<String, Object> queueArguments = new HashMap<>();
        return new Queue(messageQueueProperties.getEmailQueueName(), true, false, false, queueArguments);
    }

    private Queue queue2() {
        Map<String, Object> queueArguments = new HashMap<>();
        return new Queue(messageQueueProperties.getIosPushQueueName(), true, false, false, queueArguments);
    }

    private Queue queue3() {
        Map<String, Object> queueArguments = new HashMap<>();
        return new Queue(messageQueueProperties.getContentCardQueueName(), true, false, false, queueArguments);
    }

    private Queue queue4() {
        Map<String, Object> queueArguments = new HashMap<>();
        return new Queue(messageQueueProperties.getCanvasQueueName(), true, false, false, queueArguments);
    }

    private Queue queue5() {
        Map<String, Object> queueArguments = new HashMap<>();
        return new Queue(messageQueueProperties.getCampaignQueueName(), true, false, false, queueArguments);
    }

    private Queue queue6() {
        Map<String, Object> queueArguments = new HashMap<>();
        return new Queue(messageQueueProperties.getPushBrazeQueueName(), true, false, false, queueArguments);
    }

    private Queue queue7() {
        Map<String, Object> queueArguments = new HashMap<>();
        return new Queue(messageQueueProperties.getEmailBouncedStatusName(), true, false, false, queueArguments);
    }

    @Bean
    public FanoutExchange exchange() {
        return new FanoutExchange(messageQueueProperties.getPushNotificationExchangeName());
    }

    @Bean
    public Binding binding(Queue queue, FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }


    private AmqpAdmin getAmqpAdmin(ConnectionFactory connectionFactory){
        AmqpAdmin amqpAdmin = new RabbitAdmin(connectionFactory);
        amqpAdmin.declareQueue(queue1());
        amqpAdmin.declareQueue(queue2());
        amqpAdmin.declareQueue(queue4());
        amqpAdmin.declareQueue(queue3());
        amqpAdmin.declareQueue(queue5());
        amqpAdmin.declareQueue(queue6());
        amqpAdmin.declareQueue(queue7());

        amqpAdmin.declareExchange(exchange());
        amqpAdmin.declareBinding(binding(queue6(), exchange()));

        return amqpAdmin;
    }

    @Bean
    public Jackson2JsonMessageConverter jackson2JsonMessageConverter(ObjectMapper objectMapper) {
        objectMapper.setDefaultPropertyInclusion(
                JsonInclude.Value.construct(JsonInclude.Include.ALWAYS, JsonInclude.Include.NON_NULL));
        return new Jackson2JsonMessageConverter(objectMapper);
    }

    private RabbitTemplate buildTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(createCachingConnectionFactory(BROKER_NOTIFICATIONS));
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter(objectMapper));
        return rabbitTemplate;
    }

    @Bean
    public RabbitTemplate eventsTemplate() {
        RabbitTemplate rabbitTemplate = buildTemplate();
        rabbitTemplate.setRoutingKey(messageQueueProperties.getCampaignQueueName());
        return buildTemplate();
    }

    @Bean
    @Primary
    public RabbitTemplate defaultRabbitTemplate() {
        return buildTemplate();
    }
}

Solution

  • The issue was due to the @Primary template being injected as the listener (because it implements ChannelAwareMessageListener).

    The solution was to narrow the type of the dependency, from

    @Bean
    public MessageListenerAdapter listenerMessageQueueAdapter(ChannelAwareMessageListener messageBrokerReceiver) {
        return new MessageListenerAdapter(messageBrokerReceiver);
    }
    
    @Bean
    public MessageListenerAdapter listenerMessageQueueAdapter(MessageBrokerReceiver messageBrokerReceiver) {
        return new MessageListenerAdapter(messageBrokerReceiver);
    }
    

    Furthermore, the adapter is unnecessary because messageBrokerReceiver implements ChannelAwareMessageListener so no adaptation is needed. The adapter is intended to invoke a POJO listener.