
RabbitMQ Java Client Using DefaultConsumer vs QueueingConsumer


  1. DefaultConsumer
    My DemoConsumer inherits from DefaultConsumer.
    I have noticed that, working this way, handleDelivery() is invoked from a thread pool
    (printing Thread.currentThread().getName(), I see pool-1-thread-1/2/3/4 each time).
    I have also tested it several times and saw that the message order is preserved.
    Just to make sure: since different threads call handleDelivery(), can that break the message order?

  2. QueueingConsumer
    All of the Java tutorials use QueueingConsumer to consume messages.
    In the API docs it is marked as a deprecated class.
    Should I change my code to inherit from DefaultConsumer instead? Is the tutorial outdated?

Thanks.


Solution

  • Yes, handleDelivery() is invoked from an internal thread pool, and you can replace that pool by passing your own ExecutorService when creating the connection:

    ExecutorService es = Executors.newFixedThreadPool(20);
    Connection conn = factory.newConnection(es);
    

    Read http://www.rabbitmq.com/api-guide.html “Advanced Connection options”.

    As you can read from the “QueueingConsumer” doc:

    As such, it is now safe to implement Consumer directly or to extend DefaultConsumer.

    I never used QueueingConsumer, because it isn’t properly event-driven.

    As you can see here:

    QueueingConsumer consumer = new QueueingConsumer(channel);
    channel.basicConsume(QUEUE_NAME, true, consumer);
    while (true) {
        // nextDelivery() blocks here until the next message arrives
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        String message = new String(delivery.getBody());
    }
    

    A typical problem with this approach is how to close the subscription: the loop is blocked in nextDelivery(), so a common workaround is to publish a specially tagged "close" message from the local host. I am not fond of that approach.
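
    To make the workaround concrete, here is a minimal sketch of the "close message" pattern. It does not use the RabbitMQ client at all: a java.util.concurrent.BlockingQueue stands in for the blocking nextDelivery() call, and the class name, method name, and sentinel value are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillDemo {
    // Hypothetical sentinel; any payload a real producer never sends would do.
    static final String CLOSE = "__CLOSE__";

    // Blocks on take(), much like nextDelivery(), and returns only when the
    // sentinel arrives (or the thread is interrupted).
    static List<String> drainUntilClosed(BlockingQueue<String> queue) {
        List<String> received = new ArrayList<>();
        while (true) {
            String message;
            try {
                message = queue.take(); // blocks until a message is available
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
                return received;
            }
            if (CLOSE.equals(message)) {
                return received; // the "close" message ends the loop
            }
            received.add(message);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b");
        queue.add(CLOSE);
        System.out.println(drainUntilClosed(queue)); // prints [a, b]
    }
}
```

    The drawback is visible in the loop: the only way out is an extra message that has to travel through the same queue as the real ones.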

    If you extend DefaultConsumer instead, you can correctly close the subscription and the channel:

    public class MyConsumer extends DefaultConsumer {...}
    

    then

    public static void main(String[] args) throws Exception {
        MyConsumer consumer = new MyConsumer(channel);
        // autoAck = false: messages must be acknowledged explicitly
        String consumerTag = channel.basicConsume(Constants.queue, false, consumer);
        System.out.println("press Enter to terminate");
        System.in.read();
        // cancel the subscription first, then close the channel
        channel.basicCancel(consumerTag);
        channel.close();
        ....
    

    In conclusion, you normally don't need to worry about message order: as long as everything works correctly, messages arrive in order. You can't rely on it unconditionally, though, because after a failure (for example, when a message is requeued and redelivered) the order can be lost. If you absolutely need to preserve message order, include a sequence number in each message so that the consumer can reconstruct the order.

    And you should extend DefaultConsumer.
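
    The idea of reconstructing the order on the consumer side can be sketched roughly as follows. This is only an illustration under assumptions: the Resequencer class is hypothetical, it assumes the publisher attaches a gap-free, increasing sequence number to every message (how that number travels, header or payload, is left open), and it buffers out-of-order messages without any timeout or size limit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

class Resequencer {
    // Messages that arrived early, keyed by sequence number.
    private final TreeMap<Long, String> pending = new TreeMap<>();
    private long nextExpected;

    Resequencer(long firstSequence) {
        this.nextExpected = firstSequence;
    }

    // Accepts one message and returns every message that can now be released
    // in order. Synchronized because callbacks may run on different pool threads.
    synchronized List<String> accept(long sequence, String body) {
        List<String> ready = new ArrayList<>();
        if (sequence < nextExpected) {
            return ready; // already released (e.g. a redelivery): drop it
        }
        pending.put(sequence, body);
        // Release the longest run of consecutive sequence numbers.
        while (!pending.isEmpty() && pending.firstKey() == nextExpected) {
            ready.add(pending.pollFirstEntry().getValue());
            nextExpected++;
        }
        return ready;
    }

    public static void main(String[] args) {
        Resequencer r = new Resequencer(1);
        System.out.println(r.accept(2, "second")); // prints []
        System.out.println(r.accept(1, "first"));  // prints [first, second]
        System.out.println(r.accept(3, "third"));  // prints [third]
    }
}
```

    A consumer's handleDelivery() could call accept() with the decoded sequence number and process whatever list comes back. A production version would also need a policy for messages that never arrive.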