Tags: python-3.x, rabbitmq, amqp, python-multithreading, python-pika

Pass a function (callback) variable between functions running in separate threads


I am trying to develop a Python 3.6 script which uses the pika and threading modules.

I have a problem which I think is caused by A) my being very new to Python and coding in general, and B) my not understanding how to pass variables between functions when they run in separate threads and the receiving function is already being passed a parameter in the parentheses after its name.

The reason I think this is that, when I do not use threading, I can pass a variable between functions simply by calling the receiving function and supplying the variable to be passed in parentheses. A basic example is shown below:

def send_variable():
    body = "this is a text string"
    receive_variable(body)

def receive_variable(body):
    print(body)

send_variable()

When run, this prints:

this is a text string
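
For comparison, here is a minimal sketch of the same hand-off with the receiving function running in its own thread. It uses only the standard `threading` module; the `received` list is a hypothetical addition so that the result can be inspected afterwards.

```python
import threading

received = []  # hypothetical: records what the receiver was given

def receive_variable(body):
    # runs in the worker thread; body arrives through Thread(args=...)
    received.append(body)
    print(body)

def send_variable():
    body = "this is a text string"
    # args must be a tuple, hence the trailing comma after body
    t = threading.Thread(target=receive_variable, args=(body,))
    t.start()
    t.join()  # wait for the worker thread to finish

send_variable()
```

The arguments are given to `threading.Thread` rather than written in parentheses after the function name, because the thread, not the caller, makes the call.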

A working version of the code I need to get working with threading is shown below. It uses plain functions (no threading), and I am using pika to receive messages from a (RabbitMQ) queue via the pika callback function. I then pass the body of the message received in the 'callback' function to the 'processing function':

import pika
# ...mq connection variables set here...


# defines username and password credentials as variables set at the top of this script
credentials = pika.PlainCredentials(mq_user_name, mq_pass_word)

# defines mq server host, port and user credentials and creates a connection
connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, port=mq_port, credentials=credentials))

# creates a channel connection instance using the above settings
channel = connection.channel()

# defines the queue name to be used with the above channel connection instance
channel.queue_declare(queue=mq_queue)


def body_processing(body):
    ...code to send a pika message every time a 'body' message is received...


def callback(ch, method, properties, body):
    # passes (body) to processing function
    body_processing(body)


# sets channel consume type, also sets queue name/message acknowledge settings based on variables set at top of script
channel.basic_consume(callback, queue=mq_queue, no_ack=mq_no_ack)
# starts consuming; this call blocks, and pika itself calls callback() for each message received from the mq server
channel.start_consuming()
# above deals with pika connection and the main callback function

This works fine; however, I want to translate this to run within a script that uses threading. When I do this, I have to supply the parameter 'channel' to the function that runs in its own thread. When I then try to include the 'body' parameter as well, so that the 'processing_function' looks like this:

def processing_function(channel, body):

I get an error saying:

[function_name] is missing 1 positional argument: 'body'

I know that more code is needed when using threading, so I have included the actual threading code I use below so that you can see what I am doing:

...imports and mq variables and pika connection details are set here...

def get_heartbeats(channel):
    channel.queue_declare(queue=queue1)
    #print (' [*] Waiting for messages. To exit press CTRL+C')

    def callback(ch, method, properties, body):

        process_body(body)

        #print (" Received %s" % (body))

    channel.basic_consume(callback, queue=queue1, no_ack=no_ack)
    channel.start_consuming()


def process_body(channel, body):
    channel.queue_declare(queue=queue2)
    #print (' [*] Waiting for Tick messages. To exit press CTRL+C')

# sets the mq host which pika client will use to send a message to
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host))
# create a channel connection instance
    channel = connection.channel()
# declare a queue to be used by the channel connection instance
    channel.queue_declare(queue=order_send_queue)
# send a message via the above channel connection settings
    channel.basic_publish(exchange='', routing_key=send_queue, body='Test Message')
# send a message via the above channel settings
# close the channel connection instance
    connection.close()


def manager():

# Channel 1 Connection Details - =======================================================================================

    credentials = pika.PlainCredentials(mq_user_name, mq_password)
    connection1 = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, credentials=credentials))
    channel1 = connection1.channel()

# Channel 1 thread =====================================================================================================
    t1 = threading.Thread(target=get_heartbeats, args=(channel1,))
    t1.daemon = True
    threads.append(t1)
    # as this is thread 1 call to start threading is made at start threading section

# Channel 2 Connection Details - =======================================================================================

    credentials = pika.PlainCredentials(mq_user_name, mq_password)
    connection2 = pika.BlockingConnection(pika.ConnectionParameters(host=mq_host, credentials=credentials))
    channel2 = connection2.channel()

# Channel 2 thread ====================================================================================================
    t2 = threading.Thread(target=process_body, args=(channel2, body))
    t2.daemon = True
    threads.append(t2)
    t2.start()  # as this is thread 2 - we need to start the thread here

# Start threading
    t1.start()  # start the first thread - other threads will self start as they call t1.start() in their code block
    for t in threads:  # for all the threads defined
        t.join()  # join defined threads

manager()  # run the manager module which starts threads that call each module

When run, this produces the error:

process_body() missing 1 required positional argument: 'body'

and I do not understand why this happens or how to fix it.
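
The error can be reproduced without pika. In the sketch below, `FakeChannel` is a hypothetical stand-in for a pika channel and the routing key `'out'` is made up. The callback calls `process_body(body)` with one argument while `process_body` is declared with two, which appears to be what happens inside `get_heartbeats` above. Binding the channel in advance with `functools.partial` is one possible way to supply the missing argument, not necessarily the best fit for the real script.

```python
import functools

class FakeChannel:
    """Hypothetical stand-in for a pika channel, for illustration only."""
    def __init__(self):
        self.published = []

    def basic_publish(self, exchange, routing_key, body):
        self.published.append((routing_key, body))

def process_body(channel, body):
    # needs both the channel to publish on and the received body
    channel.basic_publish(exchange='', routing_key='out', body=body)

def callback(ch, method, properties, body):
    # body is bound to the 'channel' parameter, so 'body' is left unfilled
    process_body(body)

try:
    callback(None, None, None, b'heartbeat')
    error_message = None
except TypeError as exc:
    error_message = str(exc)
print(error_message)

# One possible fix: bind the channel up front so only body remains to be passed.
out_channel = FakeChannel()
bound_process_body = functools.partial(process_body, out_channel)

def fixed_callback(ch, method, properties, body):
    bound_process_body(body)

fixed_callback(None, None, None, b'heartbeat')
print(out_channel.published)
```

The single argument is bound to the first parameter, `channel`, which is why Python names `body` as the missing one.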


Solution

  • On looking into this further and experimenting with the code, it seems that if I edit the line:

    def process_body(channel, body):

    to read:

    def process_body(body):

    and the line:

    t2 = threading.Thread(target=process_body, args=(channel2, body))

    so that it reads:

    t2 = threading.Thread(target=process_body)

    then the code seems to work as needed. I also see multiple script processes in htop, so it appears that threading is working. I have left the script running for more than 24 hours and did not receive any errors.
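
An alternative sketch, not the approach described above: when one thread receives messages and another should process them, a `queue.Queue` from the standard library is a common way to hand each `body` from one thread to the other. The names `consumer`, `worker`, `bodies` and `processed`, and the `None` sentinel, are assumptions for illustration. In the real script the `put` call would sit in the pika callback and the worker would do the publishing.

```python
import queue
import threading

bodies = queue.Queue()  # thread-safe hand-off between the two threads
processed = []          # hypothetical: records what the worker handled

def consumer(messages):
    # stands in for the pika callback: puts each received body on the queue
    for body in messages:
        bodies.put(body)
    bodies.put(None)  # sentinel telling the worker to stop

def worker():
    # stands in for process_body: takes bodies off the queue one at a time
    while True:
        body = bodies.get()
        if body is None:
            break
        processed.append(body)

t1 = threading.Thread(target=consumer, args=([b'tick 1', b'tick 2'],))
t2 = threading.Thread(target=worker)
t1.start()
t2.start()
t1.join()
t2.join()
print(processed)
```

With this layout neither thread needs the other's arguments when it is created; each `body` travels through the queue as it arrives.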