Search code examples
rabbitmqspring-amqpspring-rabbit

How to use ConnectionListner and/or ChannelListner for logging failure/success of message delivery in RabbitMQ


I am trying to log any information or exception that occurs during message sending in RabbitMQ, for that I tried to add ConnectionListener on the existing connection factory.

    kRabbitTemplate.getConnectionFactory().addConnectionListener(new ConnectionListener() {

        @Override
        public void onCreate(Connection connection) {
            System.out.println("Connection Created");
        }

        @Override
        public void onShutDown(ShutdownSignalException signal) {
            System.out.println("Connection Shutdown "+signal.getMessage());
        }

    });
    kRabbitTemplate.convertAndSend(exchange, routingkey, empDTO);       
    

To test the exception scenario, I unbind and even deleted the queue from RabbitMQ console. But I did not get any exception or any shutdown method call.

Although, When I stopped RabbitMQ service, I got

Exception in thread "Thread-5" org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect

But this exception is not from the listener I added.

I want to know

  1. Why I did not get any exception or call from shutdown method
  2. How can I use ConnectionListner and/or ChannelListner for logging failure/success of message delivery.
  3. Can we use the AMQP appender, if yes how can we do that? (any example / tutorial)
  4. What are the other approaches to ensure the message is sent?

Note: I do not want to use the publisher confirm the approach.


Solution

  • Connection Refused is not a ShutdownSignalException - the connection was never established because the broker is not present on the server/port.

    You can't use the listeners to confirm delivery or return of individual messages; use publisher confirms and returns for that.

    https://docs.spring.io/spring-amqp/docs/current/reference/html/#publishing-is-async

    See the documentation for how to use the appenders.

    https://docs.spring.io/spring-amqp/docs/current/reference/html/#logging

    EDIT

    To get notified of failures to connect, you currently need to use other techniques, depending on whether you are sending or receiving.

    Here is an example that shows how:

    @SpringBootApplication
    public class So66882099Application {
    
        private static final Logger log = LoggerFactory.getLogger(So66882099Application.class);
    
        public static void main(String[] args) {
            SpringApplication.run(So66882099Application.class, args);
        }
    
        @RabbitListener(queues = "foo")
        void listen(String in) {
    
        }
    
        // consumer side listeners for no connection
    
        @EventListener
        void consumerFailed(ListenerContainerConsumerFailedEvent event) {
            log.error(event + " via event listener");
            if (event.getThrowable() instanceof AmqpConnectException) {
                log.error("Broker down?");
            }
        }
    
        // or
    
        @Bean
        ApplicationListener<ListenerContainerConsumerFailedEvent> eventListener() {
            return event -> log.error(event + " via application listener");
        }
    
        // producer side - use a RetryListener
    
        @Bean
        RabbitTemplate template(ConnectionFactory cf) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(cf);
            RetryTemplate retry = new RetryTemplate();
            // configure retries here as needed
            retry.registerListener(new RetryListener() {
    
                @Override
                public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
                    return true;
                }
    
                @Override
                public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
                        Throwable throwable) {
    
                    log.error("Send failed " + throwable.getMessage());
                }
    
                @Override
                public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
                        Throwable throwable) {
                }
    
            });
            rabbitTemplate.setRetryTemplate(retry);
            return rabbitTemplate;
        }
    
    
        @Bean
        public ApplicationRunner runner(RabbitTemplate template) {
            return args -> {
                try {
                    template.convertAndSend("foo", "bar");
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            };
        }
    
    }