I am using txamqp python library to connect to an AMQP broker (RabbitMQ) and i have a consumer with the following callback:
@defer.inlineCallbacks
def message_callback(self, message, queue, chan):
"""This callback is a queue listener
it is called whenever a message was consumed from queue
c.f. test_amqp.ConsumeTestCase for use cases
"""
# The callback should be redefined here to keep getting further messages from queue
queue.get().addCallback(self.message_callback, queue, chan).addErrback(self.message_errback)
print " [x] Received a valid message: [%r]" % (message.content.body,)
yield self.smpp.sendDataRequest(SubmitSmPDU)
# ACK the message in queue, this will remove it from the queue
chan.basic_ack(message.delivery_tag)
When "ack"ing a message, it will be deleted (to confirm ?) from the queue, but what happens when the message is not "ack"ed ? i need to get a "retry" mechanism where i can postpone the message to be callbacked again later on and to keep track of how much retries did it take.
And how can i list/delete messages from a queue ?
It's resolved, in order to retry a message from the queue, it is necessary to reject the message with 'retry' flag, it will be enqueued back to the queue.
If i reject it with a timer (callLater in twisted), the message enqueuing will be postponed for whatever time i want.