Tags: python, celery, tornado

How to use Celery in Tornado with redis as broker and backend?


It raises an exception when I try to use Celery 3.1.11 and tornado-celery 0.3.5 in Tornado 4.2 with yield. It works without the yield, but then I cannot get the result asynchronously... I also found that it works when I use RabbitMQ as the broker, while Redis raises the error below...

Here is my code.

import tornado.gen

from mycelery import celery_task
import tcelery
tcelery.setup_nonblocking_producer()

# Called from inside a coroutine request handler (sketched below):
token = yield tornado.gen.Task(celery_task.get_rongcloud_token.apply_async, args=[3])
print token
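
For context, that yield runs inside a coroutine request handler; here is a rough sketch of that handler (the handler name is a placeholder, not my real code), following the gen.Task pattern from the tornado-celery docs:

import tornado.gen
import tornado.web

from mycelery import celery_task


class TokenHandler(tornado.web.RequestHandler):  # placeholder name for this sketch

    @tornado.gen.coroutine
    def get(self):
        # This is the failing call: the exception below is raised on the yield.
        token = yield tornado.gen.Task(
            celery_task.get_rongcloud_token.apply_async, args=[3])
        print token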

My Celery task:

from celery import Celery, platforms
from celery.schedules import crontab
from celery.utils.log import get_task_logger
from celery.exceptions import SoftTimeLimitExceeded

import settings  # local settings module that holds REDIS_PASS

platforms.C_FORCE_ROOT = True  # required when running the worker as root on Linux, otherwise it raises an error
broker = 'redis://:' + settings.REDIS_PASS + '@127.0.0.1:6379/5'
backend = 'redis://:' + settings.REDIS_PASS + '@127.0.0.1:6379/6'
app = Celery('tasks', broker=broker, backend=backend)

@app.task(name='mycelery.celery_task.get_rongcloud_token')
def get_rongcloud_token(user_id):
    print 'xxxxx'
    a = 'xxx'
    return a
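
As a side note, the task can also be exercised outside Tornado with a blocking call (a hypothetical sanity check, assuming a worker has been started with something like `celery -A mycelery.celery_task worker --loglevel=info`):

from mycelery.celery_task import get_rongcloud_token

result = get_rongcloud_token.delay(3)  # send the task through the Redis broker
print result.get(timeout=10)           # block until the worker stores the result in the Redis backend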

Here is the error:

 TypeError: <function wrapper at 0x5bd2c80> is not JSON serializable

Ha, I found the same question: Tornado celery can't use gen.Task or CallBack


Solution

  • The limited docs about tcelery don't explain much, and your example doesn't show much either. From what the docs do say, it looks like you're using it as intended (unless I'm missing something), so I'm not sure why your code isn't working. I've been successful with the following method:

    from tornado import concurrent, gen, ioloop, web

    from mycelery import celery_task as tasks  # the task module from the question


    class CeleryTasks(web.RequestHandler):

        @gen.coroutine
        def get(self):
            # Bare Future that will be resolved once the Celery task finishes.
            future = concurrent.Future()
            celery_task = tasks.get_rongcloud_token.delay(3)
            check_status(celery_task, future)
            yield future
            self.write(future.result())
            #self.write(celery_task.result)


    def check_status(celery_task, future):
        """
        Check the status of the celery task and set the result in the future.
        """
        if not celery_task.ready():
            # Not ready yet: schedule another check one second from now.
            ioloop.IOLoop.current().call_later(
                1,
                check_status,
                celery_task,
                future)
        else:
            future.set_result(celery_task.result)
    

    First a bare Future is created, which will be yielded on until the result of the celery task is available. Next, execute the celery task as you normally would for async execution (e.g. task_fn.delay(*args) or task_fn.apply_async(args=[...])). Pass the celery_task and the future into a function (check_status) that checks whether the task is "ready"; if it isn't, the function schedules itself to run again a second later and check once more. Meanwhile the coroutine yields the Future until its result is set. Once the task has completed and a result is available, check_status sets it on the Future, and you can do whatever you need with the result.
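
    For completeness, here is a rough sketch of how the handler could be wired into an application; the route and port are arbitrary placeholders, not part of the original answer:

    from tornado import ioloop, web

    # CeleryTasks is the handler defined in the snippet above.
    application = web.Application([
        (r"/token", CeleryTasks),
    ])

    if __name__ == "__main__":
        application.listen(8888)
        ioloop.IOLoop.current().start()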