Search code examples
javaspring-bootrabbitmqspring-rabbit

Listener doesn't pull data after connection to host was reestablished


Let me begin by saying that I just started to dabble with AMQP.

I want to consume/pull data from queue. I'm using Spring's libs (spring-boot-starter-amqp) in order to make things easier. I have a listener class with method annotated with @RabbitListener where I set queue. Everything else is configured via properties:

    rabbitmq:
      username: user
      password: password
      virtual-host: virtual-host
      port: 5672
      host: host
      queue: _316_
      listener:
        simple:
          retry:
            enabled: true
            initial-interval: 1000
            max-attempts: 8
            max-interval: 10000
            multiplier: 2.0
            stateless: true

Everything works fine until I make host unavailable for a while. When that happens connection is dropped and attempts are made in order to reestablish it. After connection is reestablished again listener doesn't start to pull messages. After application is restarted everything is fine, but I'm sure it can be configured somehow that consumer keeps restarting, at least it should try to do it after connection was reestablished (or at least this is what I'd expect).

After connection has been dropped following can be found in logs:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer WARN Cancel received for amq.ctag-PgBSeymWBfsghwdUYr5asA (_316_); Consumer@22ead351: tags=[[amq.ctag-PgBSeymWBfsghwdUYr5asA]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@39549f33 Shared Rabbit Connection: SimpleConnection@6f731759 [delegate=amqp://user@host, localPort= 36678], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.connection.CachingConnectionFactory ERROR Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - user 'user' is deleted, class-id=0, method-id=0)

com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Connection reset)

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer WARN Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.rabbit.support.ConsumerCancelledException

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Restarting Consumer@22ead351: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@39549f33 Shared Rabbit Connection: SimpleConnection@6f731759 [delegate=amqp://user@host, localPort= 36678], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host:5672]

org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure\n\tat org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:564)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1201)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1046)\n\tat java.base/java.lang.Thread.run(Thread.java:835)\nCaused by: org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer","message":"Consumer received fatal exception on startup

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Stopping container from aborted consumer

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Waiting for workers to finish.

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Successfully waited for workers to finish.

com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Socket closed)

org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.\n\tat org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:65)\n\tat 

Then connection attempt is made and we're in a loop:

org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host]

com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Socket closed)

org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED 

Until connection is reestablished:

org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host]

org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Created new connection: rabbitConnectionFactory#69d3cf7e:16/SimpleConnection@3931e0ad [delegate=amqp://user@host, localPort= 50574]

And nothing else happens - no messages are being consumed.

UPDATE: Followed suggestion and turned on DEBUG logging.

When app is starting we're:

  1. starting listener container
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Starting Rabbit listener container.
  1. creating the connection

  2. starting consumer

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Starting consumer Consumer@3daf03d8: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
  1. creating channel and starting to consume
org.springframework.amqp.rabbit.connection.CachingConnectionFactory DEBUG Creating cached Rabbit Channel from AMQChannel(amqp://user@host,1)

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG ConsumeOK: Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Started on queue '_316_' with tag amq.ctag-uG8_iXcNaknFjBIGM-91Tg: Consumer@3daf03d8: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Storing delivery for consumerTag: 'amq.ctag-uG8_iXcNaknFjBIGM-91Tg' with deliveryTag: '1' in Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Received message: (Body:'[B@4b817fae(byte[117])' MessageProperties [headers={}, contentLength=0, redelivered=true, receivedExchange=, receivedRoutingKey=_316_, deliveryTag=1, consumerTag=amq.ctag-uG8_iXcNaknFjBIGM-91Tg, consumerQueue=_316_])

This goes on until connection drops:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer WARN Cancel received for amq.ctag-uG8_iXcNaknFjBIGM-91Tg (_316_); Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp:///user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp:///user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

Channel is shutdown and user somehow gets deleted as log says:

org.springframework.amqp.rabbit.connection.CachingConnectionFactory ERROR Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - user 'user' is deleted, class-id=0, method-id=0)

Issue with connection driver followed by exception being thrown:

com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Connection reset)

org.springframework.amqp.rabbit.support.ConsumerCancelledException: null\n\tat org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:499)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:859)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1142)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1048)\n\tat java.base/java.lang.Thread.run(Thread.java:835

Consumer raises exception and it says that processing can r-e-s-t-a-r-t

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Consumer raised exception, processing can restart if the connection factory supports it

Restating happens:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Restarting Consumer@3daf03d8: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

Channels are being closed:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906]

org.springframework.amqp.rabbit.connection.CachingConnectionFactory DEBUG Closing cached Channel: AMQChannel(amqp://user@host,1)

New consumer is starting:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Starting consumer Consumer@2560313a: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0

We're attempting to connect which ends up with WARN and AUTHENTICATION failure, (because previous log said that user was deleted?):

An unexpected connection driver error occured (Exception message: Socket closed)

org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure\n\tat

ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.\n\tat org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:65)\n\tat

Consumer that tried to start:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Consumer received fatal exception on startup

And it (consumer) gets cancelled:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Cancelling Consumer@2560313a: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0

Channel is closed and container is stopping:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Stopping container from aborted consumer

And then container is shutting down:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Shutting down Rabbit listener container

We're waiting for workers to finish, it's successful, then we're trying to connect again, the same SOCKET_CLOSED is being logged over and over again.

Then host is brought back and connection is reestablished. Cached Rabit Channel is being created and nothing happens.

I'd assume that issue is that container was shut down and never came back to life, hence there're no consumers.

WHAT WORKED:

I created a class that has a "listenning" method that accepts ListenerContainerConsumerFailedEvent. That class has RabbitListenerEndpointRegistry (bean that Boot conveniently created for me) and whenever that method is called I'm checking if listenerContainer is running if not then I'm starting it (that checking is most likely redundant).

@EventListener
public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
        var listenerContainer = rabbitListenerEndpointRegistry.getListenerContainer(MessageListener.RABBIT_LISTENER_ID);
        if (!listenerContainer.isRunning()){
            listenerContainer.start();
        }
    }

Solution

  • org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure

    FatalListenerStartupException

    Authentication failures are considered fatal and the container is immediately stopped; it is unlikely such situations will be corrected automatically.

    Deleting a user that is currently in use is a rather unusual circumstance.

    You could use an ApplicationListener bean or @EventListener method to listen for a ListenerContainerConsumerTerminatedEvent and try restarting the container after some time.