Search code examples
spring-boot rabbitmq spring-rabbit

How to fix ListenerExecutionFailedException: Listener threw exception amqp.AmqpRejectAndDontRequeueException: Reply received after timeout


I set up RabbitMQ in my Java Spring Boot application and it seems to work properly, but after running for a while it throws the exception below, apparently at a regular time interval.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1646) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1550) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1473) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1461) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1456) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1405) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995) [spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955) [spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar!/:5.4.3]
        at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar!/:5.4.3]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_201]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_201]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_201]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
        at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:2523) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        at org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.lambda$setMessageListener$1(DirectReplyToMessageListenerContainer.java:115) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1547) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
        ... 11 common frames omitted 

Below is the RabbitMQ configuration code for the consumer:

 @Bean
    public DirectExchange exchange() {
        // Direct exchange named "rpc"; the binding(...) bean binds a queue to it.
        final DirectExchange rpcExchange = new DirectExchange("rpc");
        return rpcExchange;
    }


    /**
     * Declares the queue qualified as "Consumer".
     *
     * <p>The queue name is the simple class name of {@code RoutingEngine}, an underscore,
     * and the configured dataset name.
     *
     * @return the queue definition
     */
    @Bean
    @Qualifier("Consumer")
    public Queue queue() {
        final String queueName = RoutingEngine.class.getSimpleName() + "_" + config.getDatasetName();
        return new Queue(queueName);
    }

    /**
     * Binds the given queue to the given direct exchange.
     *
     * <p>The routing key is the simple class name of {@code Consumer}, an underscore,
     * and the configured dataset name.
     *
     * @param queue queue to bind
     * @param exchange direct exchange to bind the queue to
     * @return the binding definition
     */
    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        final String routingKey = Consumer.class.getSimpleName() + "_" + config.getDatasetName();
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(routingKey);
    }


    /**
     * Creates the exporter, qualified as "ConsumerExport", that exposes the
     * {@code Consumer} service through the given template.
     *
     * @param template template the exporter is configured with
     * @param service service implementation to export
     * @return the configured exporter
     */
    @Bean
    @Qualifier("ConsumerExport")
    public AmqpInvokerServiceExporter exporter(RabbitTemplate template, Consumer service) {
        final AmqpInvokerServiceExporter serviceExporter = new AmqpInvokerServiceExporter();
        // What is exported, and under which interface.
        serviceExporter.setServiceInterface(Consumer.class);
        serviceExporter.setService(service);
        // Template handed to the exporter.
        serviceExporter.setAmqpTemplate(template);
        return serviceExporter;
    }

    /**
     * Creates the listener container that delivers messages from the consumer queue
     * to the service exporter.
     *
     * <p>The qualifiers used here must match the ones declared on the {@code queue()}
     * ("Consumer") and {@code exporter(...)} ("ConsumerExport") bean methods exactly;
     * a qualifier that matches no bean cannot be resolved when more than one
     * {@code Queue} bean is defined.
     *
     * @param connectionFactory connection factory the container is created with
     * @param queue queue the container consumes from
     * @param exporter listener that receives the delivered messages
     * @return the configured container (prefetch count 5, concurrency
     *         {@code "1-" + config.getCount()})
     */
    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                                    @Qualifier("Consumer") Queue queue,
                                                    @Qualifier("ConsumerExport") AmqpInvokerServiceExporter exporter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setPrefetchCount(5);
        container.setQueues(queue);
        container.setMessageListener(exporter);
        logger.info("initialize rabbitmq with {} Consumers",config.getCount());
        // Concurrency is given as a "min-max" range: from 1 up to the configured count.
        container.setConcurrency("1-" + config.getCount());
        return container;
    }




    /**
     * Declares the fanout exchange named "event".
     *
     * @return the exchange definition
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        final FanoutExchange eventExchange = new FanoutExchange("event");
        return eventExchange;
    }

    /**
     * Declares the queue qualified as "reinitialize".
     *
     * <p>The queue is named {@code "bus." + config.getConsumerName()} and is declared
     * non-durable, exclusive and auto-delete.
     *
     * @return the queue definition
     */
    @Bean
    @Qualifier("reinitialize")
    public Queue reInitQueue() {
        final String queueName = "bus." + config.getConsumerName();
        final boolean durable = false;
        final boolean exclusive = true;
        final boolean autoDelete = true;
        return new Queue(queueName, durable, exclusive, autoDelete);
    }

    /**
     * Binds the "reinitialize" queue to the fanout exchange.
     *
     * @param queue queue qualified as "reinitialize"
     * @param fanoutExchangee fanout exchange to bind the queue to
     * @return the binding definition
     */
    @Bean
    public Binding topicBinding(@Qualifier("reinitialize") Queue queue, FanoutExchange fanoutExchangee) {
        final Binding fanoutBinding = BindingBuilder.bind(queue).to(fanoutExchangee);
        return fanoutBinding;
    }

    /**
     * Creates the {@code MessageListener} for the {@code Consumer} target.
     *
     * <p>The listener is constructed with the template, the target, the literal
     * "engine" and the configured consumer name.
     *
     * @param rabbitTemplate template passed to the listener
     * @param target consumer instance passed to the listener
     * @return the new listener
     */
    @Bean
    public MessageListener<Consumer> messageListener(RabbitTemplate rabbitTemplate,Consumer target){
        final String consumerName = config.getConsumerName();
        final MessageListener<Consumer> listener =
                new MessageListener<>(rabbitTemplate, target, "engine", consumerName);
        return listener;
    }

