I am writing a Python script to fetch the list of hosts corresponding to a particular group_id, using a web service call. The number of hosts can be in the tens of thousands. For each host I will then fetch a value called property from another web service.

so group_id ----(ws1)----> 10,000s of hosts ----(ws2)----> property for each

I am using concurrent.futures as shown in the following code, but it does not seem to be a clean design and is unlikely to scale well.
def call_ws_1(group_id):
    # fetch list of hosts for group_id

def call_ws_2(host):
    # find property for host

def fetch_hosts(group_ids):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_grp_id = {executor.submit(call_ws_1, group_id): group_id for group_id in group_ids}
        for future in concurrent.futures.as_completed(future_to_grp_id):
            group_id = future_to_grp_id[future]
            try:
                hosts = future.result()  # this is a list
            except Exception as exp:
                # logging etc.
            else:
                fetch_property(hosts)

def fetch_property(hosts):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_host = {executor.submit(call_ws_2, host): host for host in hosts}
        for future in concurrent.futures.as_completed(future_to_host):
            host = future_to_host[future]
            try:
                host_prop = future.result()  # string
            except Exception as exp:
                # logging etc.
            else:
                # Save host and property to DB
ProcessPoolExecutor has the advantage of not being affected by the GIL. With ThreadPoolExecutor, the GIL will prevent more than one thread from running at a time, unless you're doing I/O. The good news is that both of your workers will primarily be doing I/O, but any processing that occurs in each thread before or after making its web service call will not truly happen concurrently, which will hurt your performance. ProcessPoolExecutor will not have this restriction, but it has the added overhead of sending the group_id and host data between processes. If you've got tens of thousands of hosts, sending them one at a time between processes will add fairly significant overhead.
I don't think this change alone will affect performance much, since in the end you're still sending each host, one at a time, to a thread for processing.

As for number 3, if your worker threads really do almost nothing but I/O, this approach may work fine. But with threads, any CPU-bound work going on in the workers is going to murder your performance. I took your exact program layout and implemented your two workers like this:
def call_ws_1(group_id):
    return list(range(20))

def call_ws_2(host):
    sum(range(33000000))  # CPU-bound
    #time.sleep(1)  # I/O-bound
    return "{} property".format(host)
And executed everything like this:
if __name__ == "__main__":
    start = time.time()
    fetch_hosts(['a', 'b', 'c', 'd', 'e'])
    end = time.time()
    print("Total time: {}".format(end-start))
Using time.sleep, the output is:
Fetching hosts for d
Fetching hosts for a
Fetching hosts for c
Fetching hosts for b
Fetching hosts for e
Total time: 25.051292896270752
Using the sum(range(33000000)) calculation, the performance is much worse:
Fetching hosts for d
Fetching hosts for a
Fetching hosts for c
Fetching hosts for b
Fetching hosts for e
Total time: 75.81612730026245
Note that the calculation takes about one second on my laptop:
>>> timeit.timeit("sum(range(33000000))", number=1)
1.023313045501709
>>> timeit.timeit("sum(range(33000000))", number=1)
1.029937982559204
So each worker takes about a second. But because one is CPU-bound, and therefore affected by the GIL, threads perform horribly.
Here's a ProcessPoolExecutor using time.sleep:
Fetching hosts for a
Fetching hosts for b
Fetching hosts for c
Fetching hosts for d
Fetching hosts for e
Total time: 25.169482469558716
Now using sum(range(33000000)):
Fetching hosts for a
Fetching hosts for b
Fetching hosts for c
Fetching hosts for d
Fetching hosts for e
Total time: 43.54587936401367
As you can see, while performance is still worse than with time.sleep (probably because the calculation takes a bit longer than a second, and the CPU-bound work has to compete with everything else running on the laptop), it still greatly outperforms the threaded version.
However, I suspect that as the number of hosts goes up, the cost of IPC will slow you down quite a bit. Here's how the ThreadPoolExecutor does with 10,000 hosts, but a worker that does nothing (it just returns):
Fetching hosts for c
Fetching hosts for b
Fetching hosts for d
Fetching hosts for a
Fetching hosts for e
Total time: 9.535644769668579
Compare to ProcessPoolExecutor:
Fetching hosts for c
Fetching hosts for b
Fetching hosts for a
Fetching hosts for d
Fetching hosts for e
Total time: 36.59257411956787
So it's about 4x slower with ProcessPoolExecutor, all caused by the cost of IPC.
So, what does all this mean? I think your best possible performance will come from using ProcessPoolExecutor, but additionally batching your IPC, so that you send large chunks of hosts into a child process rather than one host at a time.

Something like this (untested, but it gives you the idea):
import time
import itertools
import concurrent.futures
from concurrent.futures import ProcessPoolExecutor as Pool

def call_ws_1(group_id):
    return list(range(10000))

def call_ws_2(hosts):  # This worker now operates on a list of hosts
    host_results = []
    for host in hosts:
        host_results.append((host, "{} property".format(host)))  # list of (host, property) tuples
    return host_results

def chunk_list(l):
    chunksize = max(1, len(l) // 16)  # Break the list into smaller pieces
    it = [iter(l)] * chunksize
    for item in itertools.zip_longest(*it):
        # Drop only the None padding zip_longest adds to the last chunk
        # (filter(None, ...) would also discard falsy hosts like 0)
        yield tuple(x for x in item if x is not None)

def fetch_property(hosts):
    with Pool(max_workers=4) as executor:
        futs = [executor.submit(call_ws_2, chunk) for chunk in chunk_list(hosts)]
        for future in concurrent.futures.as_completed(futs):
            try:
                results = future.result()
            except Exception as exp:
                print("Got %s" % exp)
            else:
                for host, host_prop in results:
                    # Save host and property to DB
                    pass

def fetch_hosts(group_ids):
    with Pool(max_workers=4) as executor:
        future_to_grp_id = {executor.submit(call_ws_1, group_id): group_id for group_id in group_ids}
        for future in concurrent.futures.as_completed(future_to_grp_id):
            group_id = future_to_grp_id[future]
            try:
                hosts = future.result()  # this is a list
            except Exception as exp:
                print("Got %s" % exp)
            else:
                print("Fetching hosts for {}".format(group_id))
                fetch_property(hosts)

if __name__ == "__main__":
    start = time.time()
    fetch_hosts(['a', 'b', 'c', 'd', 'e'])
    end = time.time()
    print("Total time: {}".format(end-start))
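If the hand-rolled chunking feels like too much bookkeeping, note that ProcessPoolExecutor.map takes a chunksize argument (Python 3.5+) that batches the iterable across the IPC boundary for you. A sketch using a single-host worker (the worker body is a placeholder for the real web service call):

```python
import concurrent.futures

def call_ws_2(host):
    # Placeholder: the real worker would call the web service here.
    return (host, "{} property".format(host))

def fetch_property_map(hosts):
    # chunksize=256 means each round trip to a worker process carries
    # 256 hosts instead of one, amortizing the pickling/IPC cost,
    # while the worker function itself stays per-host.
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        return list(executor.map(call_ws_2, hosts, chunksize=256))

if __name__ == "__main__":
    results = fetch_property_map(range(10000))
    print(results[0])
```

Results come back in input order (unlike as_completed), which may or may not matter for your DB writes.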