python multithreading locking

How to synchronize two alternating threads


I need to start two threads, control which one starts first, and then have them alternate their jobs.

The following code works as expected with do_sleep = True, but it can fail with do_sleep = False.

How can I achieve the same result without using those ugly (and unreliable) sleeps?

The reason why it works with do_sleep = True is that:

  • After finishing a job, each worker sleeps briefly, which gives the other thread time to acquire the lock before the first one tries again for its next job
  • There is a pause between the start of the first and the second worker that allows the first one to acquire the lock before the second is ready

With do_sleep = False it can fail because:

  • At the end of each job, a thread can re-acquire the lock for its next cycle before the other thread gets it, so it runs two consecutive jobs instead of alternating
  • The second thread could acquire the lock before the first one

Here is the code:

import threading
import time
import random

do_sleep = True

def workerA(lock):
    for i in range(5):
        lock.acquire()
        print('Working A - %s' % i)
        time.sleep(random.uniform(0.2, 1))
        lock.release()
        if do_sleep: time.sleep(0.1)

def workerB(lock):
    for i in range(5):
        if do_sleep: time.sleep(0.1)
        lock.acquire()
        print('Working B - %s' % i)
        time.sleep(random.uniform(0.2, 1))
        lock.release()
        if do_sleep: time.sleep(0.1)

lock = threading.Lock()

t1 = threading.Thread(target=workerA, args=(lock, ))
t2 = threading.Thread(target=workerB, args=(lock, ))

t1.start()
if do_sleep: time.sleep(0.1)
t2.start()

t1.join()
t2.join()

print('done')

EDIT: Using a Queue, as suggested by Mike, doesn't help, because the first worker finishes its job without waiting for the second.

This is the wrong output from a version that replaces the Lock with a Queue:

Working A - 0
Working A - 1
Working B - 0
Working A - 2
Working B - 1
Working A - 3
Working B - 2
Working A - 4
Working B - 3
Working B - 4
done

This is the wrong output, obtained with do_sleep = False:

Working A - 0
Working A - 1
Working A - 2
Working A - 3
Working A - 4
Working B - 0
Working B - 1
Working B - 2
Working B - 3
Working B - 4
done

This is the correct output, obtained with do_sleep = True:

Working A - 0
Working B - 0
Working A - 1
Working B - 1
Working A - 2
Working B - 2
Working A - 3
Working B - 3
Working A - 4
Working B - 4
done
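
The three outputs above can also be compared mechanically. The following is a minimal sketch, not part of the original code: `is_alternating` is a hypothetical helper, and each run is reduced by hand to the sequence of worker letters in the order they were printed.

```python
def is_alternating(labels, first='A'):
    """Return True if labels start with `first` and never repeat back to back."""
    if not labels:
        return True
    if labels[0] != first:
        return False
    # Compare every label with its successor; any equal pair breaks alternation.
    return all(a != b for a, b in zip(labels, labels[1:]))


# Worker letters transcribed from the three outputs shown above.
queue_run = list('AABABABABB')     # Lock replaced with a Queue
no_sleep_run = list('AAAAABBBBB')  # do_sleep = False
sleep_run = list('ABABABABAB')     # do_sleep = True

if __name__ == '__main__':
    for name, run in [('queue', queue_run),
                      ('no sleep', no_sleep_run),
                      ('sleep', sleep_run)]:
        print(name, is_alternating(run))
```

Only the do_sleep = True run passes this check; the other two contain at least one pair of consecutive jobs from the same worker.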

Solution

  • There are several ways to solve this. A relatively easy one is to use the lock to guard a separate shared variable. Call this variable owner; it is set to either A or B. Thread A can only start a job when owner is A, and thread B can only start a job when owner is B. The pseudo-code for thread A is:

    while True:
        while True:
            # Poll until this thread is the owner
            lock.acquire()
            if owner == A:
                break          # leave the inner loop still holding the lock
            lock.release()     # not our turn: let the other thread in

        # Now I'm the owner, and I still hold the lock. Start the job.
        <Grab next job (or start job or finish job, whatever is required to remove it from contention)>
        owner = B              # hand the turn to the other thread
        lock.release()
        <Finish job if not already done. Go get next one>
    

    Thread B does the same thing with the if owner == check and the owner = assignment reversed. You can also parameterize the code so that both threads run the same function.

    EDIT

    Here is the working version, with the suggested logic inside an object:

    import threading
    import time
    
    def workerA(lock):
        for i in range(5):
            lock.acquire_for('A')
            print('Start A - %s' % i)
            time.sleep(0.5)
            print('End A - %s' % i)
            lock.release_to('B')
    
    def workerB(lock):
        for i in range(5):
            lock.acquire_for('B')
            print('Start B - %s' % i)
            time.sleep(2)
            print('End B - %s' % i)
            lock.release_to('A')
    
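    class LockWithOwner:

        def __init__(self, owner='A'):
            # Per-instance state; class-level attributes would be shared by every instance
            self.lock = threading.RLock()
            self.owner = owner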
    
        def acquire_for(self, owner):
            n = 0
            while True:
                self.lock.acquire()
                if self.owner == owner:
                    break
                n += 1
                self.lock.release()
                # Not our turn yet: pause briefly so the other thread can take the lock
                time.sleep(0.001)
            print('Waited for {} to be the owner {} times'.format(owner, n))
    
        def release_to(self, new_owner):
            self.owner = new_owner
            self.lock.release()
    
    lock = LockWithOwner()
    lock.owner = 'A'
    
    t1 = threading.Thread(target=workerA, args=(lock, ))
    t2 = threading.Thread(target=workerB, args=(lock, ))
    
    t1.start()
    t2.start()
    
    t1.join()
    t2.join()
    
    print('done')
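
The version above still polls with a short sleep while waiting for its turn. As a possible alternative, here is a minimal sketch that swaps the polling loop for a `threading.Condition`, so a waiting thread blocks until the owner changes. This is a different technique from the one in the answer, not the answer's own code. The names `Turnstile`, `worker`, `run`, and `log` are hypothetical, and the job is reduced to appending a line to a list so the order can be inspected. Both threads run the same parameterized function.

```python
import threading


class Turnstile:
    """Hypothetical helper: lets named workers run strictly in turn."""

    def __init__(self, first):
        self._cond = threading.Condition()
        self._owner = first

    def acquire_for(self, name):
        with self._cond:
            # wait_for blocks until notified and the predicate is true; no polling.
            self._cond.wait_for(lambda: self._owner == name)

    def release_to(self, new_owner):
        with self._cond:
            self._owner = new_owner
            self._cond.notify_all()  # wake the waiting thread so it re-checks the owner


def worker(name, other, turnstile, log, rounds):
    for i in range(rounds):
        turnstile.acquire_for(name)
        # The job: only the current owner reaches this point.
        log.append('%s - %s' % (name, i))
        turnstile.release_to(other)


def run(rounds=5):
    log = []
    turnstile = Turnstile(first='A')
    threads = [
        threading.Thread(target=worker, args=('A', 'B', turnstile, log, rounds)),
        threading.Thread(target=worker, args=('B', 'A', turnstile, log, rounds)),
    ]
    # Start B first on purpose: who runs first is decided by `first`,
    # not by the order of the start() calls.
    for t in reversed(threads):
        t.start()
    for t in threads:
        t.join()
    return log


if __name__ == '__main__':
    print('\n'.join(run()))
    print('done')
```

In this sketch the owner value acts as a token: the condition's internal lock is held only while the token is checked or handed over, not during the job itself, so a long job does not keep the other thread spinning.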