c++, rabbitmq, amqp

amqp-cpp library: channel->cancel(consumerTag) doesn't seem to cancel


I'm using the AMQP-CPP library (not the amqp C library) to connect to RabbitMQ. Consuming from a queue works, as does publishing. I don't use exchanges or anything else, just one consumer on one queue. When I try to cancel the consumer, I still receive messages after the DeferredCancel::onSuccess callback has been executed. Also, the std::string consumer argument passed to that callback is empty; shouldn't it contain the consumer tag again?

Here's what I'm observing:

// publish many messages to "queueName"

m_channel->consume("queueName", "hardcodedConsumerTag").onReceived([](const AMQP::Message& m, uint64_t deliveryTag, bool redelivered){std::cout << std::string(m.body(), m.bodySize()) << std::endl;});

m_channel->cancel("hardcodedConsumerTag").onSuccess([](const std::string& consumer){std::cout << "should have stopped consuming for: " << consumer << std::endl;});

output:

message 1
should have stopped consuming for: (here is just an empty string)
message 2
message 3
... until all messages have been delivered

I would have expected the messages to stop after the output "should have stopped consuming" is printed.


Solution

  • It turns out that the consume request had not yet been processed when the cancel() request was sent. RabbitMQ (or the AMQP-CPP library) reports "success" even though no consumer was canceled, because no consumer existed on the RabbitMQ side yet. Only THEN is the consume() processed, which is why I was seeing the behavior described above.

    I fixed it by wrapping everything in callbacks. I maintain my own list of DeferredQueue and DeferredConsumer objects and record for each whether its onSuccess callback has already been executed (there doesn't seem to be a "pending" equivalent in AMQP-CPP).

    If the onSuccess callback has NOT been executed yet, I override it with one that cancels; if it has already been executed, I can just cancel normally.

    // publish many messages to "queueName"
    
    // In real code this flag must outlive the callbacks (e.g. as a class member),
    // because they are invoked later from the event loop.
    bool onSuccessExecuted = false;
    
    auto& deferredConsumer = m_channel->consume("queueName", "hardcodedConsumerTag");
    
    deferredConsumer.onReceived([](const AMQP::Message& m, uint64_t deliveryTag, bool redelivered){
          // body() is not null-terminated, so print exactly bodySize() bytes
          std::cout << std::string(m.body(), m.bodySize()) << std::endl;
        });
    
    deferredConsumer.onSuccess([&](){
          onSuccessExecuted = true;
          // do stuff you want to do when starting consuming a queue
        });
    
    // later, at the point where the consumer should be canceled:
    if (!onSuccessExecuted){
      // this overwrites the previous onSuccess callback
      deferredConsumer.onSuccess([this, &onSuccessExecuted](){
          cancel();
          // must still be set if we might want to cancel again later
          onSuccessExecuted = true;
        });
    } else {
      // if onSuccess has already been executed we just cancel normally,
      // as the onSuccess callback won't be executed again
      cancel();
    }
    
    // member function of the same class that owns m_channel
    void cancel() {
      m_channel->cancel("hardcodedConsumerTag").onSuccess([](const std::string& consumer){
            std::cout << "should have stopped consuming for: " << consumer << std::endl;
          });
    }