python twisted rabbitmq amqp

How to delete or postpone a message in the AMQP queue


I am using the txamqp Python library to connect to an AMQP broker (RabbitMQ), and I have a consumer with the following callback:

@defer.inlineCallbacks
def message_callback(self, message, queue, chan):
    """Queue listener callback.
    It is called whenever a message is consumed from the queue.
    c.f. test_amqp.ConsumeTestCase for use cases
    """

    # Re-register this callback so that further messages keep being consumed from the queue
    queue.get().addCallback(self.message_callback, queue, chan).addErrback(self.message_errback)

    print " [x] Received a valid message: [%r]" % (message.content.body,)

    yield self.smpp.sendDataRequest(SubmitSmPDU)

    # ACK the message in queue, this will remove it from the queue
    chan.basic_ack(message.delivery_tag)

As far as I understand (please confirm), acking a message removes it from the queue. But what happens when a message is not acked? I need a retry mechanism where I can postpone a message so that the callback is invoked for it again later, and I need to keep track of how many retries it has taken.

Also, how can I list or delete messages in a queue?


Solution

  • It's resolved. To retry a message, reject it (basic.reject) with the requeue flag set; the broker then puts it back on the queue and delivers it again.

    If I issue the reject from a timer (callLater in Twisted), the requeue is postponed for as long as I want. The message stays unacknowledged until the reject is sent.
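The delayed reject described above could be sketched roughly as follows. This is only an illustration, not code from the original answer: RetryTracker, its parameters and msg_id are hypothetical names, call_later is assumed to behave like Twisted's reactor.callLater(delay, callable, *args, **kw), and chan is assumed to expose basic_reject and basic_ack with the keyword arguments shown, as a txamqp channel does. Because the delivery tag changes on every redelivery, the retry count is keyed on an application-level message id supplied by the caller, and it lives only in the consumer's memory.

```python
class RetryTracker(object):
    """Counts failed attempts per message and requeues after a delay.

    Hypothetical helper: call_later is expected to behave like
    reactor.callLater(delay, callable, *args, **kwargs).
    """

    def __init__(self, call_later, max_retries=3, delay=30):
        self.call_later = call_later
        self.max_retries = max_retries
        self.delay = delay
        self.attempts = {}  # msg_id -> number of failed attempts so far

    def failed(self, chan, message, msg_id):
        """Handle a failed attempt; return True if a retry was scheduled."""
        count = self.attempts.get(msg_id, 0) + 1
        if count > self.max_retries:
            # Give up: reject without requeue so the broker discards the
            # message instead of delivering it again.
            self.attempts.pop(msg_id, None)
            chan.basic_reject(delivery_tag=message.delivery_tag, requeue=False)
            return False
        self.attempts[msg_id] = count
        # Postpone the reject; the message stays unacknowledged until the
        # timer fires, then the broker requeues and redelivers it.
        self.call_later(self.delay, chan.basic_reject,
                        delivery_tag=message.delivery_tag, requeue=True)
        return True

    def succeeded(self, chan, message, msg_id):
        """Forget the retry count and ack the message."""
        self.attempts.pop(msg_id, None)
        chan.basic_ack(delivery_tag=message.delivery_tag)
```

In the callback from the question, this would mean creating the tracker once with RetryTracker(reactor.callLater), wrapping the sendDataRequest call in try/except, and calling failed(...) in the except branch and succeeded(...) otherwise.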