I have some simple code that runs a GET request for each item in a generator, and I'm trying to speed it up:
def stream(self, records):
    # type(records) = <type 'generator'>
    for record in records:
        # record = OrderedDict([('_time', '1518287568'), ('data', '5552267792')])
        output = rest_api_lookup(record[self.input_field])
        record.update(output)
        yield record
Right now this runs on a single thread and takes forever, since each REST call waits for the previous one to finish.
I have used multithreading on a list in Python before, following this great answer (https://stackoverflow.com/a/28463266/1150923), but I'm not sure how to reuse the same strategy on a generator instead of a list.
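For reference, the pattern I used from that answer looks roughly like this (sketching from memory, so details may differ from the linked answer; my_list stands in for my actual input):

from multiprocessing.dummy import Pool as ThreadPool  # Pool API backed by threads

pool = ThreadPool(8)  # pool size chosen arbitrarily here
# map() consumes the whole list and returns results in input order
results = pool.map(rest_api_lookup, my_list)  # my_list: placeholder for my real input
pool.close()
pool.join()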
A fellow developer advised me to break the generator out into 100-element lists and then close the pool, but I don't know how to create these lists from the generator.
I also need to keep the original order, since I need to yield record in the right order.
I assume you don't want to turn your generator records into a list first. One way to speed up your processing is to pass the records into a ThreadPoolExecutor chunk-wise: the executor will process your rest_api_lookup concurrently for all items of a chunk, and then you just need to "unchunk" the results. Here's some running sample code (which does not use classes, sorry, but I hope it shows the principle):
from concurrent.futures import ThreadPoolExecutor
from time import sleep

pool = ThreadPoolExecutor(8)  # 8 threads, adjust to taste and # of cores

def records():
    # simulates the records generator
    for i in range(100):
        yield {'a': i}

def rest_api_lookup(a):
    # simulates the REST call :)
    sleep(0.1)
    return {'b': -a}

def stream(records):
    def update_fun(record):
        output = rest_api_lookup(record['a'])
        record.update(output)
        return record

    chunk = []
    for record in records:
        # submit update_fun(record) to the pool, keep the resulting Future
        chunk.append(pool.submit(update_fun, record))
        if len(chunk) == 8:
            yield chunk
            chunk = []
    if chunk:  # don't forget the last, possibly shorter chunk
        yield chunk

def unchunk(chunk_gen):
    """Flattens a generator of Future chunks into a generator of results."""
    for chunk in chunk_gen:
        for f in chunk:
            # Futures are stored in submission order, so waiting on them
            # in that order preserves the original record order.
            yield f.result()

# Now iterate over all results, in the same order as generated by records()
for result in unchunk(stream(records())):
    print(result)
HTH!
Update: I added a sleep to the simulated REST call, to make it more realistic. This chunked version finishes on my machine in 1.5 seconds, which makes sense: the 100 calls run as 13 chunks of up to 8 concurrent calls, i.e. roughly 13 * 0.1s plus some overhead. The sequential version takes 10 seconds (as is to be expected, 100 * 0.1s = 10s).
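As a side note: if you'd rather build the fixed-size lists your fellow developer suggested directly from the generator, itertools.islice is one way to do it. This is just a sketch (the name chunks and the size of 100 are my own, not from your code):

from itertools import islice

def chunks(gen, size=100):
    """Yield successive lists of up to `size` items taken from a generator."""
    it = iter(gen)
    while True:
        chunk = list(islice(it, size))  # take the next `size` items, if any
        if not chunk:
            return  # generator exhausted
        yield chunk

# usage: each batch is a plain list of up to 100 records
for batch in chunks(records()):
    print(len(batch))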