Search code examples
pythonpython-3.xrabbitmqpikapython-pika

Pika RabbitMQ Publish from a consumer


I have a RabbitMQ consumer. I would like to have that consumer do some message processing, simulated by time.sleep(10), then publish a message to a different queue. I know the consumer callback has a channel that in theory could be used to do the publish, but this seems like a bad implementation because if the basic_publish() somehow manages for force close the channel, then the consumer dies. What is the best way to handle this?

import time
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')

result = channel.queue_declare(queue='original_queue', exclusive=True)

channel.queue_bind(exchange='logs', queue='original_queue')

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    time.sleep(10)
    ch.basic_publish(exchange='logs', routing_key='different_queue', body='hello_world')

channel.basic_consume(
    queue='original_queue', on_message_callback=callback, auto_ack=True)

channel.start_consuming()

Solution

  • You can implement your consumer in a way that it automatically reconnects to the RabbitMQ server if the connection gets closed. Hope this helps(I didn't put much thought on the design part, feel free to suggest some!)

    import time
    import pika
    
    reconnect_on_failure = True
    
    
    def consumer(connection, channel):
    
        channel.exchange_declare(exchange='logs', exchange_type='fanout')
        result = channel.queue_declare(queue='original_queue', exclusive=True)
        channel.queue_bind(exchange='logs', queue='original_queue')
        print(' [*] Waiting for logs. To exit press CTRL+C')
    
        def callback(ch, method, properties, body):
            time.sleep(10)
            ch.basic_publish(exchange='logs', routing_key='different_queue', body='hello_world')
    
        channel.basic_consume(
      queue='original_queue', on_message_callback=callback, auto_ack=True)
    
        channel.start_consuming()
    
    
    def get_connection_and_channel():
        connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
        channel = connection.channel()
        return connection, channel
    
    
    def start(reconnect_on_failure):
        connection, channel = get_connection_and_channel()
        consumer(connection, channel)
        # the if condition will be executed when the consumer's start_consuming loop exists
        if reconnect_on_failure:
            # cleanly close the connection and channel
            if not connection.is_closed():
                connection.close()
            if not channel.is_close():
                channel.close()
            start(reconnect_on_failure)
    
    
    start(reconnect_on_failure)