python, multiprocessing, queue, python-multiprocessing

Using multiprocessing to process data from a queue and print the results as soon as the data arrives in a multiprocessing queue


I'm trying to use multiprocessing in my code to make it more efficient. My algorithm goes like this:

I'm continuously getting live data from a website. Each time one second has elapsed, I put the data collected during that second into a dictionary and put that dictionary on a multiprocessing queue. Then I want to take this data (the dictionary) off the queue, use it in some computations, and print the results of the computation. The code I tried to write goes something like this:

import multiprocessing as mp
import datetime as dt
Q = mp.Queue()

def function_to_get_from_q(Queue):
    while not Queue.empty():
        #some computations on data in Queue
        print(result_of_computation)

#keep collecting the data from live stream, when 1 second is completed put data 
#in queue using Q.put(data) and keep getting the live data. 
#then

if __name__ == "__main__":
    process1 = mp.Process(target=function_to_get_from_q, args=(Queue,))
    process1.start()

When I run the code, result_of_computation is never printed; the program just keeps getting the live data.

I want to print the results as soon as the first data arrives in the queue, while the live data keeps being collected in parallel. How should I approach this?


Solution

  • You have a few issues with your code. The main ones are:

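    1. Method multiprocessing.Queue.empty is not reliable and should not be used; the Python documentation warns that, because of multithreading/multiprocessing semantics, its result is not reliable. Moreover, even if it were reliable, the queue is still empty when function_to_get_from_q starts, so the while loop exits immediately and the function returns without processing anything. That is why nothing gets printed. The queue would also be momentarily empty every time function_to_get_from_q takes an item off it and before whatever code is putting data on the queue puts the next value, which only happens once every second.
    2. You have no code that actually takes data off the queue with the get method.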
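To see point 1 in action, here is a small sketch contrasting the two loop styles. It is an illustration under stated assumptions, not the asker's code: the dictionaries are simulated, the gap between batches is shortened to 0.1 s, and a thread (not a process) runs the sentinel-based consumer only so that its results can be inspected directly.

```python
import multiprocessing as mp
import threading
import time


def consume_with_empty(queue, results):
    """The question's pattern: loop only while the queue reports items."""
    while not queue.empty():
        results.append(queue.get())


def consume_with_sentinel(queue, results):
    """The fixed pattern: get() blocks until an item arrives; None means stop."""
    while True:
        item = queue.get()
        if item is None:
            break
        results.append(item)


def demo():
    q = mp.Queue()

    # Started before any data has been put, the empty()-based loop finds the
    # queue empty and returns immediately, so nothing is ever processed.
    early = []
    consume_with_empty(q, early)

    # The sentinel-based consumer waits on get() instead. A thread stands in
    # for the child process here only so that `got` can be inspected directly.
    got = []
    worker = threading.Thread(target=consume_with_sentinel, args=(q, got))
    worker.start()
    for second in range(3):
        time.sleep(0.1)  # simulated gap between batches (shortened from 1 s)
        q.put({"second": second})
    q.put(None)  # sentinel: no more data
    worker.join()
    return early, got


if __name__ == "__main__":
    early, got = demo()
    print(early)  # []
    print(got)    # [{'second': 0}, {'second': 1}, {'second': 2}]
```

The empty()-based loop returns before any data exists, while the get()-based loop waits for each item and stops only when it sees the None sentinel.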

    See the following, heavily commented code for the changes you should make:

    import multiprocessing as mp
    
    def function_to_get_from_q(queue):
        # Method empty on a multiprocessing queue is not reliable, so instead we
        # block on get() and stop when we receive a special sentinel value:
        while True:
            data = queue.get()  # waits until the next item is put on the queue
            if data is None:
                break
            # Some computations on data (the dictionary) taken from the queue;
            # replace this placeholder with your own computation:
            result_of_computation = data
            # flush=True makes each result appear immediately, even when the
            # child process's stdout is buffered:
            print(result_of_computation, flush=True)
    
    if __name__ == "__main__":
        # Move definition of q to here, since it should not be re-created in child processes.
        # If we did not put this queue creation within this block, then when
        # this code is run on platforms that create new processes with the spawn method,
        # a new queue would be created in the child process but never used:
        q = mp.Queue()
        process1 = mp.Process(target=function_to_get_from_q, args=(q,))
        process1.start()
        # Keep collecting the data from the live stream here, in the main process,
        # while process1 runs in parallel: each time 1 second is completed, put that
        # second's data on the queue using q.put(data) and keep getting the live data.
        # This collection code must also be inside this block, or a child started
        # with the spawn method would re-run it when it imports this module.
        # When there is no more data, put the special value None on the queue to
        # signal to the child process that it should terminate:
        q.put(None)
        # Wait for all work to be completed.
        # process1 will terminate when it gets the sentinel value None
        # as data from the queue.
        process1.join()
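Because the answer leaves the data collection as comments, here is a fuller, runnable sketch of the same pattern. Everything about the data source is hypothetical: fake_live_feed simulates the website with random prices, and summarize (tick count and mean price) stands in for the real computation. producer groups ticks into one dictionary per interval (1 second by default) in the main process and puts it on the queue right away, while consumer, running in a child process, prints a result as soon as each dictionary arrives; None is the sentinel.

```python
import multiprocessing as mp
import random
import time


def fake_live_feed(n_ticks=25, delay=0.1):
    """Hypothetical stand-in for the website: yields one tick every `delay` seconds."""
    for _ in range(n_ticks):
        time.sleep(delay)
        yield {"price": round(random.uniform(99.0, 101.0), 2)}


def summarize(batch):
    """Example computation (an assumption): tick count and mean price of one batch."""
    prices = batch["prices"]
    return len(prices), sum(prices) / len(prices)


def consumer(queue):
    """Child process: block on get() and handle each batch as soon as it arrives."""
    while True:
        batch = queue.get()
        if batch is None:  # sentinel: the producer has finished
            break
        count, mean = summarize(batch)
        print(f"{count} tick(s), mean price {mean:.2f}", flush=True)


def producer(queue, feed, interval=1.0):
    """Main process: group ticks into one dictionary per `interval` seconds and queue it."""
    prices = []
    deadline = time.monotonic() + interval
    for tick in feed:
        prices.append(tick["price"])
        if time.monotonic() >= deadline:
            queue.put({"prices": prices})  # hand off this interval's data immediately
            prices = []
            deadline = time.monotonic() + interval  # start the next window
    if prices:
        queue.put({"prices": prices})  # flush the final partial batch
    queue.put(None)  # tell the consumer there is no more data


if __name__ == "__main__":
    q = mp.Queue()
    worker = mp.Process(target=consumer, args=(q,))
    worker.start()
    producer(q, fake_live_feed())  # keeps collecting while the worker computes
    worker.join()
```

Since producer only checks the clock when a tick arrives, a batch is sent on the first tick after each interval ends. Passing interval=0.0 makes every tick its own batch, which can be handy for testing without waiting.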