Search code examples
pythoninheritancepython-multiprocessing

Python multiprocessing Queue child class losing attributes in process


I am trying to implement a child class from the multiprocessing Queue in python. The child class contains a simple Boolean flag "ready". When I send the queue to a new process, the ready attribute is disappearing. The following code demonstrates the problem:

import multiprocessing
import multiprocessing.queues


class ReadyQueue(multiprocessing.queues.Queue):
    def __init__(self, ctx, *args, **kwargs):
        super(ReadyQueue, self).__init__(ctx=ctx, *args, **kwargs)
        self.ready = False


def ready_queue(*args, **kwargs):
    return ReadyQueue(ctx=multiprocessing.get_context(), *args, **kwargs)


def foo(q):
    print(q.ready)


if __name__ == "__main__":
    my_queue = ready_queue()
    print(my_queue.ready)
    p = multiprocessing.Process(target=foo, args=(my_queue,))
    p.start()
    p.join()

With the output:

False
Process Process-1:
Traceback (most recent call last):
  File "C:\Users\acre018\Anaconda3\envs\EIT_Qt\lib\multiprocessing\process.py", line 315, in _bootstrap
    self.run()
  File "C:\Users\acre018\Anaconda3\envs\EIT_Qt\lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\acre018\github\EIT_Qt\Experiments\ready_queue_test.py", line 16, in foo
    print(q.ready)
AttributeError: 'ReadyQueue' object has no attribute 'ready'

Solution

  • I implemented this workaround:

    import multiprocessing
    from queue import Empty
    import time
    import ctypes
    
    
    class ReadyQueue:
        def __init__(self, *args, **kwargs):
            self.queue = multiprocessing.Queue(*args, **kwargs)
            self._ready = multiprocessing.Value(ctypes.c_bool, False)
    
        def set_ready(self):
            self._ready.value = True
    
        def set_not_ready(self):
            self._ready.value = False
            self.clear()
    
        def is_ready(self):
            return self._ready.value
    
        def clear(self):
            try:
                while True:
                    self.queue.get(block=False)
            except Empty:
                pass
    
        def get(self, block=True, timeout=None):
            return self.queue.get(block, timeout)
    
        def put(self, obj, block=True, timeout=None):
            return self.queue.put(obj, block, timeout)
    
        def full(self):
            return self.queue.full()
    
        def empty(self):
            return self.queue.empty()
    
        def qsize(self):
            return self.queue.qsize()
    
    
    def foo(q):
        while q.is_ready():
            time.sleep(1)
            q.put("hello from foo")
        print("q no longer ready, foo loop finished")
    
    
    if __name__ == "__main__":
        my_queue = ReadyQueue()
        my_queue.set_ready()
        p = multiprocessing.Process(target=foo, args=(my_queue,))
        p.start()
    
        for i in range(2):
            print(my_queue.get())
            time.sleep(2)
    
        print("my_queue._ready = %s, qsize: %d. Setting not ready.." % (str(my_queue.is_ready()), my_queue.qsize()))
        my_queue.set_not_ready()
        print("my_queue._ready = %s, qusize: %d" % (str(my_queue.is_ready()), my_queue.qsize()))
    

    With the output:

    C:\Users\acre018\Anaconda3\envs\test_pyqt\python.exe C:/Users/acre018/github/EIT_Qt/Experiments/ready_queue_test2.py
    hello from foo
    hello from foo
    my_queue._ready = True, qsize: 2. Setting not ready..
    my_queue._ready = False, qusize: 0
    q no longer ready, foo loop finished
    
    Process finished with exit code 0
    

    The workaround is to have my ReadyQueue class not inherit from multiprocessing.queues.Queue but have a queue as an attribute. For convenience I implemented the methods that I need from queue, and they just pass through to the queue attribute. I also implemented a clear method.

    Note that in my first example I neglected to make self.ready a multiprocessing.Value, so wouldn't have been able to edit it across processes, but I tested after fixing that and it was not the source of the issue.