Search code examples
django celery django-celery django-channels

Getting Django channel access in Celery task


I have a Django app that uses Channels to monitor a WebSocket and kick off backend tasks in Celery. The task currently sleeps for a given number of seconds and then returns True.

The problem is that I don't know how to access the WebSocket from within the Celery task so that I can notify the UI once the task is done.

celery==4.3.0
channels==2.2.0
Django==2.2.4
django-celery-results==1.1.2
djangorestframework==3.10.2

my tasks.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task
import time

@shared_task
def gotosleep(timeInSecs):
    """Block for ``timeInSecs`` seconds, then return ``True``.

    The return value carries no information beyond the fact that the sleep
    ran to completion.
    """
    time.sleep(timeInSecs)
    completed = True
    return completed

My consumer.py

from channels.generic.websocket import WebsocketConsumer
import json
from access.tasks import gotosleep

class AccessConsumer(WebsocketConsumer):
    """WebSocket consumer that queues a ``gotosleep`` task for each request.

    The client is expected to send ``{"message": "<seconds>"}``. The consumer
    answers with an acknowledgement as soon as the task has been queued; it
    does not wait for the task to finish.
    """

    def connect(self):
        """Accept every incoming WebSocket connection."""
        self.accept()

    def disconnect(self, close_code):
        """Nothing to clean up when the socket closes."""

    def receive(self, text_data):
        """Queue a sleep task if ``message`` is a string of decimal digits.

        Any other ``message`` value gets a 'Give me a number' reply.
        """
        text_data_json = json.loads(text_data)
        message = text_data_json['message']
        # isdecimal() rather than isnumeric(): characters such as '²' or '½'
        # count as "numeric" but make int() raise ValueError.
        if message.isdecimal():
            print("------------------------------------------------------")
            print(message)

            # delay() only queues the task; the reply below is sent right away.
            gotosleep.delay(int(message))

            self.send(text_data=json.dumps({
                'message': 'We are dealing with your request'
            }))

        else:
            self.send(text_data=json.dumps({
                'message': 'Give me a number'
            }))

Any ideas? Many thanks.


Solution

  • @normic: Yes, I was struggling with the later addition of channel layers into my projects:

    @Ken4scholars: Many thanks for the links. These prompted me to find what I was looking for.

    For anyone else struggling:

    my tasks.py:

    from __future__ import absolute_import, unicode_literals
    from celery import shared_task
    import time
    
    from channels.layers import get_channel_layer
    from asgiref.sync import async_to_sync
    
    
    @shared_task
    def add(x, y):
        """Return ``x + y``."""
        total = x + y
        return total
    
    
    @shared_task
    def go_to_sleep_and_add(x, y):
        """Sleep 10 seconds, add ``int(x)`` and ``int(y)``, and broadcast the sum.

        The sum is sent as a ``task_message`` event to the ``task_group_a``
        channel-layer group and is also returned as the task result.
        """
        time.sleep(10)
        total = int(x) + int(y)

        # group_send is a coroutine function; wrap it so this synchronous task
        # can call it.
        layer = get_channel_layer()
        broadcast = async_to_sync(layer.group_send)
        event = {'type': 'task_message', 'message': total}
        broadcast('task_group_a', event)

        return total
    
    
    @shared_task
    def mul(x, y):
        """Return ``x * y``."""
        product = x * y
        return product
    
    
    @shared_task
    def xsum(numbers):
        """Return the sum of the items in ``numbers``."""
        total = sum(numbers)
        return total
    

    my consumers.py:

    from channels.generic.websocket import WebsocketConsumer
    import json
    from access.tasks import go_to_sleep_and_add    
    from channels.layers import get_channel_layer
    from asgiref.sync import async_to_sync
    
    class AccessConsumer(WebsocketConsumer):
        """WebSocket consumer that queues an addition task and relays its result.

        Every connection joins the ``task_group_a`` channel-layer group. Events
        of type ``task_message`` sent to that group are forwarded to the
        WebSocket by ``task_message``.
        """

        def connect(self):
            """Accept the connection and add this channel to the task group."""
            self.accept()
            self.room_group_name = "task_group_a"

            # Join room group
            async_to_sync(self.channel_layer.group_add)(
                self.room_group_name,
                self.channel_name
            )

        def disconnect(self, close_code):
            """Remove this channel from the task group."""
            async_to_sync(self.channel_layer.group_discard)(
                self.room_group_name,
                self.channel_name
            )

        def receive(self, text_data):
            """Queue ``go_to_sleep_and_add`` if both operands are decimal strings.

            Expects ``{"message": {"1": "<num>", "2": "<num>"}}``. Any other
            operand values get a 'Give me numbers' reply.
            """
            text_data_json = json.loads(text_data)
            num1 = text_data_json['message']['1']
            num2 = text_data_json['message']['2']
            # isdecimal() rather than isnumeric(): characters such as '²' or
            # '½' count as "numeric" but make the task's int() call raise
            # ValueError, and the client would then never get a result.
            if num1.isdecimal() and num2.isdecimal():

                # delay() only queues the task; the sum arrives later through
                # task_message.
                go_to_sleep_and_add.delay(num1, num2)
                self.send(text_data=json.dumps({
                    'message': 'We are dealing with your request'
                }))

            else:
                self.send(text_data=json.dumps({
                    'message': 'Give me numbers'
                }))

        # Receive message from room group
        def task_message(self, event):
            """Forward a ``task_message`` group event to the WebSocket client."""
            message = event['message']

            # Send message to WebSocket
            self.send(text_data=json.dumps({
                'message': message
            }))
    

    my html page in Django/templates:

    <!-- access/templates/access/room.html -->
    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="utf-8"/>
        <title>Access Room</title>
    </head>
    <body>
        <textarea id="access-log" cols="50" rows="5">Results &#13;&#10;</textarea><br/>
        Number 1:<input id="access-message-input" type="text" size="20"/><br/>
        Number 2:<input id="access-message-input2" type="text" size="20"/><br/>
        <input id="access-message-submit" type="button" value="Send"/>
    </body>
    <script>
        // room_name_json comes from the template context; presumably a
        // JSON-encoded string (confirm against the view).
        var roomName = {{ room_name_json }};

        // Use wss:// when the page is served over HTTPS; browsers block
        // insecure ws:// connections from secure pages.
        var wsScheme = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
        var accessSocket = new WebSocket(
            wsScheme + window.location.host +
            '/ws/access/' + roomName + '/');

        // Append every message received from the server to the log textarea.
        accessSocket.onmessage = function(e) {
            var data = JSON.parse(e.data);
            var message = data['message'];
            document.querySelector('#access-log').value += (message + '\n');
        };

        accessSocket.onclose = function(e) {
            console.error('Access socket closed unexpectedly');
        };

        // Send both inputs as {'message': {'1': ..., '2': ...}} and clear them.
        document.querySelector('#access-message-submit').onclick = function(e) {
            var messageInputDom = document.querySelector('#access-message-input');
            var messageInputDom2 = document.querySelector('#access-message-input2');
            // Declared with var so it does not leak into the global scope.
            var payload = { '1': messageInputDom.value, '2': messageInputDom2.value };
            accessSocket.send(JSON.stringify({
                'message': payload
            }));

            messageInputDom.value = '';
            messageInputDom2.value = '';

        };

    </script>

    </html>