Search code examples
rabbitmqspring-amqpspring-rabbit

RabbitMQ CachingConnectionFactory and publisherReturns configuration


A continuation of Make Spring RabbitMQ fail on missing exchange

I register MessageListenerContainer for multiple queues.

Where and how should I configure channel returnListener? - I thing the way I have done it is wrong. I inserted CachingConnectionFactory configuration into createQueueBMessageListener(...) - method responsible for creating one of multiple MessageListeners.

1. How should be CachingConnectionFactory additional configuration done Spring and Rabbit way? By now I didn't configure it in Java (only by application.properties and admins in K8S environment). I only injected ConnectionFactory and set it as connectionFactory in SimpleMessageListenerContainer (as in createQueueAMessageListener(...)), I even didn't know it's CachingConnectionFactory.

Is there something like CachingConnectionFactoryConfigurer?

2. Why is ReturnListener.handleReturn(..) not executed? ChannelListener.onCreate(...) is executed.

3. Checking missing exchange exception in cachingConnectionFactory.setCloseExceptionLogger and doing System.exit(1) there seems wrong to me, isn't it? But this is all I managed to do by now. I want to application not start when there is no exchange during binding creation. When I throw exception there application still starts. ReturnListener.handleReturn seems a better place for it, but it isn't executed when configured as below.

4. How can I stop Spring Application Context gracefully instead of System.exit(1)? - throwing exception doesn't stop Application Context. How to make RabbitMq fail to start in such situation? - when a creation of @Bean Binding at Spring Application Context start fails.

@Bean
MessageListenerContainer createQueueAMessageListener(SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory,
                                                   ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = rabbitListenerContainerFactory.createListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("queueA");
    MessageConverter jsonMessageConverter = null;
    container.setMessageListener(new MessageListenerAdapter(new Object(), jsonMessageConverter));

    return container;
}

@Bean
MessageListenerContainer createQueueBMessageListener(SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory,
                                                     ConnectionFactory connectionFactory,
                                                     CachingConnectionFactory cachingConnectionFactory) {

    // I think configuring CachingConnectionFactory here is a lame, isn't it? Of course connectionFactory is redundant now, I left it to show how was it done earlier.
    // Where and how should I add listeners to CachingConnectionFactory?

    cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
    cachingConnectionFactory.setPublisherReturns(true);
    cachingConnectionFactory.addChannelListener(new ChannelListener() {
        @Override
        public void onCreate(final Channel channel, final boolean transactional) {

            log.info("channelListener onCreate - this is executed");

            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(final int replyCode, final String replyText, final String exchange, final String routingKey,
                                         final AMQP.BasicProperties properties,
                                         final byte[] body) throws IOException
                {
                    log.info("!!! Why is this not executed ?!!! handleReturn replyCode: " + replyCode + " replyText: " + replyText);
                }
            });
        }
    });
    cachingConnectionFactory.addConnectionListener(new ConnectionListener() {
        @Override
        public void onCreate(final Connection connection) {
            log.info("connectionListener onCreate - this is executed" + connection);
        }
    });

    cachingConnectionFactory.setCloseExceptionLogger(new ConditionalExceptionLogger() {
        @Override
        public void log(final Log logger, final String message, final Throwable t) {
            try {
                logger.error(message + ": " + t.getMessage());
                if (t.getMessage().contains("reply-code=404, reply-text=NOT_FOUND")) {
                    // throw new RuntimeException(); it doesn't stop Spring ApplicationContext from starting
                    log.error("Executing System.exit(1) command.");
                    // System.exit(1);
                }
            } catch (Exception e) {
                log.error("err in listener ", e);
            }
           
        }
    });


    SimpleMessageListenerContainer container = rabbitListenerContainerFactory.createListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("queueB");
    MessageConverter jsonMessageConverter = null;
    container.setMessageListener(new MessageListenerAdapter(new Object(), jsonMessageConverter));

    return container;
}

