Celery worker_ready.connect signal not working properly


I have a FastAPI, MongoDB, and Celery app that uses Beanie as its ODM. Inside the Celery worker, I create a new thread that runs an event loop for async tasks.

The issue is that any resources I initialize inside the @worker_ready.connect signal handler are not available in the global scope of the application.

Code :

from celery.signals import worker_ready

@worker_ready.connect
def startup_celery_ecosystem(**kwargs):
    """Initialize the resources needed by each worker process."""
    logger.info('Startup celery workers **STARTED**')
    # Start the aio thread that holds the event loop
    construct_aio_threading(BaseTask.aio_thread)
    # Initialize the database models
    async_to_sync(aio_thread=BaseTask.aio_thread, coroutine=db_session)
    # print(BaseTask.aio_thread)
    logger.info('Startup celery workers **FINISHED**')

Normally this should initialize the database models and make them available in the global scope of the worker process, but when I access the database, it says that the database models have not been initialized.

When accessing the id attribute of my User database model:

app_sample_celery_worker | user = User.find()
app_sample_celery_worker | raise CollectionWasNotInitialized
app_sample_celery_worker | beanie.exceptions.CollectionWasNotInitialized

Or, when checking the status of the thread, it says that it is stopped:

app_sample_celery_worker | [2022-10-20 10:40:27,369: WARNING/ForkPoolWorker-8] <AioThread(Thread-1, stopped)>.

PS: Inside the signal function I do have access to the DB session, so the connection is available locally inside the function, and when I check the thread there it reports that it is alive. However, neither the DB session nor the thread is available outside the function (at the module level or inside Celery tasks).

Logs when accessing the thread and the User class inside the signal function:

app_sample_celery_worker | [2022-10-20 10:45:05,889: WARNING/MainProcess] <AioThread(Thread-1, started 140177104766720)>
app_sample_celery_worker | [2022-10-20 10:45:05,889: WARNING/MainProcess] <class 'app_sample.database.models.User'>

Is everything I initialize inside the signal handler being garbage collected?


Solution

  • After a long investigation, it turns out that the Celery worker_ready signal runs in only one process (the master process). I answered one of my older, similar questions here.

    TL;DR:

    1. The signals do not run in every process spawned by the prefork pool.
    2. If you are working with CPU-bound tasks, use custom Task classes to initialize resources and cache them inside each worker (DB connections, global data variables, ...).
    3. I am now using the gevent executor pool, which works just fine with multi-threading.
    4. Using threading with the Celery prefork executor pool will eventually result in you losing your mind.
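Point 2 can be sketched roughly as follows. This is a minimal illustration, not the code from the original app: the AioThread implementation, the init_db coroutine (a placeholder for the real Beanie setup), and the lazy aio_thread and db properties are all assumptions made for the example. The idea is that each worker process creates its own event-loop thread and database handle the first time a task touches them, instead of relying on a handler that only ran in the master process. The try/except around the Celery import is only there so the sketch runs without Celery installed.

```python
import asyncio
import os
import threading

try:
    from celery import Task
except ImportError:
    # Stand-in so the sketch also runs where Celery is not installed.
    class Task:
        pass


class AioThread(threading.Thread):
    """Daemon thread that owns an asyncio event loop (hypothetical helper)."""

    def __init__(self):
        super().__init__(daemon=True)
        self.loop = asyncio.new_event_loop()
        self.pid = os.getpid()  # remember which process created this thread

    def run(self):
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def run_sync(self, coroutine):
        """Submit a coroutine to the loop and block until it finishes."""
        future = asyncio.run_coroutine_threadsafe(coroutine, self.loop)
        return future.result()


async def init_db():
    """Placeholder for the real async setup (for example, initializing Beanie)."""
    return {"initialized_in_pid": os.getpid()}


class BaseTask(Task):
    """Task base class that lazily creates per-process resources."""

    _aio_thread = None
    _db = None

    @property
    def aio_thread(self):
        thread = BaseTask._aio_thread
        # A thread inherited from a parent process is not running in a forked
        # child, so create a fresh one if the PID differs or it is not alive.
        if thread is None or thread.pid != os.getpid() or not thread.is_alive():
            thread = AioThread()
            thread.start()
            BaseTask._aio_thread = thread
            BaseTask._db = None  # anything tied to the old loop is stale too
        return thread

    @property
    def db(self):
        thread = self.aio_thread  # may reset the cached handle, so read it first
        if BaseTask._db is None:
            BaseTask._db = thread.run_sync(init_db())
        return BaseTask._db
```

With Celery, a task would typically opt in with @app.task(base=BaseTask, bind=True) and then read self.db inside the task body, so the initialization happens in whichever process actually executes the task.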