Search code examples
djangopython-2.7celerydjango-celery

Celery chain tasks


Context:

tasks.py

def chunkify(lst,n):
    return [ lst[i::n] for i in xrange(n) ]

@task
def swarm_restart(procs):
   chunks = chunkify(procs, 4)
    res = chain(
        group_restart.s(( [ (proc.name, proc.host.name) for proc in chunks[0] ] )),
        group_restart.s(( [ (proc.name, proc.host.name) for proc in chunks[1] ] )),
        group_restart.s(( [ (proc.name, proc.host.name) for proc in chunks[2] ] )),
        group_restart.s(( [ (proc.name, proc.host.name) for proc in chunks[3] ] )),
    )()

@ task
def group_restart(procs):
    # this task runs only once, seems to be called just 1 time
    res = group( proc_restart.s(proc[0], proc[1]) for proc in procs ).apply_async()

@ task
def proc_restart(proc_name, hostname):
    # This method works, tested several times
    proc = Host.objects.get(name=hostname).get_proc(proc_name)
    proc.restart()

views.py

def call():
    procs = get_procs()
    tasks.swarm_restart.delay(procs)

The error I'm getting: TypeError: group_restart() takes exactly 1 argument (2 given)

I am doing something wrong, any lights ?

BTW. celery==3.0.25, django-celery==3.0.23


Solution

  • If you look at your swarm_restart task, you are chaining group_restart tasks. Here, the first task in the chain will execute fine but the the second task will throw error.

    TypeError: group_restart() takes exactly 1 argument (2 given)

    Because, the result of first task is passed as an argument to it. The same happens with the next tasks in the chain also.

    For example,

    from celery import task, chain
    
    @app.task
    def t1():
        return 't1'
    
    @app.task
    def t2():
       return 't2'
    
    wrong_chain = chain( t1.s(), t2.s() )
    

    If you execute wrong_chain it throughs the similar error even though you are not passing any arguments to t2

    So, you have to change your work flow depending what you have to do.