Search code examples
pythonpython-multiprocessingpoolapply-async

multiprocessing Manager fails to apply_async from a shared process.pool


I have two daemon processes running. One feeds data into a shared multiprocessing.Queue, and the other takes data from this Queue and submits it into a multiprocessing.pool.apply_async. The async result is sent to another multiprocessing.Queue.

Some example code:

import multiprocessing
from multiprocessing import Process
import random
import time


def my_method(i):
    return i*i


class DataFeeder:
    input_queue = None

    @staticmethod
    def stream_to_queue(iq):
        if DataFeeder.input_queue is None:
            DataFeeder.input_queue = iq

        while True:
            time.sleep(1)
            dat = random.choice(range(0, 50))
            print(f"feeding {dat}")
            DataFeeder.input_queue.put(dat)


class DataEater:
    input_queue = None
    results_queue = None
    pool = None

    @staticmethod
    def eat(iq, rq, p):
        if DataEater.input_queue is None:
            DataEater.input_queue = iq
        if DataEater.results_queue is None:
            DataEater.results_queue = rq
        if DataEater.pool is None:
            DataEater.pool = p

        while True:
            time.sleep(0.1)
            dat = DataEater.input_queue.get(0.1) # 100ms timeout
            print(f"eating {dat}")
            async_result = DataEater.pool.apply_async(my_method, (dat,))
            print(f"async_result {async_result}")
            DataEater.results_queue.put_nowait(async_result)


if __name__ == '__main__':
    with multiprocessing.Manager() as m:
        input_q = m.Queue()
        output_q = m.Queue()
        pool = m.Pool(8)

        dfp = Process(target=DataFeeder.stream_to_queue, args=(input_q,), daemon=True)
        dfp.start()

        dep = Process(target=DataEater.eat, args=(input_q, output_q, pool), daemon=True)
        dep.start()

        dep.join()
        dfp.join()

The resulting stacktrace is as follows:

Process Process-3:
Traceback (most recent call last):
  File "/opt/anaconda3/envs/integration_tests/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/opt/anaconda3/envs/integration_tests/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/chhunb/PycharmProjects/micromanager_server/umanager_server/multiproctest.py", line 54, in eat
    async_result = DataEater.pool.apply_async(my_method, (dat,))
  File "<string>", line 2, in apply_async
  File "/opt/anaconda3/envs/integration_tests/lib/python3.9/multiprocessing/managers.py", line 816, in _callmethod
    proxytype = self._manager._registry[token.typeid][-1]
AttributeError: 'NoneType' object has no attribute '_registry'

I've tried spawning the pool locally in the eat() method, so that it would spawn from within the new process. This won't work because the result is an async_result and has to be shared with other processes using the results_queue.

I've tried completely ignoring the DataFeeder by hard coding some string into the apply-async input. Same error.

This open python issue is very similar, if not exactly the same as what i'm doing: https://github.com/python/cpython/issues/80100 The issue is still open so does that mean it was never resolved in later versions of python?

Do i have to consider a completely different design pattern to get this functionality?


Solution

  • The following is a hackish approach which was tested only on Python 3.11 on Windows. It may break for other than the CPython implementation, other Python versions, other OS'

    Moreover it currently transfers the authentication key of the manager's server between processes (this is considered unsafe) but this could be changed if necessary with a fixed key.

    import multiprocessing
    from multiprocessing import Process
    import threading
    
    import random
    import time
    
    
    def my_method(i):
        return i*i
    
    
    class DataFeeder:
        input_queue = None
    
        @staticmethod
        def stream_to_queue(iq):
            if DataFeeder.input_queue is None:
                DataFeeder.input_queue = iq
    
            while True:
                time.sleep(1)
                dat = random.choice(range(0, 50))
                print(f"feeding {dat}")
                DataFeeder.input_queue.put(dat)
    
    
    class DataEater:
        input_queue = None
        results_queue = None
        pool = None
    
        @staticmethod
        def eat(iq, rq, p, adr, auth, registry):
            
            if DataEater.input_queue is None:
                DataEater.input_queue = iq
            if DataEater.results_queue is None:
                DataEater.results_queue = rq
            if DataEater.pool is None:
                DataEater.pool = p
    
            # Here begins the magic (aka dirty hack):
            # Constructing a new manager connected to the same server process
            # as the manager in main thread           
            from multiprocessing.managers import SyncManager
            
            manager = SyncManager(address=adr, authkey=auth)
    
            # Updating the registry:
    
            registry.update(SyncManager._registry)
            SyncManager._registry = registry
            
            manager.connect()
            
            # Setting our manager as manager of the pool proxy
            DataEater.pool._manager = manager
    
            # Continue normally
    
            while True:
                time.sleep(0.1)
                dat = DataEater.input_queue.get(0.1) # 100ms timeout
                print(f"eating {dat}")
                async_result = DataEater.pool.apply_async(my_method, (dat,))
                print(f"async_result {async_result}")
                DataEater.results_queue.put_nowait(async_result)
    
    
    # Additional function/process to wait for the asynchronous result and print it:
    def data_printer(output_q):
        while True:
            item = output_q.get()
            result = item.get()
            print(f"awaited async result {result}")
    
    
    if __name__ == '__main__':
        with multiprocessing.Manager() as m:
            input_q = m.Queue()
            output_q = m.Queue()
            pool = m.Pool(8)
            
            dpp = Process(target=data_printer, args=(output_q,), daemon=True)
            dpp.start()
    
            dfp = Process(target=DataFeeder.stream_to_queue, args=(input_q,), daemon=True)
            dfp.start()
    
            # Three additional arguments added to create a remote manager in the process
            dep = Process(target=DataEater.eat, args=(input_q, output_q, pool, m.address, m._authkey, m._registry), daemon=True)
            dep.start()
    
            dep.join()
            dfp.join()
            dpp.join()