Search code examples
python-3.x, process, multiprocessing, thread-safety

How can I safely access a variable between Python processes which use ProcessPoolExecutor?


I have a script that uses multiprocessing to execute many I/O-bound tasks. I want to access a variable between processes in a safe manner. Is there a simple way to do this that does not involve low-level logic like manipulating locks?

import concurrent.futures
import time
import random

def do_book_task(books, year, delay=None):
    """Run the simulated task for one year, tracking it in ``books`` meanwhile.

    ``year`` is appended to ``books`` before the work starts and removed again
    once it ends, so ``books`` lists the years currently being processed.

    Args:
        books: Mutable list-like object supporting ``append`` and ``remove``.
        year: Identifier of the batch of books to process.
        delay: Seconds to sleep to simulate the work. ``None`` (the default)
            picks a random duration of at least 0.5 and below 1.5 seconds.

    Returns:
        The string ``'Result for {year}'``.

    Raises:
        ValueError: If ``delay`` is negative (raised by ``time.sleep``).
    """
    if delay is None:
        delay = random.random() + 0.5

    books.append(year)
    try:
        print(f'Doing task with books from {year}.')
        time.sleep(delay)
    finally:
        # Always undo the registration; otherwise a failed task would leave
        # its year in the list forever.
        books.remove(year)

    return f'Result for {year}'

def main():
    """Submit one ``do_book_task`` per year to a process pool and print results.

    Each year is submitted to a ``ProcessPoolExecutor`` with four workers.
    As tasks finish, the year and its result are printed; if a task raised,
    the error is printed instead.
    """
    years = ['1996', '1997', '1998', '1999', '2000', '2001']

    books = [] # I want this variable to be process-safe
    # NOTE: arguments passed to ``executor.submit`` are pickled for the worker
    # process, so each task operates on its own copy of this plain list.

    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        # Map every future to its year so completed futures can be identified
        # directly instead of scanning a parallel list with ``index``.
        year_by_future = {}
        for year in years:
            year_by_future[executor.submit(do_book_task, books, year)] = year
            print(f'Submitted {year} to process queue')

        for future in concurrent.futures.as_completed(year_by_future):
            year = year_by_future[future]
            print(f'Done {year}')
            try:
                # ``result()`` re-raises any exception raised by the task.
                print(future.result())
            except Exception as e:
                print(f'Error with year: {year}')
                print(e)

Solution

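  • Yes, you can use multiprocessing.Manager(), which returns a started SyncManager (defined in multiprocessing.managers) whose proxy objects, such as manager.list(), can be shared safely between processes. See https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Manager.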

    from multiprocessing import Manager
    
    def main():
        """Run ``do_book_task`` for every year, sharing one managed list.

        The list is created through a ``Manager`` so that every worker
        process receives a proxy to the same underlying list. As tasks
        finish, the year and its result are printed; if a task raised, the
        error is printed instead.
        """
        years = ['1996', '1997', '1998', '1999', '2000', '2001']

        with Manager() as manager:
            books = manager.list() # creates a proxy object which is process-safe

            with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
                # Map every future to its year so completed futures can be
                # identified directly instead of scanning a parallel list.
                year_by_future = {}
                for year in years:
                    year_by_future[executor.submit(do_book_task, books, year)] = year
                    print(f'Submitted {year} to process queue')

                for future in concurrent.futures.as_completed(year_by_future):
                    year = year_by_future[future]
                    print(f'Done {year}')
                    try:
                        # ``result()`` re-raises any exception raised by the task.
                        print(future.result())
                    except Exception as e:
                        print(f'Error with year: {year}')
                        print(e)