I'm facing an issue in running RabbitMQ consumers for a long time. Several of my messages end up in an unack'ed state.
My RabbitMQ version: 3.6.15 Pika version: 0.11.0b
import pika
import time
import sys
import threading
from Queue import Queue
rabbitmq_server = "<SERVER>"
queue = "<QUEUE>"
connection = None
def check_acknowledge(channel, connection, ack_queue):
delivery_tag = None
while(True):
try:
delivery_tag = ack_queue.get_nowait()
channel.basic_nack(delivery_tag=delivery_tag)
break
except:
connection.process_data_events()
time.sleep(1)
def process_message(body, delivery_tag, ack_queue):
print "Received %s" % (body)
print "Waiting for 600 seconds before receiving next ID\n"
start = time.time()
elapsed = 0
while elapsed < 10:
elapsed = time.time() - start
print "loop cycle time: %f, seconds count: %02d" %(time.clock(), elapsed)
time.sleep(1)
ack_queue.put(delivery_tag)
def callback(ch, method, properties, body):
global connection
ack_queue = Queue()
t = threading.Thread(target=process_message, args=(body, method.delivery_tag, ack_queue))
t.start()
check_acknowledge(ch, connection, ack_queue)
while True:
try:
connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_server))
channel = connection.channel()
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=queue)
channel.start_consuming()
except KeyboardInterrupt:
break
channel.close()
connection.close()
exit(0)
Am I missing something here?
I used the following multi-threaded consumer to solve this problem.
import pika
import time
import sys
import threading
from Queue import Queue
rabbitmq_server = "<RABBITMQ_SERVER_IP>"
queue = "hello1"
connection = None
def check_acknowledge(channel, connection, ack_queue):
delivery_tag = None
while(True):
try:
delivery_tag = ack_queue.get_nowait()
channel.basic_ack(delivery_tag=delivery_tag)
break
except:
connection.process_data_events()
time.sleep(1)
def process_message(body, delivery_tag, ack_queue):
print "Received %s" % (body)
print "Waiting for 600 seconds before receiving next ID\n"
start = time.time()
elapsed = 0
while elapsed < 300:
elapsed = time.time() - start
print "loop cycle time: %f, seconds count: %02d" %(time.clock(), elapsed)
time.sleep(1)
ack_queue.put(delivery_tag)
def callback(ch, method, properties, body):
global connection
ack_queue = Queue()
t = threading.Thread(target=process_message, args=(body, method.delivery_tag, ack_queue))
t.start()
check_acknowledge(ch, connection, ack_queue)
while True:
try:
connection = pika.BlockingConnection(pika.ConnectionParameters(host=rabbitmq_server))
channel = connection.channel()
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue=queue)
channel.start_consuming()
except KeyboardInterrupt:
break
channel.close()
connection.close()
exit(0)
callback
function triggers a separate function check_acknowledge
in the main thread itself. Due to this, connection and channel objects are retained in the same thread. Note that Pika is not thread-safe so we need to maintain these objects in the same thread.Once process_message
is done with its processing, it puts the delivery_tag
in the queue.
check_acknowledge
loops indefinitely till it finds the delivery_tag
put in the queue by process_message
. Once it does find, it acks
the message and returns.
I have tested this implementation by running this consumer by sleep
ing for 5 min, 10 mins, 30 mins and an hour. This is working very well for me.