django celery gevent channel python-3.7

Django Channels send group message from Celery task. Asyncio event loop stopping before all async tasks finished


I'm currently stuck on a particularly tricky problem. I'll try my best to explain it.

I have a Django project whose main purpose is to rapidly execute queued tasks from a DB. I use Celery and Celery beat for this, with Django Channels updating my templates with the responses in real time.

The Celery worker is a gevent worker pool with a decent number of threads.

My task (simplified version):

import asyncio
import json

from asgiref.sync import async_to_sync
from celery import shared_task
from channels.layers import get_channel_layer

# Action and post_request come from the project and are not shown here.


@shared_task
def exec_task(action_id):
  # execute the action
  action = Action.objects.get(pk=action_id)
  response = post_request(action)

  # update action status
  if response.status_code == 200:
    action.status = 'completed'
  else:
    action.status = 'failed'

  # save the action to the DB
  action.save()

  channel_layer = get_channel_layer()
  status_data = {'id': action.id, 'status': action.status}
  status_data = json.dumps(status_data)
  try:
    async_to_sync(channel_layer.group_send)('channel_group', {'type': 'propergate_status', 'data': status_data})
  except RuntimeError:
    # async_to_sync refused to run because an event loop is already running
    # in this thread, so hand the coroutine to that loop instead
    event_loop = asyncio.get_running_loop()
    future = asyncio.run_coroutine_threadsafe(channel_layer.group_send('channel_group', {'type': 'propergate_status', 'data': status_data}), event_loop)
    result = future.result()

My Error:

[2019-10-03 18:47:59,990: WARNING/MainProcess] actions queued: 25

[2019-10-03 18:48:02,206: WARNING/MainProcess] c:\users\jack\documents\github\mcr-admin\venv\lib\site-packages\gevent_socket3.py:123: RuntimeWarning: coroutine 'AsyncToSync.main_wrap' was never awaited
self._read_event = io_class(fileno, 1)

RuntimeWarning: Enable tracemalloc to get the object allocation traceback

[2019-10-03 18:48:02,212: WARNING/MainProcess] c:\users\jack\documents\github\mcr-admin\venv\lib\site-packages\gevent_socket3.py:123: RuntimeWarning: coroutine 'BaseEventLoop.shutdown_asyncgens' was never awaited
self._read_event = io_class(fileno, 1)

RuntimeWarning:

Originally after I saved the action to the DB I just called:

async_to_sync(channel_layer.group_send)('channel_group', {'type': 'propergate_status', 'data': status_data})

But I kept getting a runtime error, because you can't use async_to_sync if an asyncio event loop is already running, as shown here at line 61. I had multiple gevent threads calling async_to_sync very close together, which constantly threw the error in the link.
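
The restriction can be reproduced with the standard library alone. The sketch below is an analogy, not asgiref's actual code: asyncio.run refuses to start while another loop is already running in the same thread, which is presumably the situation the greenlets hit, since a gevent pool normally runs them all in one OS thread. The names send_status and nested_call are made up for illustration.

```python
import asyncio


async def send_status(message):
    # Hypothetical stand-in for channel_layer.group_send.
    await asyncio.sleep(0)
    return message


async def nested_call():
    # We are already inside a running loop here, like a greenlet that gets
    # scheduled while another greenlet's loop is still running.
    coro = send_status({"id": 1, "status": "completed"})
    try:
        asyncio.run(coro)  # tries to start a second loop in the same thread
    except RuntimeError as exc:
        coro.close()  # avoid a "never awaited" warning for the unused coroutine
        return type(exc).__name__
    return "no error"


outcome = asyncio.run(nested_call())
print(outcome)
```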

That led me to this wonderful answer and to the current version of exec_task, which has a 98% success rate in messaging the Django Channels group. I really need it to be 100%.

The problem is that occasionally the asyncio event loop is stopped before the coroutine I add has a chance to finish. I've been tweaking my code and playing around with the asyncio and event loop API, but I either break my code or get worse results. I have a feeling it might be to do with the asgiref async_to_sync function closing the loop early, but it's complex and I only started working with Python async a couple of days ago.
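
One pattern that avoids borrowing a loop owned by someone else's async_to_sync call is to keep a single long-lived event loop on its own thread and submit every coroutine to it. The following is a minimal sketch using only the standard library. It has not been tested under a gevent pool, where monkey-patching may change how threads behave, and BackgroundLoop and fake_group_send are hypothetical names standing in for real project code.

```python
import asyncio
import threading


class BackgroundLoop:
    """Runs one asyncio event loop forever on its own daemon thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def submit(self, coro, timeout=5.0):
        # Schedule the coroutine on the background loop and block the caller
        # until it has finished (or the timeout expires).
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        return future.result(timeout=timeout)

    def stop(self):
        # Ask the loop to stop from outside its thread, then clean up.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()
        self.loop.close()


delivered = []


async def fake_group_send(group, message):
    # Hypothetical stand-in for channel_layer.group_send.
    await asyncio.sleep(0.01)
    delivered.append((group, message))


background = BackgroundLoop()
for action_id in range(3):
    background.submit(
        fake_group_send("channel_group", {"type": "propergate_status", "data": action_id})
    )
background.stop()
print(len(delivered))
```

Because the loop is only stopped by an explicit stop() call, a coroutine submitted to it cannot be cut short by another caller tearing the loop down.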

Any feedback, comments, tips or fixes are most welcome!

Cheers.


Solution

  • In the end I couldn't solve the problem, so I chose an alternative: a Channels AsyncHttpConsumer that sends the group message. It's not optimal, but it works and keeps the workflow in the Channels library.

    Consumer:

    import json
    import urllib.parse

    from channels.generic.http import AsyncHttpConsumer


    class celeryMessageConsumer(AsyncHttpConsumer):

        async def handle(self, body):
            # send response
            await self.send_response(200, b"Received Loud and Clear", headers=[
                (b"Content-Type", b"text/plain"),
            ])
            # decode the URL-encoded request body and parse it as JSON
            body_data = urllib.parse.unquote_plus(body.decode("utf-8"))
            body_data = json.loads(body_data)
            id = body_data['data']['id']

            # forward the data to the group for this id
            await self.channel_layer.group_send(
                f"group_{id}",
                {
                    'type': 'propergate.data',
                    'data': body_data['data']
                }
            )
    

    Routing:

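    from channels.auth import AuthMiddlewareStack
    from channels.http import AsgiHandler
    from channels.routing import ProtocolTypeRouter, URLRouter
    from django.urls import path, re_path

    import myApp.routing
    from myApp import consumers  # assumed location of celeryMessageConsumer

    application = ProtocolTypeRouter({
        'websocket': AuthMiddlewareStack(
            URLRouter(
                myApp.routing.websocket_urlpatterns
            )
        ),
        'http': URLRouter([
            path("celeryToTemplate/", consumers.celeryMessageConsumer),
            re_path('genericMyAppPath/.*', AsgiHandler),
        ]),
    })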
    

    HTTP request:

    # wrapped in a 'data' key because the consumer reads body_data['data']
    data = json.dumps({'data': {'id': id, 'status': status}})
    response = internal_post_request('http://genericAddress/celeryToTemplate/', data)
    if response.status_code == 200:
        # phew
        pass
    else:
        # whoops
        pass
    

    Requests:

    import requests


    def internal_post_request(request_url, payload):
        # payload is an already-serialised JSON string
        headers = {
            'Content-Type': 'application/json'
        }
        response = requests.post(request_url, data=payload, headers=headers)
        return response
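
To check that the task side and the consumer side agree on the body format, the round trip can be sketched with the standard library only. This assumes the payload nests its fields under a 'data' key, which is what the consumer's body_data['data'] lookup implies. build_payload and decode_body are hypothetical helper names, not part of Channels or requests.

```python
import json
import urllib.parse


def build_payload(action_id, status):
    # Nest the fields under "data", since the consumer reads body_data['data'].
    return json.dumps({"data": {"id": action_id, "status": status}})


def decode_body(body):
    # Same two steps as the consumer's handle(): URL-decode, then parse JSON.
    text = urllib.parse.unquote_plus(body.decode("utf-8"))
    return json.loads(text)


raw = build_payload(42, "completed").encode("utf-8")
decoded = decode_body(raw)
group_name = f"group_{decoded['data']['id']}"
print(group_name)
```

One thing to watch: unquote_plus turns every '+' into a space, so if the body is sent as plain JSON rather than URL-encoded, a literal '+' inside a value would be altered by the decode step.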