Search code examples

Celery add_periodic_task blocks Django running in uwsgi environment

I have written a module that dynamically adds periodic celery tasks based on a list of dictionaries in the projects settings (imported via django.conf.settings). I do that using a function add_tasks that schedules a function to be called with a specific uuid which is given in the settings:

def add_tasks(celery):
    for new_task in settings.NEW_TASKS:
            name='My Task %s' % new_task['uuid'],

Like suggested here I use the on_after_configure.connect signal to call the function in my

app = Celery('my_app')

def setup_periodic_tasks(celery, **kwargs):
    from add_tasks_module import add_tasks

This setup works fine for both celery beat and celery worker but breaks my setup where I use uwsgi to serve my django application. Uwsgi runs smoothly until the first time when the view code sends a task using celery's .delay() method. At that point it seems like celery is initialized in uwsgi but blocks forever in the above code. If I run this manually from the commandline and then interrupt when it blocks, I get the following (shortened) stack trace:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'tasks'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'data'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/", line 42, in __get__
    return obj.__dict__[self.__name__]
KeyError: 'tasks'

During handling of the above exception, another exception occurred:
Traceback (most recent call last):

  (SHORTENED HERE. Just contained the trace from the console through my call to this function)

  File "/opt/my_app/add_tasks_module/", line 42, in add_tasks
  File "/usr/local/lib/python3.6/site-packages/celery/", line 146, in __getattr__
    return getattr(self._get_current_object(), name)
  File "/usr/local/lib/python3.6/site-packages/celery/", line 109, in _get_current_object
    return loc(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python3.6/site-packages/celery/app/", line 72, in task_by_cons
    return app.tasks[
  File "/usr/local/lib/python3.6/site-packages/kombu/utils/", line 44, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/usr/local/lib/python3.6/site-packages/celery/app/", line 1228, in tasks
  File "/usr/local/lib/python3.6/site-packages/celery/app/", line 507, in finalize
    with self._finalize_mutex:

It seems like there is a problem with acquiring a mutex.

Currently I am using a workaround to detect if sys.argv[0] contains uwsgi and then not add the periodic tasks, as only beat needs the tasks, but I would like to understand what is going wrong here to solve the problem more permanently.

Could this problem have something to do with using uwsgi multi-threaded or multi-processed where one thread/process holds the mutex the other needs?

I'd appreciate any hints that can help me solve the problem. Thank you.

I am using: Django 1.11.7 and Celery 4.1.0

Edit 1

I have created a minimal setup for this problem:

import os
from celery import Celery
from django.conf import settings
from myapp.tasks import my_task

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')

app = Celery('my_app')

def setup_periodic_tasks(sender, **kwargs):

app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

from celery import shared_task
def my_task():

Make sure that CELERY_TASK_ALWAYS_EAGER=False and that you have a working message queue.


./ shell -c 'from myapp.tasks import my_task; my_task.delay()'

Wait about 10 seconds before interrupting to see the above error.


  • So, I have found out that the @shared_task decorator creates the problem. I can circumvent the problem when I declare the task right in the function called by the signal like so:

    def add_tasks(celery):
        def my_task(uuid):
        for new_task in settings.NEW_TASKS:
                name='My Task %s' % new_task['uuid'],

    This solution is actually working for me, but I have one more problem with this: I use this code in a pluggable app, so I can't directly access the celery app outside of the signal handler but would like to also be able to call the my_task function from within other code. By defining it within the function it is not available outside of the function, so I cannot import it anywhere else.

    I can probably work around this by defining the task function outside of the signal function, and use it with different decorators here and in the I am wondering though if there is a decorator apart from the @shared_task decorator that I can use in the that does not create the problem.

    The current best solution could be:

    def my_task(uuid):
        # do stuff
    def add_tasks(celery):
        celery_my_task = celery.task(my_task)
        for new_task in settings.NEW_TASKS:
                name='My Task %s' % new_task['uuid'],

    from celery import shared_task
    from task_app import my_task
    shared_my_task = shared_task(my_task)

    import os
    from celery import Celery
    from django.conf import settings
    # set the default Django settings module for the 'celery' program.
    os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')
    app = Celery('my_app')
    def setup_periodic_tasks(sender, **kwargs):
        from task_app import add_tasks
    app.config_from_object('django.conf:settings', namespace='CELERY')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)