mysql django django-orm

Trying to understand Django select_for_update() with MySQL and Processes


I have a Django application that is backed by a MySQL database. I recently moved a section of code out of the request flow and into a Process. The code uses select_for_update() to lock the affected rows in the DB, but I now occasionally see the Process updating a record while it should be locked by the main thread. If I switch my executor from a ProcessPoolExecutor to a ThreadPoolExecutor, the locking works as expected. I thought that select_for_update() operated at the database level, so it shouldn't make any difference whether the code runs in threads, processes, or even on another machine. What am I missing?

I've boiled my code down to a sample that exhibits the same behaviour:

from concurrent import futures
import logging
from time import sleep
from django.db import transaction
from myapp.main.models import CompoundBase

logger = logging.getLogger()
executor = futures.ProcessPoolExecutor()
# executor = futures.ThreadPoolExecutor()


def test() -> None:
    pk = setup()

    f1 = executor.submit(select_and_sleep, pk)
    f2 = executor.submit(sleep_and_update, pk)

    futures.wait([f1, f2])

def setup() -> int:
    cb = CompoundBase.objects.first()
    cb.corporate_id = 'foo'
    cb.save()

    return cb.pk

def select_and_sleep(pk: int) -> None:
    try:
        with transaction.atomic():
            cb = CompoundBase.objects.select_for_update().get(pk=pk)
            print('Locking')
            sleep(5)
            cb.corporate_id = 'baz'
            cb.save()
            print('Updated after sleep')
    except Exception:
        logger.exception('select_and_sleep')

def sleep_and_update(pk: int) -> None:
    try:
        sleep(2)
        print('Updating')
        with transaction.atomic():
            cb = CompoundBase.objects.select_for_update().get(pk=pk)
            cb.corporate_id = 'bar'
            cb.save()
            print('Updated without sleep')
    except Exception:
        logger.exception('sleep_and_update')

test()

When run as shown I get:

Locking
Updating
Updated without sleep
Updated after sleep

But if I change to the ThreadPoolExecutor I get:

Locking
Updating
Updated after sleep
Updated without sleep

Solution

  • The good news is that your code is mostly there. I did some reading around, and what follows is based on another answer I found.

    I am assuming that you are running on Linux, as that seems to be the behaviour on that platform.

    It looks like under Linux the default Process start strategy is fork, which is usually what you want. In this exact circumstance, however, it appears that resources (such as DB connections) are shared between the forked processes, so the database treats their operations as part of the same transaction and does not block them. To get the behaviour you want, each process appears to need its own resources, not shared with its parent process (or, in turn, with any other children of that parent).
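
The sharing described above can be illustrated without Django or MySQL. The sketch below is only an analogy, under the assumption that a plain socket pair behaves like a database connection opened before the fork. The function name demo_shared_connection and the messages are hypothetical, and os.fork() is Unix only.

```python
import os
import socket


def demo_shared_connection() -> set:
    """Fork a child that reuses a socket the parent opened before forking."""
    # "client" stands in for a DB connection opened before the fork;
    # "server" stands in for the database's end of that one connection.
    server, client = socket.socketpair()

    pid = os.fork()  # Unix only
    if pid == 0:
        # Child: the inherited socket refers to the same connection.
        try:
            client.sendall(b"child\n")
        finally:
            os._exit(0)  # exit at once, never fall through to parent code

    os.waitpid(pid, 0)  # wait until the child has written and exited
    client.sendall(b"parent\n")
    client.close()

    # Read until end of stream; both writers arrive on a single connection,
    # so the receiving end has no way to tell them apart.
    data = b""
    while True:
        chunk = server.recv(1024)
        if not chunk:
            break
        data += chunk
    server.close()
    return set(data.split())


received = demo_shared_connection()
print(received)
```

If a database connection is inherited in the same way, the server sees a single session rather than two, which would be consistent with the second select_for_update() not being blocked.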

    It is possible to get the behaviour you want using the following code. Be aware, however, that I had to split the code into two files.

    fn.py

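    import logging
    from time import sleep
    
    import django
    from django.db import transaction
    
    # Configure Django before importing models; this assumes the settings
    # are already configured (for example via DJANGO_SETTINGS_MODULE).
    django.setup()
    
    from myapp.main.models import CompoundBase
    
    # The functions below log exceptions, so this module needs its own logger.
    logger = logging.getLogger()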
    
    
    def setup() -> int:
        cb = CompoundBase.objects.first()
        cb.corporate_id = 'foo'
        cb.save()
    
        return cb.pk
    
    def select_and_sleep(pk: int) -> None:
        try:
            with transaction.atomic():
                cb = CompoundBase.objects.select_for_update().get(pk=pk)
                print('Locking')
                sleep(5)
                cb.corporate_id = 'baz'
                cb.save()
                print('Updated after sleep')
    
        except Exception:
            logger.exception('select_and_sleep')
    
    def sleep_and_update(pk: int) -> None:
        try:
            sleep(2)
            print('Updating')
    
            with transaction.atomic():
                cb = CompoundBase.objects.select_for_update().get(pk=pk)
                cb.corporate_id = 'bar'
                cb.save()
                print('Updated without sleep')
    
        except Exception:
            logger.exception('sleep_and_update')
    

    proc_test.py

    from concurrent import futures
    from multiprocessing import get_context
    from time import sleep
    import logging
    
    import fn
    
    logger = logging.getLogger()
    executor = futures.ProcessPoolExecutor(mp_context=get_context("forkserver"))
    # executor = futures.ThreadPoolExecutor()
    
    
    def test() -> None:
        pk = fn.setup()
    
        f1 = executor.submit(fn.select_and_sleep, pk)
        f2 = executor.submit(fn.sleep_and_update, pk)
    
        futures.wait([f1, f2])
    
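    # spawn and forkserver import the main module in each worker process,
    # so the entry point is guarded to keep workers from re-running test().
    if __name__ == "__main__":
        test()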
    

    There are three strategies for starting a process: fork, spawn, and forkserver. Using either spawn or forkserver appears to give you the behaviour that you are looking for.
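
Not every platform offers all three strategies, so it may help to check what is available before choosing one. This is a minimal sketch using only the standard library. The helper name pick_isolated_start_method is hypothetical, and preferring forkserver over spawn is an assumption, not a recommendation from the original answer.

```python
import multiprocessing


def pick_isolated_start_method() -> str:
    """Return a start method that does not fork from the calling process."""
    available = multiprocessing.get_all_start_methods()
    # Prefer forkserver where the platform offers it; otherwise use spawn.
    return "forkserver" if "forkserver" in available else "spawn"


method = pick_isolated_start_method()
# get_context() only builds a context object; no process is started here.
ctx = multiprocessing.get_context(method)
print(method, ctx.get_start_method())
```

The resulting ctx could then be passed as mp_context to ProcessPoolExecutor, in place of the hard-coded get_context("forkserver") in proc_test.py.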
