asynchronous, websocket, celery, django-channels, python-asyncio

Using a synchronous celery callback in an async django channels websocket


In our django-channels async consumer, I'm trying to retrieve a group result. This result may or may not be complete, so I would like to add a callback to pipe results as they become ready. The problem is that the websocket consumer is async, the callback must be sync, and the self.send method must be async. Initially I assumed I could just wrap the async send in async_to_sync, but that raises the error "You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.", so it seems that going async -> sync -> async isn't allowed.

import json

from asgiref.sync import async_to_sync
from channels.generic.websocket import AsyncWebsocketConsumer

class ImageConsumer(AsyncWebsocketConsumer):

    async def celery_retrieve_group_result(self, batch_id):
        group = celery.GroupResult().restore(batch_id)
        res = group.get(callback=self.sync_callback)

    def sync_callback(self, task_id, value):
        # This is the call that raises the error quoted above.
        async_to_sync(self.send_group_result)(task_id, value)

    async def send_group_result(self, task_id, value):
        await self.send(json.dumps({"type": "result.redraw_border", "result": value}))

I have tried different combinations, but my limiting factor is that the callback must be sync while everything else is async. Does anyone have experience mixing Celery with async Django in this manner? Any help would be appreciated!
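
The error is raised because async_to_sync is being called in the same thread that is already running the event loop. A minimal sketch of one possible way around that, using only the standard library: run the blocking call in a worker thread and let the sync callback hand each result back to the loop with asyncio.run_coroutine_threadsafe. Here blocking_get and send are hypothetical stand-ins for group.get(callback=...) and self.send, and the sketch has not been tried against Celery or Channels themselves.

```python
import asyncio
import time


def blocking_get(callback):
    """Hypothetical stand-in for a blocking call like group.get(callback=...)."""
    for task_id, value in [("t1", 1), ("t2", 2)]:
        time.sleep(0.01)  # simulate waiting for a worker to finish
        callback(task_id, value)


async def retrieve(send):
    loop = asyncio.get_running_loop()

    def sync_callback(task_id, value):
        # Runs in the worker thread: schedule the coroutine on the event
        # loop and wait for it, so results are sent in order.
        future = asyncio.run_coroutine_threadsafe(send(task_id, value), loop)
        future.result()

    # Run the blocking call off the event loop's thread so the loop stays free.
    await loop.run_in_executor(None, blocking_get, sync_callback)


async def main():
    sent = []

    async def send(task_id, value):  # stand-in for the consumer's self.send
        sent.append((task_id, value))

    await retrieve(send)
    return sent


sent_messages = asyncio.run(main())
```

The callback stays synchronous, but it never runs in the event loop's thread, so nothing has to go async -> sync -> async within a single thread.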


Solution

  • I decided to go with a different approach: iterating over the AsyncResults enclosed in the group.

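    import asyncio
    import json
    import logging
    import time

    from channels.generic.websocket import AsyncWebsocketConsumer

    logger = logging.getLogger(__name__)

    # `celery` is assumed to be the project's Celery app object; its import is not shown here.

    class ImageConsumer(AsyncWebsocketConsumer):
        name = "Image Consumer"
        timeout = 20  # seconds to wait for each result
        sleep_duration = 5e-2  # seconds between readiness checks

        async def celery_retrieve_group_result(self, batch_id):

            group = celery.GroupResult().restore(batch_id)
            if group is not None:
                ordered_list_of_async_results = group.results
                for result in ordered_list_of_async_results:
                    start = time.time()
                    try:
                        # Poll for readiness without blocking the event loop.
                        while not result.ready():
                            await asyncio.sleep(self.sleep_duration)
                            if self.timeout and time.time() - start > self.timeout:
                                raise TimeoutError(f"Timed out waiting for task {result.id}")
                        value = result.get()
                        await self.send(json.dumps({'type': 'result.processing', 'result': value}))
                    except Exception as exc:
                        # Covers the timeout above and any error re-raised by result.get().
                        logger.exception("Could not retrieve result %s", result.id)
                        await self.send(json.dumps({'type': 'result.processing', 'error': str(exc)}))
                group.forget()
            else:
                await self.send(json.dumps({'type': 'response.nonexistant_group', 'error': 'No result batch exists. re-upload'}))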
    

    This is working really well. The timeout is there because Celery 4.3 currently has an issue with Redis backends; without the timeout, the loop gridlocks. Until that is fixed, this works perfectly.
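
The manual time.time() bookkeeping could also be written with asyncio.wait_for, which cancels the polling coroutine once the timeout elapses. Below is a minimal sketch under that assumption. FakeResult is a hypothetical stand-in for an AsyncResult (only ready() and get() are imitated), and fetch and wait_until_ready are illustrative names, not part of Celery or Channels.

```python
import asyncio
import time


class FakeResult:
    """Hypothetical stand-in for an AsyncResult; ready after `delay` seconds."""

    def __init__(self, delay, value):
        self._ready_at = time.monotonic() + delay
        self._value = value

    def ready(self):
        return time.monotonic() >= self._ready_at

    def get(self):
        return self._value


async def wait_until_ready(result, poll_interval=0.05):
    # Poll without blocking the event loop.
    while not result.ready():
        await asyncio.sleep(poll_interval)


async def fetch(result, timeout):
    try:
        # wait_for cancels the polling coroutine when the timeout elapses.
        await asyncio.wait_for(wait_until_ready(result, 0.01), timeout=timeout)
    except asyncio.TimeoutError:
        return {"error": "timed out"}
    return {"result": result.get()}


fast = asyncio.run(fetch(FakeResult(0.02, "done"), timeout=1))
slow = asyncio.run(fetch(FakeResult(5, "late"), timeout=0.05))
```

In the consumer, the returned dictionaries would be passed to self.send via json.dumps in the same way as above.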