Let me begin by saying that I just started to dabble with AMQP.
I want to consume/pull data from queue. I'm using Spring's libs (spring-boot-starter-amqp) in order to make things easier. I have a listener class with method annotated with @RabbitListener where I set queue. Everything else is configured via properties:
rabbitmq:
username: user
password: password
virtual-host: virtual-host
port: 5672
host: host
queue: _316_
listener:
simple:
retry:
enabled: true
initial-interval: 1000
max-attempts: 8
max-interval: 10000
multiplier: 2.0
stateless: true
Everything works fine until I make host unavailable for a while. When that happens connection is dropped and attempts are made in order to reestablish it. After connection is reestablished again listener doesn't start to pull messages. After application is restarted everything is fine, but I'm sure it can be configured somehow that consumer keeps restarting, at least it should try to do it after connection was reestablished (or at least this is what I'd expect).
After connection has been dropped following can be found in logs:
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer WARN Cancel received for amq.ctag-PgBSeymWBfsghwdUYr5asA (_316_); Consumer@22ead351: tags=[[amq.ctag-PgBSeymWBfsghwdUYr5asA]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@39549f33 Shared Rabbit Connection: SimpleConnection@6f731759 [delegate=amqp://user@host, localPort= 36678], acknowledgeMode=AUTO local queue size=0
org.springframework.amqp.rabbit.connection.CachingConnectionFactory ERROR Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - user 'user' is deleted, class-id=0, method-id=0)
com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Connection reset)
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer WARN Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.rabbit.support.ConsumerCancelledException
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Restarting Consumer@22ead351: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@39549f33 Shared Rabbit Connection: SimpleConnection@6f731759 [delegate=amqp://user@host, localPort= 36678], acknowledgeMode=AUTO local queue size=0
org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host:5672]
org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure\n\tat org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:564)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1201)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1046)\n\tat java.base/java.lang.Thread.run(Thread.java:835)\nCaused by: org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer","message":"Consumer received fatal exception on startup
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Stopping container from aborted consumer
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Waiting for workers to finish.
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Successfully waited for workers to finish.
com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Socket closed)
org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.\n\tat org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:65)\n\tat
Then connection attempt is made and we're in a loop:
org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host]
com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Socket closed)
org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED
Until connection is reestablished:
org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host]
org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Created new connection: rabbitConnectionFactory#69d3cf7e:16/SimpleConnection@3931e0ad [delegate=amqp://user@host, localPort= 50574]
And nothing else happens - no messages are being consumed.
UPDATE: Followed suggestion and turned on DEBUG logging.
When app is starting we're:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Starting Rabbit listener container.
creating the connection
starting consumer
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Starting consumer Consumer@3daf03d8: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
org.springframework.amqp.rabbit.connection.CachingConnectionFactory DEBUG Creating cached Rabbit Channel from AMQChannel(amqp://user@host,1)
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG ConsumeOK: Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Started on queue '_316_' with tag amq.ctag-uG8_iXcNaknFjBIGM-91Tg: Consumer@3daf03d8: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Storing delivery for consumerTag: 'amq.ctag-uG8_iXcNaknFjBIGM-91Tg' with deliveryTag: '1' in Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Received message: (Body:'[B@4b817fae(byte[117])' MessageProperties [headers={}, contentLength=0, redelivered=true, receivedExchange=, receivedRoutingKey=_316_, deliveryTag=1, consumerTag=amq.ctag-uG8_iXcNaknFjBIGM-91Tg, consumerQueue=_316_])
This goes on until connection drops:
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer WARN Cancel received for amq.ctag-uG8_iXcNaknFjBIGM-91Tg (_316_); Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp:///user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp:///user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0
Channel is shutdown and user somehow gets deleted as log says:
org.springframework.amqp.rabbit.connection.CachingConnectionFactory ERROR Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - user 'user' is deleted, class-id=0, method-id=0)
Issue with connection driver followed by exception being thrown:
com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Connection reset)
org.springframework.amqp.rabbit.support.ConsumerCancelledException: null\n\tat org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:499)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:859)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1142)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1048)\n\tat java.base/java.lang.Thread.run(Thread.java:835
Consumer raises exception and it says that processing can r-e-s-t-a-r-t
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Consumer raised exception, processing can restart if the connection factory supports it
Restating happens:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Restarting Consumer@3daf03d8: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0
Channels are being closed:
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906]
org.springframework.amqp.rabbit.connection.CachingConnectionFactory DEBUG Closing cached Channel: AMQChannel(amqp://user@host,1)
New consumer is starting:
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Starting consumer Consumer@2560313a: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
We're attempting to connect which ends up with WARN and AUTHENTICATION failure, (because previous log said that user was deleted?):
An unexpected connection driver error occured (Exception message: Socket closed)
org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure\n\tat
ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.\n\tat org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:65)\n\tat
Consumer that tried to start:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Consumer received fatal exception on startup
And it (consumer) gets cancelled:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Cancelling Consumer@2560313a: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
Channel is closed and container is stopping:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Stopping container from aborted consumer
And then container is shutting down:
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Shutting down Rabbit listener container
We're waiting for workers to finish, it's successful, then we're trying to connect again, the same SOCKET_CLOSED is being logged over and over again.
Then host is brought back and connection is reestablished. Cached Rabit Channel is being created and nothing happens.
I'd assume that issue is that container was shut down and never came back to life, hence there're no consumers.
WHAT WORKED:
I created a class that has a "listenning" method that accepts ListenerContainerConsumerFailedEvent. That class has RabbitListenerEndpointRegistry (bean that Boot conveniently created for me) and whenever that method is called I'm checking if listenerContainer is running if not then I'm starting it (that checking is most likely redundant).
@EventListener
public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
var listenerContainer = rabbitListenerEndpointRegistry.getListenerContainer(MessageListener.RABBIT_LISTENER_ID);
if (!listenerContainer.isRunning()){
listenerContainer.start();
}
}
org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure
FatalListenerStartupException
Authentication failures are considered fatal and the container is immediately stopped; it is unlikely such situations will be corrected automatically.
Deleting a user that is currently in use is a rather unusual circumstance.
You could use an ApplicationListener
bean or @EventListener
method to listen for a ListenerContainerConsumerTerminatedEvent
and try restarting the container after some time.