I am using requests to subscribe to a high frequency data stream. During busy times, there is a significant latency between the timestamp of the messages being sent to me and the timestamp of when I am able to process them in my script. During quiet times, this latency is consistently around 500 ms. During busy times, it rises to over 50 seconds. Here are some more observations:
I looked at the resource usage of my machine during busy times and CPU load and memory hardly rise.
When I start my script during a busy period, the latency begins at under 1 s, but as the script runs it climbs steadily to 50 s. The latency is therefore not inherent to the sender of the data but comes from some processing going on in my script.
Therefore, I am concluding that the problem is with my processing of the data. Here is what I am doing to process the data.
The function essentially sends dict objects back to a callback for further processing. Each dict is a JSON dict being sent by the streaming API.
import json
from threading import Thread

import requests


def receive_stream(callback):
    s = requests.Session()
    with s.get(...stream=True) as resp:
        buffer = ''
        for line in resp.iter_lines():
            line = line.decode('utf-8')
            json_dict = None
            if line == '}':
                # A bare closing brace ends the current JSON object.
                json_dict = buffer + line
                buffer = ''
            else:
                buffer = buffer + line
            if json_dict is not None:
                parsed_json = json.loads(json_dict)
                if parsed_json['type'] != 'hb':
                    # args must be a tuple, hence the trailing comma.
                    t = Thread(target=callback, args=(parsed_json,))
                    t.start()
Note: The callback function measures the mean latency over roughly every 50 messages, calculating it as datetime.datetime.now() minus the timestamp in the JSON dict being sent to it.
If I measure the latency in this function above AND remove the callback, it makes little difference -- same observations apply. Therefore, the downstream processing is not the issue (plus I am sending it off to another thread, so it shouldn't be)
My questions:
Is the way I am processing the incoming lines of data inherently inefficient, so that during busy times there is a big backlog of unprocessed lines? Could it be the json.loads() or the line.decode() call? Do I have to call the latter?
Is the way I am using threads the problem? I don't think the downstream processing is particularly costly: it just sends a message using zmq and measures latency, and removing the callback altogether makes little difference to this problem. Should I be using a queue?
You might consider some of the answers to "How I can I lazily read multiple JSON values from a file/stream in Python?" for alternative approaches to splitting your stream into separate JSON objects, but what you're doing now doesn't look too bad, as long as it's correct. It is not correct for general JSON: it assumes that a line consisting solely of '}' always marks the end of a top-level object, which depends on how the sender formats its output. It might be fine for your particular data.
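One of the approaches along those lines is to let the json module find the object boundaries itself with JSONDecoder.raw_decode(). The following is only a sketch of that idea, not a drop-in replacement: iter_json_objects and the sample data are made up for illustration, and it assumes every top-level value is an object or array (a bare number split across two chunks would be misread).

```python
import json


def iter_json_objects(chunks):
    """Yield parsed JSON values from an iterable of text chunks.

    Chunk boundaries may fall anywhere, including inside an object.
    """
    decoder = json.JSONDecoder()
    buffer = ''
    for chunk in chunks:
        buffer += chunk
        while True:
            buffer = buffer.lstrip()  # raw_decode rejects leading whitespace
            if not buffer:
                break
            try:
                obj, end = decoder.raw_decode(buffer)
            except json.JSONDecodeError:
                # Incomplete value so far: wait for more data.
                # (Malformed input would also end up here and never be yielded.)
                break
            yield obj
            buffer = buffer[end:]


if __name__ == '__main__':
    # Hypothetical input: two objects, the second split across chunks.
    sample = ['{"type": "hb"}\n{"type": "tick", "no',
              'te": "a } inside a string"}\n']
    for message in iter_json_objects(sample):
        print(message)
```

Note that this retries the parse every time a chunk arrives, so for large multi-line objects it does more work than your check for a closing brace. It buys correctness, not speed.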
- Is the way I am using threads the problem?
This is where I would look first for an improvement.
Before anything else, I would check whether you need (additional) threads at all. Do you get adequate performance if you just do everything in one thread?
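To put a number on the single-threaded cost, you could time the decode and parse steps on a handful of captured messages. This is a rough sketch under assumptions: mean_parse_seconds is a hypothetical helper, and the sample payload is invented, so substitute messages recorded from your actual stream.

```python
import json
import time


def mean_parse_seconds(raw_messages, repeats=100):
    """Return the mean time in seconds to decode and parse one message."""
    start = time.perf_counter()
    for _ in range(repeats):
        for raw in raw_messages:
            json.loads(raw.decode('utf-8'))
    elapsed = time.perf_counter() - start
    return elapsed / (repeats * len(raw_messages))


if __name__ == '__main__':
    # Hypothetical payload; replace with bytes captured from the real stream.
    samples = [b'{"type": "tick", "price": 1.2345, "time": "2020-01-01T00:00:00Z"}'] * 100
    per_message = mean_parse_seconds(samples)
    print(f'{per_message * 1e6:.1f} microseconds per message')
```

If the per-message figure is small compared with the gap between arriving messages, decoding and parsing are unlikely to be what builds the backlog.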
- I don't think the downstream processing is particularly costly: it just sends a message using zmq and measures latency, and removing the callback altogether makes little difference to this problem. Should I be using a queue?
If you need multiple threads then yes, you probably should be using a queue. Specifically, you should be queueing all the messages to a single thread for processing. Launching threads is expensive, and having many threads running at the same time is expensive, and you don't actually want processing of the first message to share CPU with processing the second, and so forth -- rather, you want the messages in order, as fast as you can get them.
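Plus, the CPython VM is not parallel: in standard builds, the global interpreter lock (GIL) allows only one thread at a time to execute Python bytecode. You don't have to manage that yourself, but you should be aware of the implications. In particular, extra threads will not speed up CPU-bound work such as parsing.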
Also, you may want to experiment with what processing you do in the reader thread vs what you do in the processor thread. For example, you might dispatch the raw strings to the processor thread instead of parsing them to JSON objects first. You might even want three threads -- one that reads the stream and breaks it into strings, another that parses those into JSON objects, and a third that processes the objects.
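The staged variant could be sketched like this, with one queue between each pair of stages. Everything here (stage, run_pipeline, _STOP) is hypothetical naming for illustration, and the calling thread stands in for the reader that would normally pull strings off the stream.

```python
import json
import queue
import threading

_STOP = object()  # sentinel passed down the pipeline to shut it down


def stage(inbox, outbox, func):
    """Apply func to each item from inbox; forward results to outbox if given."""
    try:
        for item in iter(inbox.get, _STOP):
            result = func(item)
            if outbox is not None:
                outbox.put(result)
    finally:
        # Always forward the sentinel, even if func raises,
        # so that downstream stages still exit.
        if outbox is not None:
            outbox.put(_STOP)


def run_pipeline(raw_strings, process):
    """Parse raw JSON strings in one thread and process them in another."""
    raw_q, parsed_q = queue.Queue(), queue.Queue()
    threads = [
        threading.Thread(target=stage, args=(raw_q, parsed_q, json.loads)),
        threading.Thread(target=stage, args=(parsed_q, None, process)),
    ]
    for t in threads:
        t.start()
    # The calling thread plays the reader role in this sketch.
    for raw in raw_strings:
        raw_q.put(raw)
    raw_q.put(_STOP)
    for t in threads:
        t.join()


if __name__ == '__main__':
    seen = []
    run_pipeline(['{"type": "tick", "seq": 1}', '{"type": "tick", "seq": 2}'],
                 seen.append)
    print(seen)
```

Whether splitting the work this way helps is something to measure rather than assume. The main benefit is that the reader thread gets back to the socket quickly. The parsing itself still runs one bytecode at a time.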