I set up RabbitMQ in my Java Spring Boot application and it seems to work properly, but after running for a while, at what looks like a regular interval, it throws the exception below.
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1646) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1550) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1473) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1461) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1456) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1405) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.callExecuteListener(DirectMessageListenerContainer.java:995) [spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer$SimpleConsumer.handleDelivery(DirectMessageListenerContainer.java:955) [spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149) [amqp-client-5.4.3.jar!/:5.4.3]
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104) [amqp-client-5.4.3.jar!/:5.4.3]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_201]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_201]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_201]
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
at org.springframework.amqp.rabbit.core.RabbitTemplate.onMessage(RabbitTemplate.java:2523) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
at org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.lambda$setMessageListener$1(DirectReplyToMessageListenerContainer.java:115) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1547) ~[spring-rabbit-2.1.4.RELEASE.jar!/:2.1.4.RELEASE]
... 11 common frames omitted
Below is the RabbitMQ configuration code for the consumer:
@Bean
public DirectExchange exchange() {
    return new DirectExchange("rpc");
}

@Bean
@Qualifier("Consumer")
public Queue queue() {
    return new Queue(RoutingEngine.class.getSimpleName()+"_"+config.getDatasetName());
}

@Bean
public Binding binding(Queue queue, DirectExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(Consumer.class.getSimpleName()+"_"+config.getDatasetName());
}

@Bean
@Qualifier("ConsumerExport")
public AmqpInvokerServiceExporter exporter(RabbitTemplate template, Consumer service) {
    AmqpInvokerServiceExporter exporter = new AmqpInvokerServiceExporter();
    exporter.setAmqpTemplate(template);
    exporter.setService(service);
    exporter.setServiceInterface(Consumer.class);
    return exporter;
}

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,@Qualifier("consumer") Queue queue,
        @Qualifier("RoutingEngineExport") AmqpInvokerServiceExporter exporter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setPrefetchCount(5);
    container.setQueues(queue);
    container.setMessageListener(exporter);
    logger.info("initialize rabbitmq with {} Consumers",config.getCount());
    container.setConcurrency(1+"-"+config.getCount());
    return container;
}

@Bean
public FanoutExchange fanoutExchange(){
    return new FanoutExchange("event");
}

@Bean
@Qualifier("reinitialize")
public Queue reInitQueue() {
    return new Queue("bus."+config.getConsumerName(),false,true,true);
}

@Bean
public Binding topicBinding(@Qualifier("reinitialize") Queue queue, FanoutExchange fanoutExchangee) {
    return BindingBuilder
            .bind(queue)
            .to(fanoutExchangee);
}

@Bean
public MessageListener<Consumer> messageListener(RabbitTemplate rabbitTemplate,Consumer target){
    return new MessageListener<>(rabbitTemplate, target, "engine", config.getConsumerName());
}
And here is the producer configuration code:
@Bean
public AmqpProxyFactoryBean rerouteProxy(RabbitTemplate template) {
    AmqpProxyFactoryBean proxy = new AmqpProxyFactoryBean();
    proxy.setAmqpTemplate(template);
    proxy.setServiceInterface(ConsumerService.class);
    proxy.setRoutingKey(ConsumerService.class.getSimpleName());
    return proxy;
}

@Bean
public Map<String,Consumer> consumerEngines( RabbitTemplate template){
    Map<String,Consumer> ret= new ConcurrentHashMap<>();
    //FIXme read from config
    List<String> lst = Arrays.asList(config.getEngines());
    lst.parallelStream().forEach(k->{
        AmqpProxyFactoryBean proxy = new AmqpProxyFactoryBean();
        template.setReceiveTimeout(400);
        template.setReplyTimeout(400);
        proxy.setAmqpTemplate(template);
        proxy.setServiceInterface(Consumer.class);
        proxy.setRoutingKey(Consumer.class.getSimpleName() + "_" + k);
        proxy.afterPropertiesSet();
        ret.put(k, (Consumer) proxy.getObject());
    });
    return ret;
}
What causes this problem, and how can I fix it?
NOTE 1: I have 3 producers and 3 consumers on different servers, and RabbitMQ is running on another server.
NOTE 2: The consumers are very fast; their response time is less than 100 milliseconds.
Caused by: org.springframework.amqp.AmqpRejectAndDontRequeueException: Reply received after timeout
This has one of two causes:
- The reply took too long to arrive (in which case the send-and-receive operation would already have returned null).
- A consumer sent more than one reply for the same request.
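To make the two cases concrete, here is a minimal plain-Java sketch of a request/reply correlation table. It is a simplified model for illustration only, not Spring AMQP's actual implementation; the class `ReplyTimeoutSketch`, its methods, and the `publish` hook are all hypothetical names. A waiting request is registered under its correlation id and removed once the caller stops waiting, so a reply that arrives afterwards (late or duplicate) finds nobody to hand it to.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical model of an RPC client's reply handling (not Spring AMQP code).
class ReplyTimeoutSketch {

    // Requests still waiting for a reply, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Registers the request, runs the publish hook (stands in for sending the
    // message), then waits up to timeoutMillis. Returns null on timeout.
    String sendAndReceive(String correlationId, long timeoutMillis, Runnable publish) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        try {
            publish.run();
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null; // the caller gave up waiting
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pending.remove(correlationId); // from here on, a reply has nowhere to go
        }
    }

    // Called when a reply message arrives; fails if no request is waiting for it.
    void onReply(String correlationId, String body) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            throw new IllegalStateException("Reply received after timeout");
        }
        future.complete(body);
    }

    public static void main(String[] args) {
        ReplyTimeoutSketch rpc = new ReplyTimeoutSketch();

        // Case 1: no reply within the timeout, then the reply shows up late.
        String late = rpc.sendAndReceive("req-1", 50, () -> { });
        System.out.println("req-1 result: " + late);
        try {
            rpc.onReply("req-1", "too late");
        } catch (IllegalStateException e) {
            System.out.println("req-1 late reply: " + e.getMessage());
        }

        // Case 2: the reply arrives in time, then a second reply is sent.
        String ok = rpc.sendAndReceive("req-2", 50, () -> rpc.onReply("req-2", "ok"));
        System.out.println("req-2 result: " + ok);
        try {
            rpc.onReply("req-2", "duplicate");
        } catch (IllegalStateException e) {
            System.out.println("req-2 duplicate reply: " + e.getMessage());
        }
    }
}
```

In the producer configuration from the question, the template is given a 400 ms reply timeout via `setReplyTimeout(400)`. If the first case applies, the full round trip (network, broker, queueing behind other requests, consumer processing) occasionally exceeds that, even when the consumer itself answers in under 100 ms, so a larger timeout is one thing to try.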