How can I make a threaded Python program (with locks) run in multiple processes?


I have a multi-threaded program, and I want to let the user choose how to run it: serially, in multiple threads, or on multiple cores, at least at the top level. A runnable demo illustrating my program's logic is shown below.

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from queue import Queue
# from multiprocessing import Queue


class Handler:
    def __init__(self):
        self.queue = Queue()  # this object is essential

    def put(self, item):
        self.queue.put(item)

    def run(self):
        while True:
            item = self.queue.get()
            # do other things on the item ...
            print(item)


class Runner:
    def __init__(self, name):
        self.name = name
        self.a = Handler()
        self.b = Handler()

    def start(self):
        self.a.put(f'{self.name}: hello a')
        self.b.put(f'{self.name}: hello b')
        with ThreadPoolExecutor() as exe:
            futures = [exe.submit(r.run) for r in [self.a, self.b]]
        for future in futures:
            future.result()


# current implementation
def run_in_multi_thread():
    rA = Runner('A')
    rB = Runner('B')
    rC = Runner('C')
    with ThreadPoolExecutor() as exe:
        futures = [exe.submit(r.start) for r in [rA, rB, rC]]
        for future in futures:
            future.result()


# how to implement this?
def run_in_multi_process():
    rA = Runner('A')
    rB = Runner('B')
    rC = Runner('C')
    with ProcessPoolExecutor() as exe:
        futures = [exe.submit(r.start) for r in [rA, rB, rC]]
        for future in futures:
            future.result()


if __name__ == '__main__':
    # run_in_multi_thread()  # this is currently running fine
    run_in_multi_process()  # how to make this work as well?

My goal is simple: I want to put many Runners into separate processes so that they run in true parallel.

The problem is that when I change the outermost ThreadPoolExecutor to ProcessPoolExecutor, Python always raises TypeError: cannot pickle '_thread.lock' object.

After googling, I know that this is because I used queue.Queue in all my Handlers, which uses threading.Lock, which is a non-pickleable class. However, I cannot avoid using them because the core functionalities are all supported by queue.Queue and threading.Event for all Handlers to communicate.
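The pickling failure described above can be reproduced without any executor at all. The following is a minimal sketch, where `pickle_error` is a hypothetical helper introduced only for this illustration; it simply tries to pickle a `queue.Queue` and reports the exception.

```python
import pickle
from queue import Queue


def pickle_error(obj):
    """Hypothetical helper: return the exception raised by pickle.dumps, or None."""
    try:
        pickle.dumps(obj)
    except Exception as exc:
        return exc
    return None


# queue.Queue holds threading locks internally, so pickling it fails.
error = pickle_error(Queue())
print(type(error).__name__, error)

# A plain list, by contrast, pickles without trouble.
print(pickle_error([1, 2, 3]))
```

Anything that holds a reference to such a queue, including a bound method like `r.start`, should fail in the same way once it has to be pickled.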

I have also tried replacing queue.Queue with multiprocessing.Queue, but then it raises RuntimeError: Queue objects should only be shared between processes through inheritance. I have also heard of third-party libraries such as dill or pathos, but they cause other pickling issues, so I ended up sticking with the built-in libraries.

Any suggestions on how to refactor my code are welcome.
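The RuntimeError from multiprocessing.Queue can also be triggered in isolation. This is a small sketch (again using a hypothetical `pickle_error` helper) under the assumption that the executor is effectively doing an ordinary pickle of the submitted work after its worker processes already exist, rather than passing the queue along while a child process is being created.

```python
import multiprocessing
import pickle


def pickle_error(obj):
    """Hypothetical helper: return the exception raised by pickle.dumps, or None."""
    try:
        pickle.dumps(obj)
    except Exception as exc:
        return exc
    return None


# A multiprocessing.Queue refuses to be pickled unless a child process
# is being spawned at that very moment.
mp_queue = multiprocessing.Queue()
error = pickle_error(mp_queue)
print(type(error).__name__, error)
```

So swapping the queue class alone does not help as long as the Handler objects are submitted to a pool.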


Solution

  • After much trial and error, I finally came up with a working solution, so I can answer my own question. I gave up on ProcessPoolExecutor and now use multiprocessing.Process directly. Below is the revised demo code showing how this works in my use case.

    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    from multiprocessing import Process
    from queue import Queue
    from time import sleep
    
    
    class Handler:
        def __init__(self):
            self.queue = Queue()  # this object is essential
    
        def put(self, item):
            self.queue.put(item)
    
        def run(self):
            while True:
                item = self.queue.get()
                if item == 'exit':
                    break
                # do other things on the item ...
                print(item)
                sleep(1)
    
    
    class Runner:
        def __init__(self, name):
            self.name = name
            self.a = Handler()
            self.b = Handler()
    
        def start(self):
            # some dummy messages
            for _ in range(3):
                self.a.put(f'{self.name}: hello a')
                self.b.put(f'{self.name}: hello b')
            # request to shutdown gracefully
            self.a.put('exit')
            self.b.put('exit')
            with ThreadPoolExecutor() as exe:
                futures = [exe.submit(r.run) for r in [self.a, self.b]]
                for f in futures:
                    f.result()
    
    
    # this requires everything to be picklable
    def run_in_process_pool():
        rA = Runner('A')
        rB = Runner('B')
        rC = Runner('C')
        with ProcessPoolExecutor() as exe:
            futures = [exe.submit(r.start) for r in [rA, rB, rC]]
            for future in futures:
                future.result()
    
    
    # this does not pickle anything, but why?
    def run_in_processes():
        rA = Runner('A')
        rB = Runner('B')
        rC = Runner('C')
        procs = [Process(target=r.start) for r in [rA, rB, rC]]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
    
    
    if __name__ == '__main__':
        # run_in_process_pool()  # `TypeError: cannot pickle '_thread.lock' object`
        run_in_processes()  # this is working
    
    

    Surprisingly, this code no longer complains about classes not being pickleable! In the end I just used the old-fashioned way (create the process -> start -> join). Total run time dropped from 4:39 to 1:05; my CPU has 4 cores, so a truly parallel run taking about a quarter of the time makes perfect sense. If you have multi-threaded code that uses queue.Queue or threading.Lock under the hood, consider wrapping it in multiprocessing.Process directly.

    However, I would still like to know why only ProcessPoolExecutor requires everything to be pickleable while Process does not. To my knowledge, Process makes a system call to create a new process, and the implementation depends on the OS. I am on a Windows machine running WSL2, so I guess I am effectively on Unix, where the default start method should be 'fork', which just clones the whole process without needing to pickle anything?
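The guess about 'fork' can be checked with a small experiment. With the 'fork' start method the child is a copy of the parent's memory, so the Process target is never pickled. ProcessPoolExecutor, on the other hand, sends each submitted callable to an already running worker through a queue, which requires pickling whatever the start method is. The sketch below assumes a Unix system where 'fork' is available and requests it explicitly instead of relying on the platform default; `Holder`, `can_pickle` and `run_with_fork` are hypothetical names used only for this illustration.

```python
import multiprocessing as mp
import pickle
from queue import Queue


class Holder:
    """Hypothetical stand-in for Runner: it owns a queue.Queue, so it cannot be pickled."""

    def __init__(self):
        self.queue = Queue()
        self.queue.put('hello')

    def work(self, result):
        # Runs in the child process and reports the queue size via shared memory.
        result.value = self.queue.qsize()


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


def run_with_fork(holder):
    """Start holder.work in a forked child; return (exit code, value seen by the child)."""
    ctx = mp.get_context('fork')  # assumption: Unix only, not available on Windows
    result = ctx.Value('i', -1)   # shared integer, inherited by the forked child
    proc = ctx.Process(target=holder.work, args=(result,))
    proc.start()
    proc.join()
    return proc.exitcode, result.value


if __name__ == '__main__':
    holder = Holder()
    # The bound method drags the Queue along, so it cannot be pickled ...
    print('picklable:', can_pickle(holder.work))
    # ... yet a forked child can still run it, because nothing is pickled.
    print('exit code, queue size:', run_with_fork(holder))
```

With the 'spawn' start method (the only one on native Windows), Process has to pickle its target as well, so the same code would be expected to fail there with the original TypeError.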