I feel like I'm using a multiprocessing Queue incorrectly. I want one process to generate data and populate a queue (or other shared object), and then have another process always read the latest version of that data.
So far I've considered a last-in-first-out (LIFO) queue, but that isn't available in multiprocessing, and it would also remove the latest data, which I don't want. I've also thought of using a multiprocessing Queue with a maxsize=1
argument and some sort of "peek" method, but peek isn't available for multiprocessing queues either.
I'm pretty new to Python, so if the solution is really complex I'll probably just try something else.
Example below. I want both print lines with .get to return the latest value: if the queue has ever been populated, each would just return the latest item.
from multiprocessing import Process, Queue
import time

def f(q, x):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue(maxsize=1)
    p = Process(target=f, args=(q, 1))
    p.start()
    time.sleep(0.1)
    print(q.get(block=False))
    print(q.get(block=False))  # raises queue.Empty: the first .get already removed the item
    p.join()
Python has multiprocessing.Manager, which can do what you need here. Instead of a Queue, a managed object (even a managed list) gives you repeated access to the item at the end of the list (useful if you want to keep previous values), and the machinery to avoid race conditions that would crash your code, or make it misbehave, is already in place.
https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Manager
It is as easy as this (it even works in an interactive Python environment, like IPython, if you are on a system that uses "fork" to start subprocesses; on macOS and Windows you have to run it as a script):
import multiprocessing
import time

xx = multiprocessing.Manager()

def blah(seq):
    seq.append(42)

seq = xx.list()  # managed list: can be manipulated in all processes!
yy = multiprocessing.Process(target=blah, args=(seq,))
yy.start()
time.sleep(0.3)
print(seq[-1])
# yy.join()
# xx.shutdown()
The only remark is that managed objects have to be passed as arguments in calls to sub-processes, so that they are correctly serialized and reconstructed in a working state upon deserialization. That means one can't simply call os.fork
and expect the managed list to work in the child process, for example.