
Easier way to use pika asynchronously (Twisted)?


This is my first project using RabbitMQ and I am completely lost, because I am not sure what the best way to solve this problem would be.

The program is fairly simple: it just listens for alarm events and then puts them in a RabbitMQ queue. But I am struggling with the architecture of the program.

If I open a connection, publish, and then close it for every single event, I will add a lot of latency and transmit a lot of unnecessary packets (even more than usual, because I am using TLS).

If I keep a connection open and create a function that publishes the messages (I only work with a single queue, pretty basic), I will eventually have problems, because multiple events can occur at the same time and my program will not know what to do if the connection to the RabbitMQ broker drops.

Reading the pika documentation, the solution seems to be one of their "Connection Adapters", which would fit me like a glove because I just rewrote all my connection code from basic sockets to Twisted (I really liked its high-level approach). But there is a problem: their "basic example" is fairly complex for someone who barely considers himself "intermediate".

In a perfect world, I would be able to run the service in the same reactor as the "alarm servers" and call a method to publish a message. But I am struggling to understand the code. Could anyone who has worked with pika point me in a better direction, or tell me if there is an easier way?


Solution

  • Well, I will post what worked for me. It is probably not the best alternative, but maybe it helps someone who gets here with the same problem.

    First I decided to drop Twisted and use asyncio (nothing personal, I just wanted to use it because it's already in Python). Even though pika has a good asynchronous example, I tried aio_pika and found it easier to just use that.

    I ended up with 2 main functions: one for a publisher and another for a subscriber. Below is the code that works for me...

    # -*- coding: utf-8 -*-
    
    import asyncio
    import aio_pika
    from myapp import conf
    
    QUEUE_SEND = []
    
    
    def add_queue_send(msg):
        """Add MSG to QUEUE
    
        Args:
            msg (string): JSON
        """
        QUEUE_SEND.append(msg)
    
    
    def build_url(amqp_user, amqp_pass, virtual_host):
        """Build Auth URL
    
        Args:
            amqp_user (str): User name
            amqp_pass (str): Password
            virtual_host (str): Virtual Host
    
        Returns:
            str: AMQP URL
        """
        return ''.join(['amqps://',
                        amqp_user, ':', amqp_pass,
                        '@', conf.get('amqp_host'), '/', virtual_host,
                        '?cafile=', conf.get('ca_cert'),
                        '&keyfile=', conf.get('client_key'),
                        '&certfile=', conf.get('client_cert'),
                        '&no_verify_ssl=0'])
    
    
    async def process_message(message: aio_pika.IncomingMessage):
        """Read a new message
    
        Args:
            message (aio_pika.IncomingMessage): Message
        """
        async with message.process():
            #   TODO: Do something with the new message
            await asyncio.sleep(1)
    
    
    async def consumer(url):
        """Keep listening to an AMQP queue
    
        Args:
            url (str): URL
    
        Returns:
            aio_pika.Connection: Conn?
        """
        connection = await aio_pika.connect_robust(url=url)
        # Channel
        channel = await connection.channel()
        # Allow at most 100 unacknowledged messages in flight on this channel
        await channel.set_qos(prefetch_count=100)
        # Queue
        queue = await channel.declare_queue(conf.get('amqp_queue_client'))
        #   Callback to run when a new message is received
        await queue.consume(process_message)
        #   Return the connection so the caller can close it later
        return connection
    
    
    async def publisher(url):
        """Send messages from the queue.
    
        Args:
            url (str): Authentication URL
        """
        connection = await aio_pika.connect_robust(url=url)
        # Channel
        channel = await connection.channel()
        try:
            while True:
                if QUEUE_SEND:
                    #   Send the oldest pending message first (FIFO)
                    msg = aio_pika.Message(body=QUEUE_SEND.pop(0).encode())
                    await channel.default_exchange.publish(msg, routing_key='queue')
                else:
                    #   Nothing to send; wait before checking again
                    await asyncio.sleep(1)
        finally:
            #   Reached when the task is cancelled or publishing fails
            await connection.close()
    

    I started both using `loop.create_task`.

    As I said, it kind of worked for me (even though I am still having an issue with another part of my code), but I did not want to leave this question open, since other people may run into the same issue.

    If you know a better or more elegant approach, please share.
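
One possible refinement, as a minimal sketch rather than a tested implementation: the `QUEUE_SEND` list and the one-second polling loop could be replaced with an `asyncio.Queue`, so the publisher wakes up as soon as a message arrives, and the task could be started with `asyncio.create_task`. The names `send`, `fake_send`, and `demo` are hypothetical and only exist for this illustration. `send` stands in for whatever coroutine actually publishes to the broker, so the sketch runs without RabbitMQ.

```python
import asyncio
from typing import Awaitable, Callable, List

# Hypothetical type for the publish step. In the code above this role is
# played by `channel.default_exchange.publish(...)`.
Send = Callable[[bytes], Awaitable[None]]


async def publisher(pending: asyncio.Queue, send: Send) -> None:
    """Forward queued messages in FIFO order until the task is cancelled."""
    while True:
        msg = await pending.get()  # suspends until a message is available
        try:
            await send(msg.encode())
        finally:
            pending.task_done()  # lets pending.join() know this item is done


async def demo() -> List[bytes]:
    """Run the publisher task next to a fake alarm source in one event loop."""
    sent: List[bytes] = []

    async def fake_send(body: bytes) -> None:
        # Assumption: record the payload instead of talking to a broker.
        sent.append(body)

    pending: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(publisher(pending, fake_send))

    # Alarm handlers running in the same loop would call put_nowait().
    for event in ('{"alarm": 1}', '{"alarm": 2}'):
        pending.put_nowait(event)

    await pending.join()  # wait until every queued message reached send()
    task.cancel()  # stop the worker on shutdown
    await asyncio.gather(task, return_exceptions=True)
    return sent


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In the real program, `send` would presumably be a small coroutine that wraps the body in `aio_pika.Message` and awaits `channel.default_exchange.publish(...)`, as the publisher above already does. Note that `put_nowait` is only safe when called from code running inside the same event loop.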