I have a FastApi Mongo Celery app, Using Beanie as my odm and creating a new thread that runs an event loop for async tasks inside the celery worker.
The issue is that any ressources initilization i am putting inside the @worker_ready.connect signal is not available in the global scope of the application
Code :
@worker_ready.connect
def startup_celery_ecosystem(**kwargs):
"""Initializing ressources needed for each worker process
"""
logger.info('Startup celery workers **STARTED**')
# start the aio thread that holds the event loop
construct_aio_threading(BaseTask.aio_thread)
# initiliazing database models
async_to_sync(aio_thread=BaseTask.aio_thread,coroutine=db_session)
# print(BaseTask.aio_thread)
logger.info('Startup celery workers **FINISHED**')
Normally this should initialize the the database models and make them available in the global scope of the worker app process, but when accessing the database is says that the database models hasn't been initialized.
When accessing the id attribute of my User database models :
pp_sample_celery_worker | user = User.find()
app_sample_celery_worker | raise CollectionWasNotInitialized app_sample_celery_worker | beanie.exceptions.CollectionWasNotInitialized
Or when checking the status of the thread is says that it is stopped
app_sample_celery_worker | [2022-10-20 10:40:27,369: WARNING/ForkPoolWorker-8] <AioThread(Thread-1, stopped)>.
PS : inside the signal function i have access to the db session, this means that the connection is available locally inside the function and when checking the thread it says its alive, but neither the db session nor the thread are available outside (at the module level or inside celery tasks).
Logs when accessing the thread and the User classe inside the signal function :
app_sample_celery_worker | [2022-10-20 10:45:05,889: WARNING/MainProcess] <AioThread(Thread-1, started 140177104766720)> app_sample_celery_worker | [2022-10-20 10:45:05,889: WARNING/MainProcess] <class 'app_sample.database.models.User'>
is it that everything i initialize inside the signal is garbage collected ?
After long investigations, it turns out that the celery worker signal is run only inside one process (The master process), i answered similar question to one of my old questions here
TLDR :