I'm trying to send a JSON to Rabbitmq. this is my producer and it works properly:
import pika
import json
credentials = pika.PlainCredentials('admin', '123')
parameters = pika.ConnectionParameters('192.168.1.11',
5672,
'/',
credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue='myapp')
message = {'fname': 'test', 'lname': 'test'}
channel.basic_publish(exchange='',
routing_key='myapp',
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode = 2,
))
print(" [x] Sent %r" % message)
connection.close()
This is the receiver and it's not ok:
import pika
import time
import json
credentials = pika.PlainCredentials('admin', '123')
parameters = pika.ConnectionParameters('192.168.1.12',
5672,
'/',
credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue='myapp')
def callback(ch, method, properties, body):
myjson = json.loads(body)
global fname
global lname
fname = myjson["fname"]
lname = myjson["lname"]
my_func(fname,lname)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='myapp',
auto_ack=True,
on_message_callback=callback)
channel.start_consuming()
def my_func(fname,lname):
pass
when I run scripts the receiver show below problem :
NameError: global name 'my_func' is not defined
The issue is due to this function in your receiver.py
channel.start_consuming()
The function will block the program there and prevent it from seeing my_func(). On the other hand, the code analysis tool, e.g. pylint, may mostly likely fail to detect such error.
An easy solution is to change the code to
def callback(ch, method, properties, body):
myjson = json.loads(body)
global fname
global lname
fname = myjson["fname"]
lname = myjson["lname"]
my_func(fname,lname)
def my_func(fname,lname):
pass
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='myapp',
auto_ack=True,
on_message_callback=callback)
channel.start_consuming()
A more complex solution is to create your own class or method and run start_consuming() in a asynchromized way. Here is my example:
def listen(self, exchange, routing, callback):
self.ReceiveChannel = self.connection.channel()
consumer_thread = threading.Thread(target=self.__consumer__, args=[exchange, routing, callback])
consumer_thread.start()