Tags: python, multiprocessing, python-multiprocessing, python-multithreading

Using multiprocessing to read from a queue


Here is code I use to populate and read from a queue using Python multiprocessing:

from multiprocessing import Lock, Process, Queue, Pool

import time
from random import randint

def add_to_queue(tasks_to_accomplish, name):
    while True:
        random_int = randint(0, 22)
        print('name', name , "adding" , random_int)
        tasks_to_accomplish.put(random_int)
        time.sleep(2)

def read_from_queue(tasks_to_accomplish, name):
    while True:
        item = tasks_to_accomplish.get()
        print('name' , name , item)

        time.sleep(.01)


if __name__ == '__main__':
    tasks_to_accomplish = Queue()

    p = Process(target=add_to_queue, args=(tasks_to_accomplish, "p"))
    p.start()

    p2 = Process(target=read_from_queue, args=(tasks_to_accomplish, "p2"))
    p2.start()
    p3 = Process(target=read_from_queue, args=(tasks_to_accomplish, "p3"))
    p3.start()

    p.join()
    p2.join()
    p3.join()

The code runs indefinitely. Here is partial output:

name p adding 3
name p2 3
name p adding 4
name p3 4
name p adding 0
name p2 0
name p adding 22
name p3 22
name p adding 2
name p2 2
name p adding 13
name p3 13
name p adding 0
name p2 0
name p adding 14
name p3 14
name p adding 20
name p2 20
name p adding 4
name p3 4

Each reader sleeps for only .01 seconds between reads (time.sleep(.01)), but the p2 and p3 processes clearly do not read from the queue every .01 seconds: they block for much longer than that. Have I implemented the reader processes correctly?


Solution

  • As Daniel pointed out, Queue.get() will block until data is available by default.

    You can use q.get(block=False) to change that, although this raises queue.Empty whenever the queue has no data:

    name p adding 12
    name p2 12
    Process Process-6:
    Traceback (most recent call last):
      File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/process.py", line 313, in _bootstrap
        self.run()
      File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/process.py", line 108, in run
        self._target(*self._args, **self._kwargs)
      File "<ipython-input-2-4e6d57c64980>", line 15, in read_from_queue
        item = tasks_to_accomplish.get(block=False)
      File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/queues.py", line 110, in get
        raise Empty
    _queue.Empty
    Process Process-5:
    Traceback (most recent call last):
      File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/process.py", line 313, in _bootstrap
        self.run()
      File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/process.py", line 108, in run
        self._target(*self._args, **self._kwargs)
      File "<ipython-input-2-4e6d57c64980>", line 15, in read_from_queue
        item = tasks_to_accomplish.get(block=False)
      File "/home/user/.pyenv/versions/3.8.0/lib/python3.8/multiprocessing/queues.py", line 110, in get
        raise Empty
    _queue.Empty
    name p adding 2
    name p adding 12
    name p adding 14
    name p adding 21
    name p adding 9
    name p adding 13
    

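    You would need to catch that exception:

    from queue import Empty

    def read_from_queue(tasks_to_accomplish, name):
        while True:
            try:
                # Non-blocking read: raises Empty immediately if nothing is queued
                item = tasks_to_accomplish.get(block=False)
            except Empty:
                print('no data for', name)
            else:
                print('name', name, item)

            time.sleep(.01)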
    

    to get:

    name p adding 0
    name p2 0
    no data for p3
    no data for p3
    no data for p2
    no data for p2
    no data for p3
    no data for p2
    no data for p3
    # about 350 more entries like this
    name p adding 5
    no data for p2
    name p3 5
    no data for p2
    no data for p3
    no data for p3
    no data for p2
    no data for p3
    # ...
    

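    Unless you need to do some work between reads, I would say that yes, you have implemented the reading processes correctly, and you can safely remove the calls to sleep when reading. The blocking get() already waits until an item arrives, so the readers wake up only as often as the producer adds items (every 2 seconds in your code).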
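
To see the blocking behaviour directly, here is a minimal sketch (not the original poster's code) that times each get() call and stops the readers with a sentinel value. The names PRODUCER_DELAY, SENTINEL, producer, reader and run_demo are made up for this illustration, and using None as the stop marker is an assumption that only holds if None is never a real task.

```python
import time
from multiprocessing import Process, Queue

PRODUCER_DELAY = 0.2  # hypothetical pause between items, in seconds
SENTINEL = None       # hypothetical stop marker; assumes None is never a real task


def producer(tasks, n_items, n_readers):
    """Put n_items integers on the queue, then one sentinel per reader."""
    for i in range(n_items):
        time.sleep(PRODUCER_DELAY)
        tasks.put(i)
    for _ in range(n_readers):
        tasks.put(SENTINEL)


def reader(tasks, results):
    """Read until a sentinel arrives, recording how long each get() blocked."""
    while True:
        start = time.monotonic()
        item = tasks.get()  # blocks until something is available
        waited = time.monotonic() - start
        if item is SENTINEL:
            break
        results.put((item, waited))


def run_demo(n_items=4, n_readers=2):
    tasks, results = Queue(), Queue()
    readers = [Process(target=reader, args=(tasks, results))
               for _ in range(n_readers)]
    for r in readers:
        r.start()
    prod = Process(target=producer, args=(tasks, n_items, n_readers))
    prod.start()
    # Drain the results before joining so no child is stuck flushing its queue.
    collected = [results.get() for _ in range(n_items)]
    prod.join()
    for r in readers:
        r.join()
    return sorted(collected)


if __name__ == '__main__':
    for item, waited in run_demo():
        print(f"item {item}: get() blocked for {waited:.2f}s")
```

The reported waits should be on the order of the producer's delay rather than near zero, which is the same effect seen in the question: the readers are paced by the producer, not by their own sleep. The sentinel is one common way to let the reader loops exit, so that join() returns instead of waiting forever.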