
Spring-AMQP client does not recover after broker restart with idle channels getting stuck


I have two microservices communicating through RabbitMQ. One of them sends RPC requests (using RabbitTemplate#sendAndReceive) to the other. I noticed that the app is not able to recover if the broker has been down for a few minutes.

Here's how I configure the connection factory and template:

    @Bean
    public AbstractConnectionFactory connectionFactory() {
        return new CachingConnectionFactory(createRabbitConnectionFactory(properties));
    }

    private static com.rabbitmq.client.ConnectionFactory createRabbitConnectionFactory(RabbitProperties properties) {
        final com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
        connectionFactory.setHandshakeTimeout(60000);
        connectionFactory.setHost(properties.getHost());
        connectionFactory.setPort(properties.getPort());
        connectionFactory.setUsername(properties.getUsername());
        connectionFactory.setPassword(properties.getPassword());
        connectionFactory.setVirtualHost(properties.getVirtualhost());
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(
            AbstractConnectionFactory connectionFactory
    ) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setReplyTimeout(10000);
        rabbitTemplate.setUserCorrelationId(true);
        return rabbitTemplate;
    }

And here's how I use RabbitTemplate:

    final Message responseMessage = rabbitTemplate.sendAndReceive(
            routingKey,
            new Message(serializationService.toBytes(request),
                    MessagePropertiesBuilder
                            .newInstance()
                            .setContentType("my-content")
                            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
                            .setCorrelationId(correlationData.getId())
                            .build()
            ),
            correlationData
    );

To reproduce, I put load on the API method that triggers this RPC, then shut down the broker and wait a few minutes. Right after the broker starts again, I see a lot of log lines like this:

SimpleConsumer [queue=amq.rabbitmq.reply-to, index=1091, consumerTag=amq.ctag-1DtEyYlNIMSNowMMsoPYNQ identity=57919ab] started

After that, every new request fails with this error:

org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later.

In the RabbitMQ management interface I see that my connection has created 2047 channels, all of them in the idle state. Only restarting the app fixes the problem.

What do you think is happening here and how can I make the client automatically recover?

I use spring-boot-starter-amqp version 2.7.5 (so spring-messaging 5.3.2 and spring-rabbit 2.4.7).

I've tried configuring channelCheckoutTimeout:

cachingConnectionFactory.setChannelCheckoutTimeout(10000);
cachingConnectionFactory.setChannelCacheSize(200);

But then I just get a different error, and the situation stays the same:

org.springframework.amqp.AmqpTimeoutException: No available channels

Solution

  • Found the problem. Your application properties must include the following:

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: guest
        password: guest
        cache:
          channel:
            checkout-timeout: 30s
    

    Pay attention to checkout-timeout: 30s. A value greater than zero enables channel limiting in the CachingConnectionFactory, so the channel cache size becomes a hard limit on the number of channels opened per connection:

    /**
     * Sets the channel checkout timeout. When greater than 0, enables channel limiting
     * in that the {@link #channelCacheSize} becomes the total number of available channels per
     * connection rather than a simple cache size. Note that changing the {@link #channelCacheSize}
     * does not affect the limit on existing connection(s), invoke {@link #destroy()} to cause a
     * new connection to be created with the new limit.
     * <p>
     * Since 1.5.5, also applies to getting a connection when the cache mode is CONNECTION.
     * @param channelCheckoutTimeout the timeout in milliseconds; default 0 (channel limiting not enabled).
     * @since 1.4.2
     * @see #setConnectionLimit(int)
     */
    public void setChannelCheckoutTimeout(long channelCheckoutTimeout) {
    

    More info in docs: https://docs.spring.io/spring-amqp/reference/amqp/connections.html#cachingconnectionfactory

    Starting with version 1.4.2, the CachingConnectionFactory has a property called channelCheckoutTimeout. When this property is greater than zero, the channelCacheSize becomes a limit on the number of channels that can be created on a connection. If the limit is reached, calling threads block until a channel is available or this timeout is reached, in which case a AmqpTimeoutException is thrown.
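The blocking behavior described above can be pictured with a counting semaphore. The following is a minimal sketch in plain Java, not Spring AMQP's actual code: the class `BoundedChannelPool` and its methods `checkout`, `release` and `available` are hypothetical names made up for illustration, and the unchecked `IllegalStateException` merely stands in for `AmqpTimeoutException`.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Simplified model of channel limiting (hypothetical, NOT Spring AMQP code).
 * A fixed number of permits plays the role of channelCacheSize, and the
 * timeout plays the role of channelCheckoutTimeout.
 */
class BoundedChannelPool {

    private final Semaphore permits;
    private final long checkoutTimeoutMillis;

    BoundedChannelPool(int channelCacheSize, long checkoutTimeoutMillis) {
        this.permits = new Semaphore(channelCacheSize);
        this.checkoutTimeoutMillis = checkoutTimeoutMillis;
    }

    /** Blocks until a channel slot is free; fails once the timeout elapses. */
    void checkout() {
        try {
            if (!permits.tryAcquire(checkoutTimeoutMillis, TimeUnit.MILLISECONDS)) {
                // Stand-in for AmqpTimeoutException in this sketch.
                throw new IllegalStateException("No available channels");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a channel", e);
        }
    }

    /** Returns a channel slot so that a waiting caller can proceed. */
    void release() {
        permits.release();
    }

    /** Number of channel slots that are currently free. */
    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) {
        BoundedChannelPool pool = new BoundedChannelPool(2, 200);
        pool.checkout();
        pool.checkout();
        try {
            pool.checkout(); // limit of 2 reached: waits 200 ms, then fails
        } catch (IllegalStateException e) {
            System.out.println("Third checkout failed: " + e.getMessage());
        }
        pool.release();  // one slot is handed back
        pool.checkout(); // succeeds immediately
        System.out.println("Free slots: " + pool.available());
    }
}
```

In this model the number of slots in use can never exceed the configured size, which is the point of the setting: per the documentation quoted above, with the default timeout of 0 the cache size is only a cache size and does not cap channel creation.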

    You can also adjust the cache size to fit your needs:

    spring:
      rabbitmq:
        cache:
          channel:
            checkout-timeout: 30s
            size: 100
    

    Note: the host, port and credentials shown above are the defaults, so there is no need to specify them in application.yml.