Tags: django, celery, django-channels, celerybeat

Sending message from Celery task to Channels


Django 2.1.1, Django Channels 2.1.3, Celery 4.2.1

I've set up a Celery task, and at the end of the task I need to send a websocket message to the client(s). However, the websocket message never arrives. No errors are thrown; the message simply isn't delivered.

I've set up a channel layer using Redis as the backend. Sending the message from a normal Django view works fine. When the same code runs in a Celery task, the message reaches Channels, and I can see that Channels does run the send_my_data handler shown in consumers.py below, but the client never receives the websocket message.

tasks.py

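import json
from asgiref.sync import async_to_sync
from celery import shared_task
from channels.layers import get_channel_layer

@shared_task(bind=True)  # assumed decorator (not shown in the post); bind=True supplies `self`
def import_job(self):
    thecalcs = {}  # (do task calculations, store the results in this dict)
    message = {'type': 'send_my_data',
               'data': json.dumps(thecalcs)}
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)('core-data', message)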

consumers.py

from channels.generic.websocket import AsyncWebsocketConsumer

class AsyncDataConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.channel_group_name = 'core-data'

        # Join the group
        await self.channel_layer.group_add(
            self.channel_group_name,
            self.channel_name
        )
        await self.accept()

    async def disconnect(self, close_code):
        # Leave the group
        await self.channel_layer.group_discard(
            self.channel_group_name,
            self.channel_name
        )

    # Receive message from WebSocket
    async def receive(self, text_data=None, bytes_data=None):
        pass

    # Receive message from the group
    async def send_my_data(self, event):
        text = event['data']
        # Send message to WebSocket
        await self.send(text_data=text)

settings.py

CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            "hosts": [('127.0.0.1', 6379)],
        },
    },
}

Since there is no exception/error, I am completely at a loss as to which part of this process is failing.

  1. Celery triggers the task? Yes
  2. The task runs and sends a message to the channel layer? Yes
  3. The consumer receives the message from the group and executes the send()? Yes
  4. Client receives the websocket message? NO

Is this a problem between Channels and Redis? Is it a problem between Channels and the client?
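One way to look at the Channels/Redis question in isolation is to push a single message through the channel layer and read it straight back, without any consumer or websocket involved. The sketch below is only an illustration: layer_round_trip, the channel name "layer-check", and the InMemoryLayer stand-in are hypothetical names, and the stand-in exists only so the snippet runs without Redis. It relies on the layer exposing awaitable send(channel, message) and receive(channel) methods, as Channels layers do.

```python
import asyncio


async def layer_round_trip(channel_layer, channel="layer-check", timeout=2.0):
    """Send one message through the layer and read it back from the same channel."""
    message = {"type": "layer.check", "data": "ping"}
    await channel_layer.send(channel, message)
    # Bound the wait so a broken layer fails loudly instead of hanging forever.
    return await asyncio.wait_for(channel_layer.receive(channel), timeout)


class InMemoryLayer:
    """Hypothetical stand-in exposing only send() and receive()."""

    def __init__(self):
        self._messages = {}

    async def send(self, channel, message):
        # Queue the message under its channel name.
        self._messages.setdefault(channel, []).append(message)

    async def receive(self, channel):
        # Return the oldest queued message for this channel.
        return self._messages[channel].pop(0)


print(asyncio.run(layer_round_trip(InMemoryLayer())))
```

In the project itself, the object returned by get_channel_layer() would presumably take the place of InMemoryLayer(), for example from a Django shell. If the message comes back intact, the layer and Redis are unlikely to be the failing part.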


Solution

  • It turns out Celery was swallowing an exception in my code during the task. I need to implement more thorough logging to catch these exceptions.
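A minimal sketch of what "more thorough logging" could look like, using only the standard library: a decorator that logs any exception with its traceback and then re-raises it, so the failure still propagates but is no longer silent. The decorator name log_exceptions, the logger name "tasks", and the deliberately failing import_job body are all hypothetical and stand in for the real task code.

```python
import functools
import logging

logger = logging.getLogger("tasks")


def log_exceptions(func):
    """Log any exception raised by func with its traceback, then re-raise it."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # logger.exception logs at ERROR level and attaches the traceback.
            logger.exception("Task %s failed", func.__name__)
            raise
    return wrapper


@log_exceptions
def import_job():
    # Stand-in for the real task body; the failure here is deliberate.
    raise ValueError("bad row in import")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    try:
        import_job()
    except ValueError:
        pass  # the error was logged before being re-raised
```

In a real project the decorator would presumably sit directly beneath the Celery task decorator, so that it wraps the task body itself. Because the wrapper re-raises, Celery would still see the task as failed.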