Search code examples
springspring-bootspring-amqpspring-rabbit

Which AmqpEvent or AmqpException to handle when an exclusive consumer fails


I have two instances of the same application, running in different virtual machines. I want to grant exclusive access to a queue for the consumer of one of them, while invalidating the local cache that is used by the consumer on the other.

Now, I have figured out that I need to handle ListenerContainerConsumerFailedEvent but I am guessing that implementing an ApplicationListener for this event is not going to ensure that I am receiving this event because of an exclusive consumer exception. I might want to check the Throwable of the event, or event further checks.

Which subclass of AmqpException or what further checks should I perform to ensure that the exception is received due to exclusive consumer access?


Solution

  • The logic in the listener container implementations is like this:

    if (e.getCause() instanceof ShutdownSignalException
                && e.getCause().getMessage().contains("in exclusive use")) {
            getExclusiveConsumerExceptionLogger().log(logger,
                    "Exclusive consumer failure", e.getCause());
            publishConsumerFailedEvent("Consumer raised exception, attempting restart", false, e);
        }
    

    So, we indeed raise a ListenerContainerConsumerFailedEvent event and you can trace the cause message like we do in the framework, but on the other hand you can just inject your own ConditionalExceptionLogger:

    /**
     * Set a {@link ConditionalExceptionLogger} for logging exclusive consumer failures. The
     * default is to log such failures at WARN level.
     * @param exclusiveConsumerExceptionLogger the conditional exception logger.
     * @since 1.5
     */
    public void setExclusiveConsumerExceptionLogger(ConditionalExceptionLogger exclusiveConsumerExceptionLogger) {
    

    and catch such an exclusive situation over there.

    Also you can consider to use RabbitUtils.isExclusiveUseChannelClose(cause) in your code:

    /**
     * Return true if the {@link ShutdownSignalException} reason is AMQP.Channel.Close
     * and the operation that failed was basicConsumer and the failure text contains
     * "exclusive".
     * @param sig the exception.
     * @return true if the declaration failed because of an exclusive queue.
     */
    public static boolean isExclusiveUseChannelClose(ShutdownSignalException sig) {