Search code examples
phprabbitmqamqpphp-amqplib

RabbitMQ : binding from a DLX


I've searched for that information (including the docs) and I can't find it.

I'm using the latest version of php-amqplib with RabbitMQ v. 2.7.1. I have three queues and three exchanges :

// Declare the exchanges
$this->channel->exchange_declare(self::EXCHANGE_TO_PROCESS, 'direct', false, true, false, false, false);
$this->channel->exchange_declare(self::EXCHANGE_WAITING, 'direct', false, true, false, false, false);
$this->channel->exchange_declare(self::EXCHANGE_TO_CLEAN, 'direct', false, true, false, false, false);

// Messages in the to_process queue are sent to to_clean after 24 hours without being processed
$this->channel->queue_declare(self::QUEUE_TO_PROCESS, false, true, false, false, false, array(
    'x-dead-letter-exchange' => array('S', self::EXCHANGE_TO_CLEAN),
    'x-message-ttl' => array('I', 86400000), // 1 day in milli-seconds
));

// Messages in the waiting queue are sent to to_process after 5 minutes (wait period before retry)
$this->channel->queue_declare(self::QUEUE_WAITING, false, true, false, false, false, array(
    'x-dead-letter-exchange' => array('S', self::EXCHANGE_TO_PROCESS),
    'x-message-ttl' => array('I', 300000), // 5 minutes in milli-seconds
));

// Messages in the to_clean queue are kept until they are processed
$this->channel->queue_declare(self::QUEUE_TO_CLEAN, false, true, false, false, false);

// Bind the queues to the exchanges
$this->channel->queue_bind(self::QUEUE_TO_PROCESS, self::EXCHANGE_TO_PROCESS);
$this->channel->queue_bind(self::QUEUE_TO_CLEAN, self::EXCHANGE_TO_CLEAN);
$this->channel->queue_bind(self::QUEUE_WAITING, self::EXCHANGE_WAITING);

The behavior is pretty straightforward : messages are published into the EXCHANGE_TO_PROCESS. An external worker processes the message : if the processing goes A-OK, the message is simply ACK'ed and thus removed from the queue (this part works perfectly) ; if the processing goes wrong, the message is instead inserted into the EXCHANGE_WAITING where, after a TTL of 5 minutes, it is reinserted through DLX into the EXCHANGE_TO_PROCESS for re-processing. After the third failure, though, it is inserted into the EXCHANGE_TO_CLEAN where a cron job will come and clean up messages, log errors, etc.

The problem I've run into, however, is that the code clearly binds the QUEUE_WAITING to the EXCHANGE_WAITING (as expected), but when I look into the RabbitMQ management page, I notice that two queues are bound to that exchange, namely QUEUE_TO_PROCESS and QUEUE_WAITING, in that order. When the 5 minutes are over, the message then disappears. I'm not quite sure why.

All this to bring us to my questions : does the dead letter exchange implicitly bind the exchange in parameter to the queue? And : what could possibly be happening to my lost messages?

EDIT

I'm even more confused than I was. I've tried the following, very basic code :

    $this->channel->exchange_declare('exchangeA', 'fanout', false, true, false, false, false);
    $this->channel->exchange_declare('exchangeB', 'fanout', false, true, false, false, false);
    $this->channel->queue_declare('queueA', false, true, false, false, false, array(
        'x-dead-letter-exchange' => array('S', 'exchangeB'),
        'x-message-ttl' => array('I', 5000)
    ));
    $this->channel->queue_declare('queueB', false, true, false, false, false);
    $this->channel->queue_bind('queueA', 'exchangeA');
    $this->channel->queue_bind('queueB', 'exchangeB');

    $msg = new AMQPMessage('hello!');
    $this->channel->basic_publish($msg, 'exchangeA');

This creates two queues and two exchanges (I've seen them to fanout to avoid bothering with routing keys), binds queueA to exchangeA and queueB to exchangeB, setting a TTL on queueA and its DLX to exchangeB. Watching what happens in the management page, I see a message spending 5 seconds in queueA, as expected, and then the message disappears, just like in my more complex code above.


Solution

  • I got doubts that something was amiss when I stumbled upon this blog and saw the poster with a case very similar to ours and he mentioned it was working without problems... so I started digging a bit more.

    The problem we had was simply a version problem. I'd been told the RabbitMQ package was up to date but we're using Ubuntu 12.04 LTS so the "up to date" version was 2.7.1 - a version that's over 3 years old.

    If you're in the same case as we were (using an older distro), checkout RabbitMQ's download page and pick the one that fits your distro. In the case of Ubuntu, we simply added the official repo (you can also simply download the .dpkg file), performed an apt-get update and waited for the server to reboot. Afterwards, the above code worked pretty much as is.