I want to share the BlockingChannel
across multiple python process.
In order to send
basic_ack
from other python process.
How to share the BlockingChannel
across multiple python processes.
Following is the code:
self.__connection__ = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.__channel__ = self.__connection__.channel()
I have tried to dump using pickle
but it doenst allow to dump Channel and give error can't pickle select.epoll objects
using the follwoing code
filepath = "temp/" + "merger_channel.sav"
pickle.dump(self.__channel__, open(filepath, 'wb'))
GOAL:
Goal is to send basic_ack
from channel from other python processes.
It is an antipattern to share a channel between multiple threads and it's quite unlikely you will manage to share it between processes.
The rule of thumb is 1 connection
per process and 1 channel
per thread.
You can read more in regard of this matter at the following links:
If you want to pair message consumption together with multiprocessing, the usual pattern is to let the main process receive the messages, deliver their payload to a pool of worker processes and acknowledge them once they are done.
Simple example using pika.BlockingChannel
and concurrent.futures.ProcessPoolExecutor
:
def ack_message(channel, delivery_tag, _future):
"""Called once the message has been processed.
Acknowledge the message to RabbitMQ.
"""
channel.basic_ack(delivery_tag=delivery_tag)
for message in channel.consume(queue='example'):
method, properties, body = message
future = pool.submit(process_message, body)
# use partial to pass channel and ack_tag to callback function
ack_message_callback = functools.partial(ack_message, channel, method.delivery_tag)
future.add_done_callback(ack_message_callback)
The above loop will endlessly consume messages from the example
queue and submit them to the pool of processes. You can control how many messages to process concurrently via RabbitMQ consumer prefetch parameter. Check pika.basic_qos
to see how to do it in Python.