Search code examples
javarabbitmqspring-integrationspring-cloud-streamspring-rabbit

Remove RabbitMQ consumers and see it in Browser's RabbitMQ Console


Disclaimer : Noobie in RabbitMq and/or Spring Integration and/or Spring Cloud Stream.

I have the following class:

@Component
public class RabbitMQChannelBindingFactory {

...

  private org.springframework.cloud.stream.binder.rabbitRabbitMessageChannelBinder binder;
  private org.springframework.cloud.stream.config.BindingServiceProperties bindingServiceProperties;
  private org.springframework.cloud.stream.binding.BindingService bindingService;
  private org.springframework.beans.factory.config.ConfigurableListableBeanFactory beanFactory;
  private org.springframework.cloud.stream.binding.SubscribableChannelBindingTargetFactory bindingTargetFactory;
  private org.springframework.amqp.rabbit.connection.ConnectionFactory rabbitConnectionFactory;
...
}

What is needed?

I have a mechanism that creates an Exchange+Queue+Consumer and I have a mechanism that destroys these. The exchange and the queue have the auto-delete set on true.

What is the problem?

The inherited mechanism that destroys all of those 3 elements does not work.

It deletes just the Exchange. The Queue doesn't get deleted because it still has a Consumer and I can see it in my application also.

What has been tried?

I tried using the JVisualVM to get to the String instance of customer tag, I then walked up the hierarchy do remove the consumers.

I have changed the org.springframework.amqp.rabbit.listener.BlockingQueueConsumer inside my application so that it would be loaded first by the class loader.

I added inside something like this, in order to keep track of all the Consumers created in my application:

public class BlockingQueueConsumer {
  ...
  public static List<BlockingQueueConsumer> all = new ArrayList<>();
  public BlockingQueueConsumer(...) {
     ...
     all.add(this);
     ...
  }
  ...
}

Once I have done the previous step, I have added another method inside the RabbitMQChannelBindingFactory class to call the cancel method for all the consumers, something like this:

class RabbitMQChannelBindingFactory {

   public void disconnect(...) {
      BlockingQueueConsumer lastBlockingQueueConsumer = 
          BlockingQueueConsumer.all.get(BlockingQueueConsumer.all.size() - 1);
      lastBlockingQueueConsumer.getConsumerTags()
          .forEach(consumerTag -> basicCancel(lastBlockingQueueConsumer, consumerTag));
   }
}

At this point on the Browser with RabbitMQ Console loaded, we can see that the Queue is delete (besides the Exchange and the Consumer).

What is the problem?

I can not find a way to connect the BlockingQueueConsumer to the autowired properties.

For example I have tried

  public void deleteRabbitMQConsumer() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(rabbitConnectionFactory);
    rabbitTemplate.execute(channel -> {
      if (channel instanceof ChannelN) {
        ChannelN channelN = (ChannelN) channel;
        return true;
      }
      return false;
    });
  }

but it seems that there are no consumers inside the ChannelN.

Can you please give me a direction what needs to be understood first?

Or what are some sources for helping me?

Or anybody that has tried this action of cancelling the Consumer using this autowired properties?

Or do I need to add other autowired properties?

I have tried the https://stackoverflow.com/a/27633771/13622666 solution.

The solution

@Component
public class RabbitMQChannelBindingFactory {

...

  private org.springframework.cloud.stream.binder.rabbitRabbitMessageChannelBinder binder;

  private void connectAndDisconnectConsumer(...) {
      ...
      Binding<MessageChannel> messageChannelBinding = 
         binder.bindConsumer(exchangeName, "", channel, consumerProperties);
      ... // receive messages
      messageChannelBinding.stop(); 
     ...
   }  

}

And the stacktrace:

messageChannelBinding.stop();
 DefaultBinding#stop
  AbstractEndpoint#stop()
   AmqpInboundChannelAdapter#doStop
    AbstractMessageListenerContainer#stop()
     AbstractMessageListenerContainer#doStop
      AbstractMessageListenerContainer#shutdown
      SimpleMessageListenerContainer#doShutdown
       BlockingQueueConsumer#basicCancel(boolean)

Solution

  • Voted to close. You must not abuse Java classes system like that, but more concentrate to learn the library you use. Probably some one has already asked about the solution you are looking for. As Gary said: there is just stop() on the Spring Cloud Stream binding, which is going to stop a MessageListenerContainer, which, in turn, will cancel all the consumers it has on the queue. And your auto-deleted queue is going to be removed from RabbitMQ. There is no reason in destroying exchanges. Although you can do that via AmqpAdmin.deleteExchange().