I'm trying to use multiprocessing in my code to make it efficient. My algorithm goes like this:
I'm continuously getting live data from a website. When 1 second has elapsed, I put the data collected during that second into a dictionary and put the dictionary into a multiprocessing queue. Then I want to use this data (the dictionary in the queue) in some computations and print the results of the computation. The code I tried to write goes something like this:
import multiprocessing as mp
import datetime as dt

Q = mp.Queue()

def function_to_get_from_q(Queue):
    while not Queue.empty():
        #some computations on data in Queue
        print(result_of_computation)

#keep collecting the data from live stream, when 1 second is completed put data
#in queue using Q.put(data) and keep getting the live data.
#then
if __name__ == "__main__":
    process1 = mp.Process(target=function_to_get_from_q, args=(Q,))
    process1.start()
When I run the code, the printing of result_of_computation never happens. The program just keeps getting the live data.
I want to print the results as soon as the first data arrives in the queue, and I want the collection of live data to keep running in parallel. How should I approach this?
You have a few issues with your code. The main one is this:

multiprocessing.Queue.empty is not reliable and should not be used. Moreover, even if it were reliable, the queue is initially empty, and it will also be empty in the interval after function_to_get_from_q takes an item off the queue and before whatever thread is putting data on the queue puts the next value, which only happens once every second. So your while loop can exit immediately even though more data is coming. Instead, make a blocking call to get, and use a sentinel value to signal that there is no more data.

See the following, heavily commented code for the changes you should make:
import multiprocessing as mp
import datetime as dt

def function_to_get_from_q(queue):
    # Method empty on a multiprocessing queue is not reliable, so instead we
    # make blocking calls to get and use a special sentinel value to know
    # when to stop:
    while True:
        data = queue.get()
        if data is None:
            break
        # some computations on the data taken from the queue
        print(result_of_computation)

# Keep collecting the data from the live stream; when 1 second is completed,
# put the data in the queue using q.put(data) and keep getting the live data.
# We will put the special value None on the queue when we wish to signal to
# the child process that there will be no more data to be read and it should
# therefore terminate.
# Then:

if __name__ == "__main__":
    # Move the definition of q to here, since it should not be re-created in
    # child processes. If we did not put this queue creation within this
    # block, then when this code is run on platforms that create new
    # processes with the spawn method, a new queue would be created in the
    # child process but never used:
    q = mp.Queue()
    process1 = mp.Process(target=function_to_get_from_q, args=(q,))
    process1.start()
    # Wait for all work to be completed.
    # process1 will terminate when it gets the sentinel value None
    # as data from the queue.
    process1.join()
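To make the pattern concrete, here is a minimal, runnable sketch of the whole thing. The producer loop, the 0.1-second sleep, and the sum() computation are stand-ins of my own invention for your live-data collection and your real computation; substitute your actual logic there:

```python
import multiprocessing as mp
import time

def worker(queue):
    # Block on get() until data arrives; None is the sentinel meaning "no more data".
    while True:
        data = queue.get()
        if data is None:
            break
        # Stand-in for the real computation: sum the values collected this second.
        result_of_computation = sum(data.values())
        print(result_of_computation)

def main():
    # Create the queue here so it is not re-created in spawned child processes.
    q = mp.Queue()
    process1 = mp.Process(target=worker, args=(q,))
    process1.start()
    # Simulated "live" producer: in real code this loop would read from the
    # website and put one dictionary on the queue per second.
    for i in range(3):
        q.put({"tick": i, "value": i * 10})
        time.sleep(0.1)  # stand-in for the 1-second collection window
    q.put(None)  # sentinel: tells the worker to terminate
    process1.join()

if __name__ == "__main__":
    main()
```

The worker starts printing as soon as the first dictionary arrives, while the producer keeps running in parallel, which is exactly the behavior you asked for.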