The following code below is simulating a producer and consumer model, which will collect data from a forex broker FXCM and write to a database. Each producer process will have a session based connection with the broker.
Both producer and consumer will run indefinitely until a 'poison pill' is placed into the queue, which happens at the close of business (Friday 22:00). I have omitted this part of the code as it is irrelevant to the problem. All of the examples I can find seem to spawn a process, do some work for a short period and then join()
back to the parent process. Like this one here
As mentioned above the producer will run indefinitely, and the reason for this is because it takes about 3 seconds to log in and create a session with the broker.
When running the code below you will see the Queue backlog, although this seems to be even worst when running the real code.
Not sure if it is relevant but the session is created using the python-forexconnect API, which is written in C++ and uses Boost.
The problem is the consumer is taking too long to get()
items from the queue, and I am wondering if this model is the correct way to approach this type of development.
Thank-you
Sample code
from multiprocessing import Process, Queue, cpu_count
from datetime import datetime, timedelta
import numpy as np
import time
def dummy_data(dtto):
dates = np.array(
[dtto - timedelta(days=i) for i in range(300)])
price_data = np.random.rand(len(dates),5)
return np.concatenate(
(np.vstack(dates),price_data), axis=1)
def get_bars(q2, ms, symbol, dtfm, dtto, time_frame):
stop_date = dtfm
while dtto > stop_date:
data = dummy_data(dtto)
dtfm = data[-1,0]
dtto = data[0,0]
q2.put((symbol, dtfm, dtto))
# Switch to date
dtto = dtfm
def producer(q1,q2):
# client = fx.Client(....)
client = 'broker session'
while True:
job = q1.get()
if job == None:
break
sym, dtfm, dtto, tf = job
# Get price data from broker
get_bars(q2, client, sym, dtfm, dtto, tf)
q2.put(None)
def consumer(q2):
while True:
bars = q2.get()
if bars == None:
break
print(q2.qsize(), bars[0], bars[1], bars[2]) # write to db
q1, q2 = Queue(), Queue()
# instruments = client.get_offers()
# instruments = ['GBP/USD', 'EUR,USD',...]
instruments = range(63) # 62 dummy instruments
# Places jobs into the queue for each symbol
for symbol in instruments:
q1.put((symbol,
datetime(2000,1,14,22,0),
datetime(2018,1,14,22,0),
'D1'))
# Setup producers and consumers
pp, cp = range(6), range(2)
pro = [Process(target=producer, args=(q1, q2,)) for i in pp]
con = [Process(target=consumer, args=(q2,)) for i in cp]
for p in pro: p.start()
for p in con: p.start()
# This is just here to stop this script and does not
# exist in the real version
for i in pp: q1.put(None)
for p in pro: p.join()
for p in con: p.join()
print('stopped')
Horrible performance of multiprocessing.Queue.get()
is a known problem (several questions on Stackoverflow as well, but no answers that would be generally useful).
Which sort of indicates that you should consider another model. You could see how much process creation overhead is compared to this; do not use permanently running processes at all, but launch a process as soon as you have data ready for it. When you do it like this, your subprocess will receive an in-memory copy of data when your process forks. This adds process creation overhead but removes the queue. You could at least consider this as your consumer writes to database and does not need to report anything back to the parent.
Python is a great language but it is not the best performing when it comes to parallel processing.