Search code examples
pythondjangopython-3.xmultithreadingdjango-channels

How to multithread AsyncConsumer with Django Channels


I've been working with Django Channels for a week and somethin bugs me with the runworker parallelism.

For example, I have this MQTT client which publishes in the channels when it receives a message, basic.

async def treat_message(msg):
    channel_layer = get_channel_layer()
    payload = json.loads(msg.payload, encoding="utf-8")

    await channel_layer.send("mqtt", {
        "type": "value.change",
        "message": payload
    })

This is sending it well. I can send how much I want, it will be sent to the redis queue. To the channel mqtt.

I then run the worker which will redirect messages in the queue for mqtt with :

python manage.py runworker mqtt
2018-09-12 16:33:42,232 - INFO - runworker - Running worker for channels ['mqtt']

This is where the problem begins. Here is the content of the AsyncConsumer reading the data :

class MQTTConsumer(AsyncConsumer):
    async def value_change(self, event):
        await asyncio.sleep(5)
        print("I received changes : {}".format(event["message"]))

I putted a sleep in order to simulate business of the task. And this is where I'm going : the async consumer is not multi threaded ! When I send two messages to the channel, it takes 10 seconds to the consumer to treat the second message, instead of 5 if it were multi threaded. As shown below.

2018-09-12 16:45:25,271 - INFO - runworker - Running worker for channels ['mqtt']
2018-09-12 16:45:32,559 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:37,561 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:42,563 - INFO - mqtt - I received changes : {'oui': 'non'}
2018-09-12 16:45:47,565 - INFO - mqtt - I received changes : {'oui': 'non'}

Any intel on the subject would be a great help, thanks in advance !

EDIT: The only way to manage it I found is to make an executor which will contain the workers to do it async. But I'm not sure of its efficiency for deploy purposes

def handle_mqtt(event):
    time.sleep(3)
    logger.info("I received changes : {}".format(event["message"]))


class MQTTConsumer(AsyncConsumer):
    def __init__(self, scope):
        super().__init__(scope)
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    async def value_change(self, event):
        loop = asyncio.get_event_loop()
        future = loop.run_in_executor(self.executor, handle_mqtt, event)

Solution

  • This is currently by design

    Yes, that is the intended design, as it's the safest way (it prevents race conditions if you're not aware of it). If you are happy to run messages in parallel, just spin off your own coroutines whenever you need them (using asyncio.create_task), making sure that you clean them up and wait for them on shutdown. It's quite a lot of overhead, so hopefully we'll ship an opt-in mode for that in the consumer in future, but for now all we ship with is the safe option.

    https://github.com/django/channels/issues/1203