I have a process that reads from a sensor and another that graphs the sensor data. However, the grapher can be slower than the sensor process, especially when many sensors are added.
I see two options for passing the information from the sensor to the grapher: pipes and `mp.Value`. From what I know, pipes should be faster, but I worry about the grapher falling further and further behind: if the sensor samples n times as fast as the grapher, then with every grapher time step we only progress 1/n of a time step into the future (e.g. if it is twice as fast, after 20 s the grapher has only displayed 10 s). I could see the sensor polling the pipe and removing all values before adding a new one, but that sounds computationally expensive. The `mp.Value` route does require more explicit locking, and I believe it isn't as fast as the `Pipe` class, although I don't know for sure.
What would be the best way to approach this multiprocessing to avoid issues here?
Edit for clarification: I don't care if the grapher gets all the information. Using the most recent value is fine, which is why the title says "Pipe only Last Value". The main requirement for the grapher is that the plot doesn't get delayed, even if we effectively downsample by throwing away data. The sensor does need to sample faster than the grapher reads, though, because the data is also being recorded and processed, and we don't want to downsample that information.
To get the most up-to-date sensor value to the grapher, the sensor process should only send data once the grapher is ready to receive it. There are several ways to do this, but I think using two unidirectional (`duplex=False`) pipes is the best way to go, because you don't need to involve any extra threads or semaphores. In this setup, the first pipe sends data sensor -> grapher as normal, while the second simply signals that the grapher is ready to accept data immediately. It's a little awkward to express in prose, so here is pseudo code:
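```python
def grapher():
    while True:
        pipe_to_sensor.send(None)       # Signal "ready" first (any value), so start-up can't deadlock
        data = pipe_to_grapher.recv()   # Block until the sensor sends a sample
        graph(data)

def sensor():
    while True:
        data = sense()
        if pipe_to_sensor.poll():       # Grapher is waiting for data
            pipe_to_sensor.recv()       # Clear the pipe
            pipe_to_grapher.send(data)  # Freshest possible
        record(data)                    # Every sample is still recorded
```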
Note that the sensor can simply pass right on by if `poll()` returns `False`, as that is an indication that the grapher is not ready for data yet. You can also easily extend the system to use special values to communicate something about the state of one process to the other, such as a shutdown command.
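The pseudo code above leaves out process start-up and shutdown, so here is a runnable sketch of the same two-pipe handshake with a shutdown signal added. It assumes `None` on the data pipe means "stop"; the names `sensor_loop`, `grapher_loop`, `fake_sense`, `slow_graph` and the `n_samples` limit are made up for illustration, and the sensor runs in the main process only to keep the example short. `Pipe(duplex=False)` returns its ends in the order `(receive_end, send_end)`.

```python
import multiprocessing as mp
import time


def sensor_loop(data_tx, ready_rx, sense, record, n_samples):
    """Take n_samples readings; forward one only when the grapher is ready."""
    for _ in range(n_samples):
        data = sense()
        if ready_rx.poll():      # Non-blocking check for a "ready" signal
            ready_rx.recv()      # Consume the signal
            data_tx.send(data)   # Freshest sample goes to the grapher
        record(data)             # Every sample is recorded regardless
    data_tx.send(None)           # Hypothetical shutdown sentinel


def grapher_loop(data_rx, ready_tx, graph):
    """Announce readiness, wait for one sample, plot it; stop on None."""
    while True:
        ready_tx.send(True)      # Any picklable value works as the signal
        data = data_rx.recv()    # Blocks until the sensor sends something
        if data is None:
            break
        graph(data)


def fake_sense():
    # Hypothetical stand-in for a fast sensor read
    time.sleep(0.001)
    return time.monotonic()


def slow_graph(value):
    # Hypothetical stand-in for a slow plotting call
    time.sleep(0.05)
    print(f"plotted sample taken at t={value:.3f}")


if __name__ == "__main__":
    data_rx, data_tx = mp.Pipe(duplex=False)    # sensor -> grapher
    ready_rx, ready_tx = mp.Pipe(duplex=False)  # grapher -> sensor
    g = mp.Process(target=grapher_loop, args=(data_rx, ready_tx, slow_graph))
    g.start()
    recorded = []
    sensor_loop(data_tx, ready_rx, fake_sense, recorded.append, n_samples=300)
    g.join()
    print(f"recorded {len(recorded)} samples")
```

The sensor never blocks on the grapher: `poll()` without a timeout returns immediately, and at most one sample is ever in flight on the data pipe, so recording keeps its full rate while the plot only receives samples it is ready for.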
(Pre-edit answer follows)
This question appears to be asking about applying backpressure to your sensor data flow. It sounds like a `multiprocessing.Queue` might be a good solution for your specific case. Internally it uses a pipe, so it will have similar performance characteristics, but its constructor also accepts a `maxsize` parameter that you can set to a low number like 1. `put()` then blocks while the queue is full, so the sensor process will wait until the grapher process has retrieved an item before going back to acquire more data.
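A minimal sketch of that blocking behaviour, with trivial stand-ins for the device read and the plot (`run_sensor`, `grapher` and the `None` stop value are made up for illustration). With `maxsize=1`, each `put()` has to wait until the grapher has taken the previous item, so the sensor is paced by the grapher:

```python
import multiprocessing as mp
import time


def grapher(q):
    while True:
        item = q.get()        # Blocks until the sensor puts something
        if item is None:      # Hypothetical shutdown sentinel
            break
        time.sleep(0.05)      # Stand-in for a slow plot


def run_sensor(q, n_samples):
    sent = []
    for i in range(n_samples):
        sample = i            # Stand-in for reading the device
        q.put(sample)         # With maxsize=1, blocks while an item is still waiting
        sent.append(sample)
    q.put(None)
    return sent


if __name__ == "__main__":
    q = mp.Queue(maxsize=1)
    g = mp.Process(target=grapher, args=(q,))
    g.start()
    start = time.monotonic()
    run_sensor(q, 20)
    g.join()
    # On the order of 20 x 0.05 s, because the sensor is held back by the grapher
    print(f"20 samples took {time.monotonic() - start:.2f} s")
```

This is exactly the trade-off from the edit: the plot stays current, but the sensor's sampling rate drops to the grapher's rate.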
If the sensor has its own buffer that needs clearing, you can use `put_nowait()` instead and catch the `queue.Full` exception as an indication that the grapher won't be able to plot that data and it should be discarded. This saves the overhead of pickling and sending the data, but can lead to very rapid polling of the sensor, which may be a source of overhead itself, depending on the device/drivers/API.
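A sketch of the non-blocking variant, where a sample that doesn't fit is simply dropped. `multiprocessing.Queue` raises the `Full` exception defined in the standard `queue` module; `offer_latest` is a made-up helper name for illustration.

```python
import multiprocessing as mp
import queue  # multiprocessing.Queue raises queue.Full / queue.Empty


def offer_latest(q, sample):
    """Hand a sample to the grapher without blocking.

    Returns True if it was queued, False if the grapher is still busy
    (queue full), in which case the sample is simply dropped.
    """
    try:
        q.put_nowait(sample)
        return True
    except queue.Full:
        return False


if __name__ == "__main__":
    q = mp.Queue(maxsize=1)
    # Nobody is reading, so only the first sample fits
    print([offer_latest(q, s) for s in range(5)])  # [True, False, False, False, False]
    print(q.get(timeout=1))                        # 0
```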