I am getting an "IllegalArgumentException" when trying to do the following:
I have a project with a SimpleMessageListenerContainer listening on a queue called "USER-broadcast-queue":
@Bean("broadcastMessageListenerContainer")
public SimpleMessageListenerContainer broadcastMessageListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(simpleRoutingConnectionFactory());
    container.setQueues(marketDataBroadcastQueue());
    container.setMessageListener(messageListenerAdapter());
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    container.setDeclarationRetries(12);
    container.setFailedDeclarationRetryInterval(5000);
    container.setMismatchedQueuesFatal(true);
    container.setPrefetchCount(50);
    container.setAutoStartup(false);
    clientHandler.setBroadcastMessageListenerContainer(container);
    return container;
}
@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(simpleRoutingConnectionFactory());
    template.setMessageConverter(jsonMessageConverter());
    template.setUseDirectReplyToContainer(true);
    template.setRoutingKey(REQUEST_EXCHANGE_NAME);
    template.setMandatory(true);
    template.setReplyTimeout(20000);
    return template;
}
My ClientHandler is a bean defined like this:
@Component
public class ClientHandler{
    public void handleMessage(…) {….}
}
@Bean
public MessageListenerAdapter messageListenerAdapter() {
    return new MessageListenerAdapter(clientHandler, jsonMessageConverter());
}
Now, when I receive a certain kind of message, I need to stop the container:
broadcastMessageListenerContainer.stop();
Then send a request to the server from a RabbitGatewaySupport (using direct reply-to):
_response = getRabbitOperations().convertSendAndReceive(requestExchange, routingKeyInquiry, myObject,
    new MessagePostProcessor() {
        public Message postProcessMessage(Message message) throws AmqpException {
            …
            return message;
        }
    });
And then start the listener again:
broadcastMessageListenerContainer.initialize();
broadcastMessageListenerContainer.start();
This works fine the first time, but when I receive a second message, I get this error when I try to send the request (convertSendAndReceive):
12-09-2019 13:54:13.826|DEBUG|USER|org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|1233|[]|broadcastMessageListenerContainer-1|Shutting down Rabbit listener container
12-09-2019 13:54:13.826|TRACE| USER |org.springframework.amqp.rabbit.connection.CachingConnectionFactory|1063|[]|broadcastMessageListenerContainer-1|AMQChannel(amqp:// USER @127.0.0.1:5671/,3) channel.isOpen()
12-09-2019 13:54:13.826|TRACE| USER |org.springframework.amqp.rabbit.connection.CachingConnectionFactory|1063|[]|broadcastMessageListenerContainer-1|AMQChannel(amqp:// USER @127.0.0.1:5671/,3) channel.basicCancel([amq.ctag-6ztkTlFxReqUvLDJOsmmdQ])
12-09-2019 13:54:13.828|INFO | USER |org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|586|[]|broadcastMessageListenerContainer-1|Waiting for workers to finish.
12-09-2019 13:54:13.828|DEBUG| USER |org.springframework.amqp.rabbit.listener.BlockingQueueConsumer|886|[]|pool-7-thread-9|Received cancelOk for tag amq.ctag-6ztkTlFxReqUvLDJOsmmdQ (USER -broadcast-queue); Consumer@5c20aab9: tags=[[amq.ctag-6ztkTlFxReqUvLDJOsmmdQ]], channel=Cached Rabbit Channel: AMQChannel(amqp://USER@127.0.0.1:5671/,3), conn: Proxy@46baf579 Shared Rabbit Connection: SimpleConnection@48b0e701 [delegate=amqp:// USER @127.0.0.1:5671/, localPort= 55757], acknowledgeMode=AUTO local queue size=0
12-09-2019 13:54:18.829|INFO | USER |org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|592|[]|broadcastMessageListenerContainer-1|Workers not finished.
12-09-2019 13:54:18.829|WARN | USER |org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer|596|[]|broadcastMessageListenerContainer-1|Closing channel for unresponsive consumer: Consumer@5c20aab9: tags=[[amq.ctag-6ztkTlFxReqUvLDJOsmmdQ]], channel=Cached Rabbit Channel: AMQChannel(amqp:// USER@127.0.0.1:5671/,3), conn: Proxy@46baf579 Shared Rabbit Connection: SimpleConnection@48b0e701 [delegate=amqp:// USER @127.0.0.1:5671/, localPort= 55757], acknowledgeMode=AUTO local queue size=0
12-09-2019 13:54:18.829|DEBUG| USER |org.springframework.amqp.rabbit.listener.BlockingQueueConsumer|735|[]|broadcastMessageListenerContainer-1|Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp:// USER @127.0.0.1:5671/,3), conn: Proxy@46baf579 Shared Rabbit Connection: SimpleConnection@48b0e701 [delegate=amqp:// USER@127.0.0.1:5671/, localPort= 55757]
12-09-2019 13:54:18.829|TRACE| USER |org.springframework.amqp.rabbit.connection.CachingConnectionFactory|1063|[]|broadcastMessageListenerContainer-1|AMQChannel(amqp:// USER @127.0.0.1:5671/,3) channel.close()
12-09-2019 13:54:18.830|DEBUG| USER |org.springframework.amqp.rabbit.connection.CachingConnectionFactory|1276|[]|broadcastMessageListenerContainer-1|Closing cached Channel: AMQChannel(amqp:// USER @127.0.0.1:5671/,3)
java.lang.IllegalArgumentException: Already value [[USER-broadcast-queue]] for key [org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory@a50b09c] bound to thread [broadcastMessageListenerContainer-1]
at org.springframework.util.Assert.isNull(Assert.java:176) ~[MyLib-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.connection.SimpleResourceHolder.bind(SimpleResourceHolder.java:125) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.doConsumeFromQueue(DirectMessageListenerContainer.java:659) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.adjustConsumers(DirectMessageListenerContainer.java:313) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer.setConsumersPerQueue(DirectMessageListenerContainer.java:161) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.getChannelHolder(DirectReplyToMessageListenerContainer.java:190) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithDirect(RabbitTemplate.java:1896) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:1762) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceiveRaw(RabbitTemplate.java:1731) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:1600) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:1591) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at es.siom.trading.amqp.gateway.RabbitServiceGateway.sendAndReceiveInquiry(RabbitServiceGateway.java:123) ~[Salida/:?]
at es.siom.trading.amqp.ui.UIController.sendAndReceiveInquiry(UIController.java:67) ~[Salida/:?]
at es.siom.trading.utils.ModuloTradingUtils.consultaContratosMercadoPublic(ModuloTradingUtils.java:627) ~[Salida/:?]
at es.siom.trading.utils.ModuloTradingUtils.leeMercado(ModuloTradingUtils.java:133) ~[Salida/:?]
at es.siom.trading.amqp.beans.DatosGeneralesLTS.actualizaOfertas(DatosGeneralesLTS.java:602) ~[Salida/:?]
at es.siom.trading.amqp.handler.ClientHandler.handleMessage(ClientHandler.java:89) ~[Salida/:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_91]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_91]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_91]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_91]
at org.springframework.util.MethodInvoker.invoke(MethodInvoker.java:280) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:363) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:292) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1552) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1478) ~[ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1466) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1461) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1410) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043) [ClasesAuxCliente-0.0.5-SNAPSHOT.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Any ideas, any help? Thank you.
It's a bug.
It's an artifact of the fact that the reply-to container is initialized lazily; we shouldn't be binding its connection factory to the calling thread.
Please open a GitHub issue.
As a workaround, you can delegate the send to another thread.
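A minimal sketch of that workaround, using only java.util.concurrent so it stands alone. The class name OffThreadSender, the method sendAndWait and the thread name "inquiry-sender" are made up for illustration; they are not part of Spring AMQP. The idea is simply that the request runs on a dedicated thread instead of the listener thread, so whatever the reply-to container binds ends up on that other thread:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: runs a request on its own thread and waits for the result.
public class OffThreadSender {

    // One dedicated daemon thread, so the request never executes on the
    // listener container's consumer thread.
    private final ExecutorService sendExecutor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "inquiry-sender");
        t.setDaemon(true);
        return t;
    });

    // Submits the request to the dedicated thread and blocks the caller
    // until the reply arrives or the timeout expires.
    public <T> T sendAndWait(Callable<T> request, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        Future<T> reply = sendExecutor.submit(request);
        return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Stops accepting new requests; call this when the application shuts down.
    public void shutdown() {
        sendExecutor.shutdown();
    }
}
```

In your gateway the call would then look something like sender.sendAndWait(() -> getRabbitOperations().convertSendAndReceive(requestExchange, routingKeyInquiry, myObject), 20000), where sender is an OffThreadSender instance. The listener thread still blocks while waiting for the reply, but the send itself no longer happens on it.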
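By the way, stopping the container on the listener thread will pause for the shutdownTimeout (5 seconds by default, which matches the gap between "Waiting for workers to finish." and "Workers not finished." in your log) before actually stopping the active consumer(s).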
EDIT