And this is the producer configuration code:

    /**
     * Creates the client-side proxy factory for {@code ConsumerService}.
     *
     * <p>Calls go through the given template with the simple class name of
     * {@code ConsumerService} as the routing key.
     *
     * @param template template the proxy sends through
     * @return the configured proxy factory bean
     */
    @Bean
    public AmqpProxyFactoryBean rerouteProxy(RabbitTemplate template) {
        final String routingKey = ConsumerService.class.getSimpleName();
        final AmqpProxyFactoryBean proxyFactory = new AmqpProxyFactoryBean();
        proxyFactory.setServiceInterface(ConsumerService.class);
        proxyFactory.setRoutingKey(routingKey);
        proxyFactory.setAmqpTemplate(template);
        return proxyFactory;
    }

    /**
     * Builds one {@code Consumer} client proxy per configured engine, keyed by engine name.
     *
     * <p>Every proxy sends through the same shared {@code template}, using the routing key
     * {@code Consumer.class.getSimpleName() + "_" + engineName}. The template's receive and
     * reply timeouts are set once, before any proxy is created, and are applied even when
     * no engines are configured.
     *
     * @param template shared template used by all proxies; its timeouts are modified here
     * @return map from engine name to the proxy for that engine
     */
    @Bean
    public Map<String,Consumer> consumerEngines( RabbitTemplate template){
        // Configure the shared template once instead of from inside the loop: the value is
        // the same for every engine, and setting it from parallel workers meant concurrent
        // writes to a shared bean.
        // NOTE(review): a reply that arrives after the reply timeout is rejected with
        // "Reply received after timeout"; confirm 400 ms leaves enough headroom for the
        // network round trip between producers, broker and consumers.
        final long timeoutMillis = 400;
        template.setReceiveTimeout(timeoutMillis);
        template.setReplyTimeout(timeoutMillis);

        // Kept as a ConcurrentHashMap because the returned map is a shared bean.
        Map<String,Consumer> ret = new ConcurrentHashMap<>();
        //FIXme read from config
        List<String> engines = Arrays.asList(config.getEngines());
        for (String engine : engines) {
            AmqpProxyFactoryBean proxy = new AmqpProxyFactoryBean();
            proxy.setAmqpTemplate(template);
            proxy.setServiceInterface(Consumer.class);
            proxy.setRoutingKey(Consumer.class.getSimpleName() + "_" + engine);
            // The factory is created by hand rather than as a bean, so it is initialized
            // explicitly before the proxy object is requested.
            proxy.afterPropertiesSet();
            ret.put(engine, (Consumer) proxy.getObject());
        }
        return ret;
    }

What causes this problem, and how can I fix it?

NOTE 1: I have 3 producers and 3 consumers on different servers, and RabbitMQ is running on a separate server.

NOTE 2: The consumers are very fast; their response time is less than 100 milliseconds.


Solution

  • Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout

    This has one of two causes:

    • the reply took too long to arrive (in which case the send and receive operation would have returned null earlier).

    • a consumer sent more than one reply for the same request