Tags: spring, rabbitmq, spring-rabbit

How to fix a weird "channelMax" error with RabbitMQ RPC?


I created a simple client and server. The client sends RPC requests:

RabbitTemplate template.convertSendAndReceive(...) ;

The server receives each request and replies:

@RabbitListener(queues = "#{queue.getName()}")
public Object handler(@Payload String key)...

Then I made the client send RPC requests asynchronously and simultaneously, which produces a lot of concurrent RPC requests.

Unexpectedly, I got this error:

org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later.
    at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:59)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:1208)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$200(CachingConnectionFactory.java:1196)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:599)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:582)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:552)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:534)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1400(CachingConnectionFactory.java:99)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel

The RabbitMQ client seems to create too many channels. How can I fix this, and why does my client create so many of them?


Solution

  • Channels are cached, so there should only be as many channels as there are RPC calls in progress at the same time.
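
    To make that relationship concrete, here is a toy model in plain Java. It does not use Spring AMQP or the RabbitMQ client at all; the class `ChannelUsageModel` and its methods are hypothetical names invented for this sketch. It assumes each blocking RPC call holds one channel until its reply arrives, so the peak number of open channels tracks the number of calls in flight, and a call fails once that number would exceed the limit.

    ```java
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    // Toy model only: no RabbitMQ classes are involved and all names are hypothetical.
    class ChannelUsageModel {

        private final int channelMax;
        private final AtomicInteger open = new AtomicInteger();
        private final AtomicInteger peak = new AtomicInteger();

        ChannelUsageModel(int channelMax) {
            this.channelMax = channelMax;
        }

        // Check out a channel; fail if that would exceed the limit.
        void acquire() {
            int now = open.incrementAndGet();
            if (now > channelMax) {
                open.decrementAndGet();
                throw new IllegalStateException("The channelMax limit is reached");
            }
            peak.accumulateAndGet(now, Math::max);
        }

        // Return the channel once the reply has arrived.
        void release() {
            open.decrementAndGet();
        }

        int peak() {
            return peak.get();
        }

        // Runs `calls` blocking "RPC calls" on `threads` threads; returns how many failed.
        static int simulate(ChannelUsageModel model, int threads, int calls) throws InterruptedException {
            ExecutorService pool = Executors.newFixedThreadPool(threads);
            AtomicInteger failures = new AtomicInteger();
            for (int i = 0; i < calls; i++) {
                pool.execute(() -> {
                    try {
                        model.acquire();
                    } catch (IllegalStateException e) {
                        failures.incrementAndGet();
                        return;
                    }
                    try {
                        Thread.sleep(5); // the "server" is working; the channel stays checked out
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        model.release();
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(30, TimeUnit.SECONDS);
            return failures.get();
        }

        public static void main(String[] args) throws InterruptedException {
            ChannelUsageModel model = new ChannelUsageModel(2047);
            int failures = simulate(model, 100, 1000);
            // The peak can never exceed the number of concurrent callers (100 here).
            System.out.println("failures=" + failures + ", peak channels=" + model.peak());
        }
    }
    ```

    In this model, the error in the question corresponds to the number of concurrent callers exceeding `channelMax`, not to channels leaking.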

    You may need to increase the channel max setting on the broker.
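
    Raising the broker setting may not be enough on its own. As far as I understand, the limit in effect is negotiated when the connection opens, between the broker's `channel_max` and the value the client requests (`setRequestedChannelMax` on the RabbitMQ Java client's `ConnectionFactory`), so both sides are worth checking. The sketch below models that rule in plain Java, assuming 0 means "no limit" and that otherwise the smaller value wins. The class name `ChannelMaxNegotiation` and the numbers are illustrative, not taken from the question.

    ```java
    // Illustrative model of channel-max negotiation; the class name is hypothetical.
    class ChannelMaxNegotiation {

        // Assumption: 0 means "no limit"; otherwise the smaller of the two values applies.
        static int negotiate(int clientRequested, int brokerMax) {
            return (clientRequested == 0 || brokerMax == 0)
                    ? Math.max(clientRequested, brokerMax)
                    : Math.min(clientRequested, brokerMax);
        }

        public static void main(String[] args) {
            System.out.println(negotiate(2047, 4096)); // 2047: the client request is the bottleneck
            System.out.println(negotiate(4096, 2047)); // 2047: the broker setting is the bottleneck
            System.out.println(negotiate(0, 4096));    // 4096: the client imposes no limit
        }
    }
    ```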

    EDIT

    If your RPC calls are long-lived, you can shorten the time each channel is held by using the AsyncRabbitTemplate with an explicit reply queue instead of the direct reply-to feature.

    See the documentation.
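
    The reason an explicit reply queue helps can be sketched without any RabbitMQ classes: the sender only publishes the request and registers a future under a correlation id, while a single shared consumer on the reply queue completes the matching future later. The code below is a toy model of that pattern, not the AsyncRabbitTemplate implementation; `CorrelatedRpcModel`, its in-memory queues, and the ids are all hypothetical.

    ```java
    import java.util.Map;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    // Toy model of request/reply correlation; all names are hypothetical.
    class CorrelatedRpcModel {

        private interface Step {
            void run() throws InterruptedException;
        }

        // A "message" is just {correlationId, body} in this sketch.
        private final BlockingQueue<String[]> requests = new LinkedBlockingQueue<>();
        private final BlockingQueue<String[]> replies = new LinkedBlockingQueue<>();
        private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

        CorrelatedRpcModel() {
            startDaemon(() -> { // "server": upper-cases each request body
                String[] in = requests.take();
                replies.put(new String[] {in[0], in[1].toUpperCase()});
            });
            startDaemon(() -> { // single reply consumer shared by all callers
                String[] reply = replies.take();
                CompletableFuture<String> future = pending.remove(reply[0]);
                if (future != null) {
                    future.complete(reply[1]);
                }
            });
        }

        private static void startDaemon(Step step) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        step.run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.setDaemon(true);
            t.start();
        }

        // Publishing returns immediately; the caller holds nothing while the reply is pending.
        CompletableFuture<String> send(String correlationId, String body) {
            CompletableFuture<String> future = new CompletableFuture<>();
            pending.put(correlationId, future); // register before publishing
            requests.add(new String[] {correlationId, body});
            return future;
        }

        int pendingCount() {
            return pending.size();
        }

        public static void main(String[] args) throws Exception {
            CorrelatedRpcModel rpc = new CorrelatedRpcModel();
            CompletableFuture<String> a = rpc.send("id-1", "foo");
            CompletableFuture<String> b = rpc.send("id-2", "bar");
            System.out.println(a.get(5, TimeUnit.SECONDS)); // FOO
            System.out.println(b.get(5, TimeUnit.SECONDS)); // BAR
        }
    }
    ```

    Because the sender's work ends at the publish step, a slow server ties up only an entry in the pending map, not a sending channel per outstanding call.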

    EDIT2

    Here is an example using the AsyncRabbitTemplate; it sends 1000 messages on 100 threads (and the consumer has 100 threads).

    The total number of channels used was 107: 100 for the consumers and only 7 for sending.

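    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
    // Nested class in Spring AMQP 2.x; in 3.x import org.springframework.amqp.rabbit.RabbitConverterFuture instead.
    import org.springframework.amqp.rabbit.AsyncRabbitTemplate.RabbitConverterFuture;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    
    @SpringBootApplication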
    public class So56126654Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So56126654Application.class, args);
        }
    
        @RabbitListener(queues = "so56126654", concurrency = "100")
        public String slowService(String in) throws InterruptedException {
            Thread.sleep(5_000L);
            return in.toUpperCase();
        }
    
        @Bean
        public ApplicationRunner runner(AsyncRabbitTemplate asyncTemplate) {
            ExecutorService exec = Executors.newFixedThreadPool(100);
            return args -> {
                System.out.println(asyncTemplate.convertSendAndReceive("foo").get());
                for (int i = 0; i < 1000; i++) {
                    int n = i;
                    exec.execute(() -> {
                        RabbitConverterFuture<Object> future = asyncTemplate.convertSendAndReceive("foo" + n);
                        try {
                            System.out.println(future.get(10, TimeUnit.SECONDS));
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            e.printStackTrace();
                        }
                        catch (ExecutionException e) {
                            e.printStackTrace();
                        }
                        catch (TimeoutException e) {
                            e.printStackTrace();
                        }
                    });
                }
            };
        }
    
        @Bean
        public AsyncRabbitTemplate asyncTemplate(ConnectionFactory connectionFactory) {
            // Arguments: default exchange (""), routing key (the request queue), explicit reply queue.
            return new AsyncRabbitTemplate(connectionFactory, "", "so56126654", "so56126654-replies");
        }
    
        @Bean
        public Queue queue() {
            return new Queue("so56126654");
        }
    
        @Bean
        public Queue replyQueue() {
            return new Queue("so56126654-replies");
        }
    
    }