Django: how to wrap a bulk update/insert operation in transaction?


This is my use case:

  • I have multiple celery tasks that run in parallel
  • Each task may bulk-create or bulk-update many objects. For this I'm using django-bulk.

So basically I'm using a very convenient function insert_or_update_many:

  1. It first performs a SELECT
  2. If it finds existing objects, it updates them
  3. Otherwise, it creates them

But this introduces concurrency problems. For example, if an object does not exist during step 1, it is added to a list of objects to be inserted later. In the meantime, another Celery task may create that same object, so when the first task performs its bulk insert (step 3) I get a duplicate-entry error.
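The interleaving described above can be reproduced deterministically in a few lines. This is only a sketch using the standard-library `sqlite3` module rather than Django or django-bulk; the `item` table, its columns, and the `select_existing` helper are hypothetical stand-ins, and the two "tasks" are simulated by running their steps in the problematic order on one connection.

```python
import sqlite3

# Hypothetical table standing in for the real model; `key` carries the
# unique constraint that produces the duplicate-entry error.
conn = sqlite3.connect(":memory:", isolation_level=None)  # autocommit mode
conn.execute("CREATE TABLE item (key TEXT PRIMARY KEY, value INTEGER)")


def select_existing(keys):
    """Step 1: return the subset of `keys` already present in the table."""
    placeholders = ",".join("?" for _ in keys)
    rows = conn.execute(
        f"SELECT key FROM item WHERE key IN ({placeholders})", list(keys)
    )
    return {row[0] for row in rows}


# Task A runs step 1 and decides that "a" must be inserted.
keys_a = ["a"]
existing_a = select_existing(keys_a)
to_insert_a = [k for k in keys_a if k not in existing_a]

# Task B gets in between and creates the same row.
conn.execute("INSERT INTO item (key, value) VALUES (?, ?)", ("a", 1))

# Task A now runs step 3 with its stale list and hits the constraint.
try:
    conn.executemany(
        "INSERT INTO item (key, value) VALUES (?, ?)",
        [(k, 2) for k in to_insert_a],
    )
    outcome = "inserted"
except sqlite3.IntegrityError:
    outcome = "duplicate entry"
```

The point of the sketch is that the list built in step 1 is already stale by the time step 3 runs, regardless of which database or bulk helper is involved.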

I guess I need to wrap the three steps in a 'blocking' block. I've read about transactions and tried wrapping steps 1, 2 and 3 in a with transaction.commit_on_success(): block:

with transaction.commit_on_success():
    cursor.execute(sql, parameters)
    existing = set(cursor.fetchall())
    if not skip_update:
        # Find the objects that need to be updated
        update_objects = [o for (o, k) in object_keys if k in existing]
        _update_many(model, update_objects, keys=keys, using=using)
    # Find the objects that need to be inserted.
    insert_objects = [o for (o, k) in object_keys if k not in existing]
    # Filter out any duplicates in the insertion
    filtered_objects = _filter_objects(con, insert_objects, key_fields)
    _insert_many(model, filtered_objects, using=using)

But this does not work for me. I'm not sure I fully understand transactions. I basically need a block where I can put several operations being sure no other process or thread is accessing (in write) my db resources.


Solution

  • I basically need a block where I can put several operations being sure no other process or thread is accessing (in write) my db resources.

    Django transactions will not, in general, guarantee that for you. If you're coming from other areas of computer science you naturally think of a transaction as blocking in this way, but in the database world there are different kinds of locks, at different isolation levels, and they vary for each database. So to ensure that your transactions do this you're going to have to learn about transactions, about locks and their performance characteristics, and about the mechanisms supplied by your database for controlling them.

    However, having a bunch of processes all trying to lock the table in order to carry out competing inserts does not sound like a good idea. If collisions were rare you could do a form of optimistic locking and just retry the transaction if it fails. Or perhaps you can direct all of these celery tasks to a single process (there's no performance advantage to parallelizing this if you're going to acquire a table lock anyway).

    My suggestion would be to start out by forgetting the bulk operations and just do one row at a time using Django's update_or_create. As long as your database has constraints that prevent duplicate entries (which it sounds like it does), this should be free of the race conditions you describe above. If the performance really does turn out to be unacceptable, then look into more complex options.
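In Django the per-row call has the shape `obj, created = Item.objects.update_or_create(key=..., defaults={...})`, where `Item` is a hypothetical model. To illustrate why a unique constraint makes the one-row-at-a-time approach safe, here is a rough analog written against the standard-library `sqlite3` module. It is not how Django implements `update_or_create`; the `item` table and the `update_or_insert` helper are assumptions made for the example.

```python
import sqlite3


def update_or_insert(conn, key, value):
    """Upsert one row; return True if it was created, False if updated."""
    try:
        # Try the insert first and let the unique constraint arbitrate.
        conn.execute("INSERT INTO item (key, value) VALUES (?, ?)", (key, value))
        return True
    except sqlite3.IntegrityError:
        # The row already exists (possibly created by another task): update it.
        conn.execute("UPDATE item SET value = ? WHERE key = ?", (value, key))
        return False


conn = sqlite3.connect(":memory:", isolation_level=None)  # autocommit mode
conn.execute("CREATE TABLE item (key TEXT PRIMARY KEY, value INTEGER)")

first = update_or_insert(conn, "a", 1)   # no row yet, so it is created
second = update_or_insert(conn, "a", 2)  # row exists, so it is updated
stored = conn.execute("SELECT value FROM item WHERE key = 'a'").fetchone()[0]
```

Because the decision between insert and update is made by the database at write time, rather than from an earlier SELECT, there is no window in which a stale list can cause a duplicate.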

    Taking the optimistic concurrency approach means that rather than preventing conflicts (by acquiring a table lock, say), you just proceed as normal and then retry the operation if there turns out to be a problem. In your case it might look something like:

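    from django.db import IntegrityError, transaction

    while True:
        try:
            with transaction.atomic():
                # do your bulk insert / update operation here
                pass  # placeholder so the block is valid Python
        except IntegrityError:
            # Another task created a row first; atomic() has already
            # rolled back, so fall through and retry the whole block.
            pass
        else:
            break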
    

    So if you run into your race condition, the resulting IntegrityError will cause the transaction.atomic() block to roll back any changes that have been made, and the while loop will force a retry of the transaction (where presumably the bulk operation will now see the newly-existing row and mark it for updating rather than insertion).

    This kind of approach can work really well if collisions are rare, and really badly if they are frequent.
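If collisions might be more than occasional, one way to limit the damage is to cap the number of retries and back off between attempts. The sketch below is a generic helper, not part of Django; the names `retry_on_conflict`, `max_attempts` and `base_delay` are hypothetical. It uses `sqlite3.IntegrityError` only so that it runs with the standard library; in a Django project you would presumably pass `django.db.IntegrityError` and have the operation open its own `transaction.atomic()` block.

```python
import random
import sqlite3
import time


def retry_on_conflict(operation, conflict_exc, max_attempts=5, base_delay=0.05):
    """Run operation(); on conflict_exc, back off and retry up to max_attempts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except conflict_exc:
            if attempt == max_attempts:
                raise  # give up and let the caller see the conflict
            # Exponential backoff with jitter so competing tasks spread out.
            time.sleep(base_delay * 2 ** (attempt - 1) * random.random())


# Stand-in for the bulk operation: conflicts twice, then succeeds.
calls = {"n": 0}


def flaky_bulk_operation():
    calls["n"] += 1
    if calls["n"] < 3:
        raise sqlite3.IntegrityError("simulated duplicate entry")
    return "done"


result = retry_on_conflict(
    flaky_bulk_operation, sqlite3.IntegrityError, base_delay=0.001
)
```

Unlike the unbounded `while True` loop, this version surfaces the error after a fixed number of attempts, which makes a high collision rate visible instead of letting tasks spin.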