Search code examples
phprabbitmqamqpconsumer

Do to block consumer on empty queue


Specifics

I have consumer, written in PHP, which tries to consume messages. My goal is simple as - if there are no messages in the queue, release the execution and continue, considering there was "no data retrieved".

Current thoughts

I tried AMQP_NOWAIT flag, like:

$flag = AMQP_NOWAIT;
$this->queue->consume($callbackFunction, $flag, $this->consumerTag);

And it didn't work. So far, I have workaround, like - I am declaring connection timeout for \AMQPConnection, as, let's say, 5 seconds, and then catching it in this way:

try {
    $this->consumer->consume($this->consumer->getReadMessageCallback($notifications, $requeue));
} catch (\AMQPConnectionException $connectionException) {
    //based on timeouts. Are there other ways to interrupt empty queue consuming? AMQP_NOWAIT fails, does nothing:
    return [];
}

But, that is a very "hacky" way to do it. It's working for me, but:

  • Still blocking code for timeout seconds
  • Will obviously fail, if I will have too many messages (i.e. it won't be able to finish till timeout will end).
  • More, it's not even documented, so relying on this should be last resort.

Next - I tried AMQP_IFEMPTY | AMQP_PASSIVE on queue creation. The thing is - it will delete queue if there are no messages there, and it will raise an exception (which I may catch) on attempt to get messages from there. But then there is a problem like - queue is immediately deleted and I can not even add messages there.

Question

Reading messages from empty queue is really a common issue, thus I'm sure it should be a way to resolve it in a proper matter. Thus, how would I do that?

Yes, manual links are /pl/ as there are no "en" links. But it's more or less readable as it is in English there in any case.


Solution

  • If you need to figure out whether queue empty or not, you can call AMQPQueue::declare() which is idempotent and in result returns messages count in queue. Note, that number is not pretty accurate (see why).

    In addition, you can just call AMQPQueue::get() (behaves like it's done in admin tool).

    And after that all, as you also tried, you can set AMQPConnection::setReadTimeout() to some low value (in local net 1 sec may be enough), and then call AMQPQueue::consume() and catch timeout exception if consumer waits too long.

    As to poor documentation, see answer to this question: Where can i find the php-amqp documentation.