Spring AMQP and @RefreshScope beans


I am using spring-amqp from spring-boot-starter-parent 3.2.3.

I have the following config:

  @Bean
  @RefreshScope
  public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setUsername(username);
    connectionFactory.setPassword(password);
    connectionFactory.setVirtualHost(virtualHost);
    connectionFactory.setHost(host);
    connectionFactory.setPort(port);
    return connectionFactory;
  }

The ListenerContainerFactory is set up as follows:

  public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
      ConnectionFactory connectionFactory,
      MessageConverter messageConverter,
      LoggingMessagePostProcessor loggingMessagePostProcessor,
      RetryOperationsInterceptor messageRetryInterceptor,
      SimpleRabbitListenerContainerFactoryConfigurer configurer,
      PlatformTransactionManager platformTransactionManager) {

    var factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    factory.setMessageConverter(messageConverter);
    factory.setDefaultRequeueRejected(false);
    factory.setPrefetchCount(getPrefetchCount());
    factory.setConcurrentConsumers(getConcurrentConsumers());
    factory.setMaxConcurrentConsumers(getMaxConcurrentConsumers());
    factory.setAdviceChain(messageRetryInterceptor);
    factory.setAfterReceivePostProcessors(loggingMessagePostProcessor);
    factory.setChannelTransacted(true);
    factory.setTransactionManager(platformTransactionManager);
    return factory;
  }

(Note that the channel is transacted and a transaction manager is set.)

My workflow is as follows:

  1. Receive a message asynchronously via a method annotated with @RabbitListener

  2. Persist to a database using Spring JPA via a @Transactional method

  3. Produce a synchronous message using AmqpTemplate

The problem is that when the AMQP credentials for the ConnectionFactory bean defined above are refreshed periodically by calling the actuator endpoint, the following error appears in the logs:

{"timestamp":"2024-07-28T02:47:11.683026026Z","loggerClass":"org.springframework.transaction.support.TransactionTemplate","thread":"rabbit-simple-4","level":"ERROR","stackTrace":"com.rabbitmq.client.ShutdownSignalException: clean connection shutdown; protocol method: #method<connection.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)\n\tat com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:1007)\n\tat com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:1127)\n\tat com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:1056)\n\tat com.rabbitmq.client.impl.AMQConnection.close(AMQConnection.java:1040)\n\tat   org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:896)\n\tat org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1741)\n\tat org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:52)\n\tat java.base/java.lang.VirtualThread.run(Unknown Source)\n","CORRELATION_ID":"dae77ebc-7636-4763-b703-975f3f9174e6","throwable_class":"ShutdownSignalException","serviceName":"My-Service","message":["Application exception overridden by rollback exception"],"serverName":"My-Service-775d5db996-c8m9s"}

Application functionality is not affected. I believe it is simply destroying the old connections and creating new ones. Can this be done gracefully, without anything being logged at ERROR level?

Interestingly, if I remove the line factory.setTransactionManager(platformTransactionManager); the error messages in the logs go away.


Solution

  • Consider stopping your @RabbitListener containers before refreshing that bean (I am not sure, though, how to do that). A transaction is started for every receive call, and it is associated with the connection. So if your connection factory is refreshed in between, the connection may be broken before the transaction is committed.

    It would be great if you could share a simple project so that we can reproduce this on our side. It is possible that we are missing something in Spring AMQP for this use case.

    UPDATE

    Thank you for the sample! Unfortunately, there is indeed nothing we can do from the library side. The connection is reset to apply the new properties, and it is expected that all of its channels become orphaned. Hence the error on the transaction.

    You may suppress that error in the logs for the org.springframework.transaction.support.TransactionTemplate category, or use a TransactionInterceptor extension so that the error is logged under a custom category for this kind of use case.
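
    For the first option, a minimal sketch, assuming the application uses Spring Boot's standard logging.level.* properties (the application.properties file name is an assumption about this project):

    ```properties
    # Assumption: placed in application.properties (or the YAML equivalent).
    # Turns off all output from the logger named in the error above.
    logging.level.org.springframework.transaction.support.TransactionTemplate=OFF
    ```

    Note that this silences every message from that logger, including ERROR entries for genuine rollback failures, so it trades visibility for quieter logs.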

    Another way is to react to these events:

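      import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
      import org.springframework.cloud.context.environment.EnvironmentChangeEvent;
      import org.springframework.cloud.context.scope.refresh.RefreshScopeRefreshedEvent;
      import org.springframework.context.Lifecycle;
      import org.springframework.context.event.EventListener;

      public class RabbitMqConfig { // existing Spring-managed configuration class
        private final MyAmqpProperties myAmqpProperties;
        private final RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

        public RabbitMqConfig(MyAmqpProperties properties, RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry) {
          this.myAmqpProperties = properties;
          this.rabbitListenerEndpointRegistry = rabbitListenerEndpointRegistry;
        }

        // Emitted before the refresh-scoped beans are destroyed: stop all listener containers.
        @EventListener(EnvironmentChangeEvent.class)
        public void environmentChanged() {
          this.rabbitListenerEndpointRegistry.getListenerContainers().forEach(Lifecycle::stop);
        }

        // Emitted once the refresh has completed: start the containers again.
        @EventListener(RefreshScopeRefreshedEvent.class)
        public void scopeRefreshed() {
          this.rabbitListenerEndpointRegistry.getListenerContainers().forEach(Lifecycle::start);
        }
      }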
    

    When a refresh starts, Spring Cloud emits EnvironmentChangeEvent first, before destroying the beans in the refresh scope. When it has finished refreshing all of the requested beans, it emits RefreshScopeRefreshedEvent. These are our signals to stop all active @RabbitListener containers and then start them again.

    An additional option must be set on the SimpleRabbitListenerContainerFactory:

    factory.setForceStop(true);
    

    Otherwise, the main loop of the listener container does not exit immediately and the stop completes asynchronously.

    We cannot handle those events in the Spring AMQP library directly, since they belong to Spring Cloud.