
RabbitMQ Unack'ed messages not getting requeued


I'm having trouble with long-running RabbitMQ consumers: several of my messages end up stuck in an unacked state.

RabbitMQ version: 3.6.15
Pika version: 0.11.0b

import pika
import time
import sys
import threading
from Queue import Queue
rabbitmq_server = "<SERVER>"
queue = "<QUEUE>"
connection = None

def check_acknowledge(channel, connection, ack_queue):
    delivery_tag = None
    while(True):
        try:
            delivery_tag = ack_queue.get_nowait()
            channel.basic_nack(delivery_tag=delivery_tag)
            break
        except:
            connection.process_data_events()
        time.sleep(1)


def process_message(body, delivery_tag, ack_queue):
    print "Received %s" % (body)
    print "Waiting for 600 seconds before receiving next ID\n"
    start = time.time()
    elapsed = 0
    while elapsed < 10:
        elapsed = time.time() - start
        print "loop cycle time: %f, seconds count: %02d" %(time.clock(), elapsed)
        time.sleep(1)
    ack_queue.put(delivery_tag)




def callback(ch, method, properties, body):
    global connection
    ack_queue = Queue()
    t = threading.Thread(target=process_message, args=(body, method.delivery_tag, ack_queue))
    t.start()
    check_acknowledge(ch, connection, ack_queue)

while True:
    try:
        connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_server))
        channel = connection.channel()
        print ' [*] Waiting for messages. To exit press CTRL+C'
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(callback, queue=queue)
        channel.start_consuming()
    except KeyboardInterrupt:
        break

channel.close()
connection.close()
exit(0)

Am I missing something here?


Solution

  • I used the following multi-threaded consumer to solve this problem.

    import pika
    import time
    import sys
    import threading
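    from Queue import Queue, Empty
    rabbitmq_server = "<RABBITMQ_SERVER_IP>"
    queue = "hello1"
    connection = None


    def check_acknowledge(channel, connection, ack_queue):
        # Runs in the main thread, which owns the connection and channel.
        # Polls for the delivery tag handed back by the worker thread.
        while True:
            try:
                delivery_tag = ack_queue.get_nowait()
                channel.basic_ack(delivery_tag=delivery_tag)
                break
            except Empty:
                # Nothing to ack yet: let pika service the connection
                # (for example heartbeats) while the worker is still busy.
                connection.process_data_events()
            time.sleep(1)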
    
    
    def process_message(body, delivery_tag, ack_queue):
        print "Received %s" % (body)
        print "Waiting for 300 seconds before receiving next ID\n"
        start = time.time()
        elapsed = 0
        while elapsed < 300:
            elapsed = time.time() - start
            print "loop cycle time: %f, seconds count: %02d" %(time.clock(), elapsed)
            time.sleep(1)
        ack_queue.put(delivery_tag)
    
    
    
    
    def callback(ch, method, properties, body):
        global connection
        ack_queue = Queue()
        t = threading.Thread(target=process_message, args=(body, method.delivery_tag, ack_queue))
        t.start()
        check_acknowledge(ch, connection, ack_queue)
    
    while True:
        try:
            connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_server))
            channel = connection.channel()
            print ' [*] Waiting for messages. To exit press CTRL+C'
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(callback, queue=queue)
            channel.start_consuming()
        except KeyboardInterrupt:
            break
    
    channel.close()
    connection.close()
    exit(0)
    
    1. The consumer callback calls check_acknowledge in the main thread itself, so the connection and channel objects are only ever used from that thread. Pika is not thread-safe, so these objects must stay in the thread that created them.
    2. The actual processing happens in a new thread spawned from the main thread.
    3. Once process_message finishes its processing, it puts the delivery_tag in the queue.
    4. check_acknowledge loops until it finds the delivery_tag that process_message put in the queue, calling connection.process_data_events() in the meantime. Once it finds the tag, it acks the message and returns.

    I tested this implementation with the consumer sleeping for 5 minutes, 10 minutes, 30 minutes, and an hour. It is working very well for me.
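
The hand-off described in the four steps above can be tried without a broker. The following is only a minimal sketch in Python 3, not the consumer itself: FakeChannel and service_connection are hypothetical stand-ins for the pika channel and for connection.process_data_events(), and the sketch waits with Queue.get(timeout=...) instead of get_nowait() plus sleep. It records which thread performs the ack, so you can confirm that the worker thread never touches the channel.

```python
import queue
import threading
import time


class FakeChannel:
    """Hypothetical stand-in for a pika channel; records each ack and the acking thread."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append((delivery_tag, threading.current_thread().name))


def process_message(body, delivery_tag, ack_queue, work_seconds):
    # Worker thread: do the slow work, then hand the tag back.
    # It never touches the channel or the connection.
    time.sleep(work_seconds)
    ack_queue.put(delivery_tag)


def check_acknowledge(channel, ack_queue, service_connection, poll_seconds=0.01):
    # Owning thread: wait for the tag, servicing the connection between polls.
    while True:
        try:
            delivery_tag = ack_queue.get(timeout=poll_seconds)
        except queue.Empty:
            # With pika this would be connection.process_data_events().
            service_connection()
            continue
        channel.basic_ack(delivery_tag=delivery_tag)
        return delivery_tag


def callback(channel, delivery_tag, body, service_connection, work_seconds=0.2):
    # Same shape as the consumer callback: spawn the worker, then block
    # in the owning thread until the worker reports back.
    ack_queue = queue.Queue()
    worker = threading.Thread(
        target=process_message,
        args=(body, delivery_tag, ack_queue, work_seconds),
    )
    worker.start()
    tag = check_acknowledge(channel, ack_queue, service_connection)
    worker.join()
    return tag


def run_demo():
    channel = FakeChannel()
    service_calls = []
    callback(channel, 7, b"job", lambda: service_calls.append(1))
    return channel.acked, len(service_calls)


if __name__ == "__main__":
    acked, polls = run_demo()
    print(acked, polls)
```

run_demo returns the list of recorded acks and the number of times the connection was serviced while the worker was busy. Catching only queue.Empty matters here: an error raised by the ack itself is then not mistaken for an empty queue and retried forever.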