I have a main process that is eating data from many different markets. It does some preliminary processing on the message and then passes it into a multiprocessing Queue (each unique market has its own dedicated process, call it Parse, on the other end of the Queue). Then the main process calls mp.Event.set() for the particular market involved.
Within Parse is a call to mp.Event.wait(), which pauses the process unless there is data being fed into the queue. At the end of Parse it calls mp.Event.clear().
I do this because I am using a while True loop to catch data the moment it comes through the queue. If I don't pause Parse it will use 100% of the CPU, and I don't have enough cores for that (not to mention it's massively wasteful).
This evening I realized that Parse is taking WAY too long to run, from 0.3 to 18 seconds. Market data messages can come in every 12 milliseconds, so clearly this is unworkable. Every aspect of Parse is very fast except for mp.Event.wait(). This call accounts for almost 100% of the run time.
I am storing all the mp.Event objects in a dictionary, defined in a config file. I fear that one of two things is happening:
1. Each instance of setting and clearing the Event blocks all the other ones, in a way similar to how mp.Manager works with shared objects.
2. mp.Event is just slow, and takes a long time for its state to propagate across processes...
I am thinking of solving this by piping the data with zmq (ZeroMQ) rather than a mp.Queue, but before I set that up I thought to ask the smart people.
Am I doing something obviously wrong here? Is there any way to speed up the mp.Event flagging?
EDIT
In response to the comment, here is an example:
In the config.py file, I define the dictionary like so:
    import multiprocessing as mp

    E, Q = {}, {}  # one Event and one Queue per market
    for m in all_markets:
        E[m] = mp.Event()
        Q[m] = mp.Queue()
Then in the main process which reads the data, I call sort, which looks something like this:
    def sort(message, m):
        if message satisfies condition1:
            define some args
            Q[m].put(message, *args)
            E[m].set()
        if message satisfies condition2:
            # basically the same
Then finally in Parse, which is started upon program startup:
    def Parse(message, m, Q, E):
        while True:
            E[m].wait()
            message = Q[m].get()
            # do a bunch of processing on the message
            # put the results in some other queues
            E[m].clear()
EDIT2
Procs are spawned and started like this:
    def mitosis():
        mp.Process(target=main).start()

    def pstart(m, func, **kwargs):
        if func == 'parser':
            p = mp.Process(target=parser, args=(m, Q, E, *args))
            p.start()
            return p  # main() stores this handle in PROCS

    def main():
        PROCS = {}
        for m in all_markets:
            for proc in proclist:
                PROCS[(m, proc)] = pstart(m, proc, **kwargs)
I think your problem is that your Event code is broken.
Imagine this scenario:
1. The main process calls sort for m.
2. sort calls Q[m].put and E[m].set.
3. Parse wakes up, does Q[m].get, and starts processing.
4. The main process calls sort again for the same m.
5. sort calls Q[m].put and E[m].set.
6. Parse finishes processing the first message, calls E[m].clear.
Now Parse is just waiting around for the Event to be set again. Which may not happen for quite a while. And, even if it happens quickly, it's still not going to catch up; it only does one Q[m].get for each Event.wait.
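To make that interleaving concrete, here is a rough single-threaded replay of it. It is only a sketch: threading.Event and queue.Queue stand in for mp.Event and mp.Queue (an assumption made so the ordering is deterministic), and the function name replay_lost_wakeup is made up for illustration.

```python
import queue
import threading

def replay_lost_wakeup():
    q = queue.Queue()        # stand-in for Q[m]
    e = threading.Event()    # stand-in for E[m]

    # sort() enqueues the first message and sets the event.
    q.put("msg1")
    e.set()

    # Parse wakes up and takes exactly one message.
    e.wait()
    first = q.get()

    # While Parse is still processing, sort() runs again.
    q.put("msg2")
    e.set()  # no visible effect: the event is already set

    # Parse finishes the first message and clears the event,
    # which also erases the signal for msg2.
    e.clear()

    return first, q.qsize(), e.is_set()

first, stranded, flag = replay_lost_wakeup()
print(first, stranded, flag)  # msg1 1 False
```

The second message is sitting in the queue, but the flag is down, so the next wait() blocks until some later message happens to set the event again.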
So, what you end up with is Parse appearing to fall farther and farther behind. And when you try to profile it to figure out why, you see that it's spending all its time waiting on E[m].wait. But this isn't because E[m].wait is slow; it's because the event trigger got lost.
This isn't the only race condition here; it's just the most obvious one.
The general problem is that you can't use event objects this way. Normally, you solve it by using a Condition instead, or one-shot triggering and self-resetting Events, plus looping over the Q[m].get(block=False) after each Event.
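A minimal sketch of the "drain after each wake-up" idea, assuming the Event is kept at all. The helper name wait_and_drain and the handle callback are hypothetical, and queue.Queue/threading.Event are used as stand-ins so the example runs deterministically in one process.

```python
import queue
import threading

def wait_and_drain(event, q, handle):
    """Wait for one wake-up, then process everything already queued."""
    event.wait()
    # Clear before draining: a set() that arrives during the drain stays
    # visible, so the next wait() returns immediately instead of blocking.
    event.clear()
    handled = 0
    while True:
        try:
            message = q.get(block=False)
        except queue.Empty:
            return handled
        handle(message)
        handled += 1

q = queue.Queue()
e = threading.Event()
seen = []
for message in ("msg1", "msg2", "msg3"):
    q.put(message)
    e.set()  # three set() calls still add up to a single wake-up

count = wait_and_drain(e, q, seen.append)
print(count, seen)  # 3 ['msg1', 'msg2', 'msg3']
```

One caveat if you try this with mp.Queue: the multiprocessing docs note that after a put() on an empty queue there may be a very short delay before get_nowait() can return the item, so a non-blocking drain can still see an empty queue right after the event fires.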
But really, there is no need to do this in the first place. If you just remove the Event entirely, when Parse calls Q[m].get, that blocks until there's something there. So, when sort calls Q[m].put, that wakes up Parse, and there's nothing else needed.
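Something like the following is what the Event-free version could look like. Treat it as a sketch under assumptions: the STOP sentinel, the run_demo helper, the market name "XYZ", and the upper() call standing in for the real processing are all invented for the example.

```python
import multiprocessing as mp

STOP = None  # hypothetical shutdown sentinel, not in the original code

def parse(m, in_q, out_q):
    """Handle messages for market m until the STOP sentinel arrives."""
    while True:
        message = in_q.get()  # blocks without spinning until data arrives
        if message is STOP:
            break
        out_q.put((m, message.upper()))  # stand-in for the real processing

def run_demo(messages, market="XYZ"):
    in_q, out_q = mp.Queue(), mp.Queue()
    worker = mp.Process(target=parse, args=(market, in_q, out_q))
    worker.start()
    for message in messages:
        in_q.put(message)  # the put itself is the wake-up signal
    in_q.put(STOP)
    results = [out_q.get() for _ in messages]  # read results before join()
    worker.join()
    return results

if __name__ == "__main__":
    print(run_demo(["bid", "ask", "trade"]))
```

The blocking get() costs no CPU while the queue is empty, and because every message is its own wake-up, a burst of puts can't be collapsed into a single signal the way repeated Event.set() calls can.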
In fact, the whole point of Queue is that it's inherently self-synchronized. If you don't want that, use a Pipe, and then you can use a Condition for signaling. But in the simple case, that's just a less efficient version of a Queue.