I am planning to publish data from IoT nodes via MQTT into a RabbitMQ Queue. The data is then processed and the state needs to be saved into Redis.
I spun up a Docker container for RabbitMQ and enabled the MQTT plugin (port 1883).
Based on RabbitMQ's MQTT Plugin Documentation, incoming MQTT messages are published to the amq.topic exchange, and the MQTT topic becomes the routing key with / replaced by . (e.g. the MQTT topic hello/test becomes the RabbitMQ routing key hello.test). A simple example using pika is as follows and works perfectly:
import argparse, sys, pika


def main():
    args = parse_arguments()
    # CLI TAKES IN BROKER PARAMETERS in `args`
    # Removed for brevity
    broker_credentials = pika.PlainCredentials(args.rabbit_user, args.rabbit_pass)

    print("Setting Up Connection with RabbitMQ Broker")
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(
            host=args.rabbit_host,
            port=args.rabbit_port,
            credentials=broker_credentials
        )
    )

    # Create Channel
    print("Creating Channel")
    channel = connection.channel()

    # Declare the Exchange
    print("Declaring the exchange to read incoming MQTT Messages")
    # Exchange to read MQTT via RabbitMQ is always `amq.topic`
    # Type of exchange is `topic` based
    channel.exchange_declare(exchange='amq.topic', exchange_type='topic', durable=True)

    # The queue name for MQTT messages is the MQTT topic name where `/` is replaced by `.`
    # Let RabbitMQ create the name for us
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    # Bind the queue to the specific topic you wish to get the data from
    channel.queue_bind(exchange='amq.topic', queue=queue_name, routing_key=args.rabbit_topic)

    # Start consuming from the queue
    channel.basic_consume(queue=queue_name, on_message_callback=consumer_callback, auto_ack=True)
    print("Waiting for MQTT Payload")
    channel.start_consuming()


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("CTRL+C Pressed")
        sys.exit(0)
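For completeness, consumer_callback is just an ordinary pika consumer callback; a minimal sketch (assuming the payload is UTF-8 text, the real one does the actual processing) would be:

def consumer_callback(ch, method, properties, body):
    # pika hands over the raw message body as bytes
    payload = body.decode("utf-8")
    print(f"Received MQTT payload: {payload}")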
I only recently found out about Celery and have been looking into it. In most examples, an external script triggers the task, workers then resolve it and save the result to a backend (in my case Redis), e.g.:
import time
from celery import Celery

app = Celery('tasks', broker='RABBITMQ_BROKER_URL')

@app.task
def process_iot_data(incoming_data):
    time.sleep(1.0)
    # Do some necessary data processing and store the processed state in Redis
I have been going through a lot of blogs where Celery tasks are used with REST APIs: upon calling the API, the tasks are queued, executed by workers, and the result is saved in a backend.
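The pattern in those blogs looks roughly like this (only a sketch; Flask, the /iot route, and the tasks module name are illustrative assumptions, not my actual code):

# Sketch of the usual blog pattern: a REST endpoint that enqueues a Celery task.
from flask import Flask, request
from tasks import process_iot_data  # assuming the task above lives in tasks.py

api = Flask(__name__)

@api.route("/iot", methods=["POST"])
def submit_iot_data():
    payload = request.get_json()
    # .delay() serialises the call into a Celery message and publishes it to
    # the broker; a worker executes it and stores the result in the backend.
    process_iot_data.delay(payload)
    return {"status": "queued"}, 202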
I am unable to find any examples where, while initializing the Celery(..) app, I can declare the necessary exchange (i.e. amq.topic) and set up the bindings I did in the pika consumer code above.
I cannot see how Celery could be used so that a task is queued whenever data is pushed into the RabbitMQ queue. Unlike sending a REST API request, I wish to process the incoming data with Celery tasks as soon as it is inserted into the respective queue.
Is this something realizable with Celery, or should I stick with pika and write things in a callback function?
I wish to run some simulations where I scale the consumers many-fold and see how much data and processing my dockerized consumer applications can withstand under extremely high load.
In short - no.
Celery is not designed to process arbitrary data sent to a message queue. It is designed to produce and consume messages that contain serialised Celery task details, so that a consumer on the other end can execute the particular task and put the result into the result backend.
However, I firmly believe almost any arbitrary message you can think of can be wrapped (one way or another) into a Celery task. The real problem is when you do not want Celery on one of the ends (producer or consumer). Producers can send tasks without the need to share the code that contains the task definitions by using the convenient send_task() function.
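For example, your existing pika callback could stay as it is and simply forward each message to Celery with send_task(). This is only a sketch: the broker/backend URLs are placeholders, and it assumes your workers register a task under the name "tasks.process_iot_data" and that the payload is serialisable by Celery's default serializer:

from celery import Celery

# Point this at the same RabbitMQ instance (AMQP port, not the MQTT port 1883)
# and at Redis as the result backend; both URLs are placeholders.
app = Celery(broker="amqp://user:pass@rabbit-host:5672//",
             backend="redis://redis-host:6379/0")

def consumer_callback(ch, method, properties, body):
    payload = body.decode("utf-8")
    # No shared task code is needed here: send_task() publishes a message
    # telling any available worker to run the task registered under this name.
    app.send_task("tasks.process_iot_data", args=[payload])

The Celery workers still need the task definition, but the bridge process only needs the task's name, so you can scale the pika consumers and the Celery workers independently.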