@Bean
MessageListenerContainer createQueueCMessageListener(SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory,
                                                     ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = rabbitListenerContainerFactory.createListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("queueC");
    MessageConverter jsonMessageConverter = null;
    container.setMessageListener(new MessageListenerAdapter(new Object(), jsonMessageConverter));

    return container;
}

Solution

    // I think configuring CachingConnectionFactory here is a lame, isn't it?

    It is not "lame"; that is the normal way of configuring beans with additional properties not exposed directly by Boot.

    1. It should be called; have you tried debugging?

    2. Why don't you do what I advised here Make Spring RabbitMQ fail on missing exchange - it's much simpler.

    3. close() it - but, since you are using Spring Boot, it will do that for you - it registers a JVM shutdown hook that closes the context.

    EDIT

    Binding to a non-existent exchange will fail; you just need to force it to happen before the application is fully initialized, e.g. in an ApplicationRunner.

    @SpringBootApplication
    public class So70212347Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So70212347Application.class, args);
        }
    
        @Bean
        Binding binding() {
            return new Binding("foo", DestinationType.QUEUE, "doesn't exist", "foo", null);
        }
    
        @Bean
        Queue queue() {
            return new Queue("foo");
        }
    
    
        @Bean
        ApplicationRunner runner(ConnectionFactory cf) {
            return args -> {
                cf.createConnection().close();
            };
        }
    
    }
    
    Created new connection: rabbitConnectionFactory#6a0cbc6f:0/SimpleConnection@6cd164a6 [delegate=amqp://[email protected]:5672/, localPort= 62884]
    Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'doesn't exist' in vhost '/', class-id=50, method-id=20)
    Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'doesn't exist' in vhost '/', class-id=50, method-id=20)
    Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'doesn't exist' in vhost '/', class-id=50, method-id=20)
    Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'doesn't exist' in vhost '/', class-id=50, method-id=20)
    Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'doesn't exist' in vhost '/', class-id=50, method-id=20)
    
    
    Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
    Application run failed
    java.lang.IllegalStateException: Failed to execute ApplicationRunner
        at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:761)
        at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:748)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:309)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1301)
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1290)
        at com.example.demo.So70212347Application.main(So70212347Application.java:16)
    Caused by: org.springframework.amqp.AmqpIOException: java.io.IOException
        at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:70)
        at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:113)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2192)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2138)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:2118)
        at org.springframework.amqp.rabbit.core.RabbitAdmin.initialize(RabbitAdmin.java:691)
        at org.springframework.amqp.rabbit.core.RabbitAdmin.lambda$null$10(RabbitAdmin.java:619)
        at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
        at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:209)
        at org.springframework.amqp.rabbit.core.RabbitAdmin.lambda$afterPropertiesSet$11(RabbitAdmin.java:618)
        at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.lambda$onCreate$0(CompositeConnectionListener.java:38)
        at java.base/java.util.concurrent.CopyOnWriteArrayList.forEach(CopyOnWriteArrayList.java:807)
        at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.onCreate(CompositeConnectionListener.java:38)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:730)
        at com.example.demo.So70212347Application.lambda$0(So70212347Application.java:33)
        at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:758)
        ... 5 common frames omitted
    Caused by: java.io.IOException: null
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:129)
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:125)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:147)
        at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:1077)
        at com.rabbitmq.client.impl.ChannelN.queueBind(ChannelN.java:46)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1157)
        at com.sun.proxy.$Proxy47.queueBind(Unknown Source)
        at org.springframework.amqp.rabbit.core.RabbitAdmin.declareBindings(RabbitAdmin.java:870)
        at org.springframework.amqp.rabbit.core.RabbitAdmin.lambda$initialize$12(RabbitAdmin.java:694)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.invokeAction(RabbitTemplate.java:2227)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2186)
        ... 18 common frames omitted
    Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'doesn't exist' in vhost '/', class-id=50, method-id=20)
    ...