Tags: python, mongodb, celery, pymongo, celery-task

How to correctly connect to MongoDB in a Celery worker?


I'm learning Celery and, after reading Celery Best Practices, I have a fairly simple question about database usage with Celery.

Deni Bertovic says:

You shouldn't pass Database objects (for instance your User model) to a background task because the serialized object might contain stale data.

So, if I want to connect to the database in a worker, what is the right choice:

@app.task
def add(x, y, collection):
    client = MongoClient('mongodb://localhost:27017/')
    db = client.wakawaka
    db[collection].insert_one({'sum':x+y})
    return True

or:

client = MongoClient('mongodb://localhost:27017/')
db = client.wakawaka

@app.task
def add(x, y, collection):
    db[collection].insert_one({'sum':x+y})
    return True

?

UPD: I can close() my MongoDB connection at the end of every task, so that each task connects to a fresh DB whenever it needs something and no resources are wasted. Still, do I really need to open and close the database connection that many times? Or can I connect once and somehow refresh the connection to retrieve a fresh version of the DB?


Solution

  • Update: 2024.05.28

    To reuse a connection across tasks without running the risk of it being pickled, create the client in a separate module and import it in the module where you declare your Celery tasks.

    mongo_client.py

    from pymongo import MongoClient
    
    client = MongoClient('mongodb://localhost:27017')
    db = client.db
    

    Then import the db object in the module where you declare the Celery tasks.

    from mongo_client import db
    
    @app.task
    def add(x, y, collection_name):
        collection = db[collection_name]
        collection.insert_one({'sum': x + y})
        return True
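
One caveat with a module-level client: PyMongo documents MongoClient as not fork-safe, and Celery's default prefork pool forks its worker processes after the task modules have been imported. A minimal sketch of one way around this is to create the client lazily, once per process. The helper name get_client and its factory argument are hypothetical, not part of PyMongo or Celery; the factory is passed in so that the sketch runs without a live server.

```python
import os

_client = None       # client cached for the current process
_client_pid = None   # PID of the process that created it


def get_client(factory):
    """Return this process's client, creating it on first use.

    `factory` is any zero-argument callable that builds a client,
    e.g. lambda: MongoClient('mongodb://localhost:27017').
    """
    global _client, _client_pid
    pid = os.getpid()
    # A client inherited from a parent process is replaced, not reused.
    if _client is None or _client_pid != pid:
        _client = factory()
        _client_pid = pid
    return _client
```

Inside a task, this could be used as get_client(lambda: MongoClient('mongodb://localhost:27017')).db[collection_name]. Celery's worker_process_init signal is another place where a per-process client can be created.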
    

    Alternatively, opening and closing the database connection per task protects you from bugs caused by stale or incorrect data, because each task runs independently of the others. It also simplifies lifetime management of the connection, at the cost of a new connection handshake for every task.

    You can do the database work inside the client's context manager block. The context manager closes the connection when the block exits, so you don't need to call close() explicitly. MongoClient is also thread-safe, and its built-in connection pool re-establishes connections after a network error, so a failed operation can be retried.

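    from pymongo import MongoClient
    
    @app.task
    def add(x, y, collection):
        # The context manager closes the client when the block exits.
        with MongoClient('mongodb://localhost:27017') as client:
            db = client.db
            # Index by name; db.collection would write to a collection literally named "collection".
            db[collection].insert_one({'sum': x + y})
        return True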
    

    Note that this doesn't maintain the transactional integrity of updates to your data if you're updating multiple documents.

    MongoDB 4.0 and PyMongo 3.7 added support for multi-document ACID transactions (on replica sets).
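
A minimal sketch of such a transaction, assuming a MongoDB 4.0+ replica set and PyMongo 3.7+. The function name record_sum and the collection names sums and audit are hypothetical and chosen only for illustration. The client is passed in as an argument, so the function does not open a connection itself. On MongoDB versions before 4.4, the collections are assumed to exist already.

```python
def record_sum(client, x, y):
    """Insert two related documents atomically.

    `client` is expected to be a pymongo.MongoClient connected to a
    replica set; transactions are not available on a standalone server.
    """
    db = client.db
    # start_session() and start_transaction() are both context managers:
    # the transaction commits if the inner block exits normally and
    # aborts if an exception is raised inside it.
    with client.start_session() as session:
        with session.start_transaction():
            db.sums.insert_one({'sum': x + y}, session=session)
            db.audit.insert_one({'op': 'add', 'args': [x, y]}, session=session)
    return True
```

Every operation that should take part in the transaction has to be given the session argument; a write issued without it runs outside the transaction.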