
Python : efficiency concerns in parallel async calls to fetch data from web services


I am writing a Python script to fetch the list of hosts corresponding to a particular group_id, using a web service call. The number of hosts can be in the tens of thousands. For each host I will then fetch a value called property from another web service.
so group_id ---(ws1)---> 10,000s of hosts ---(ws2)---> property for each

I am using concurrent.futures as shown in the following code. But it does not seem to be a clean design and is unlikely to scale well.

def call_ws_1(group_id):
    # fetch list of hosts for group_id
    pass


def call_ws_2(host):
    # find property for host
    pass


def fetch_hosts(group_ids):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_grp_id = {executor.submit(call_ws_1, group_id): group_id for group_id in group_ids}
        for future in concurrent.futures.as_completed(future_to_grp_id):
            group_id = future_to_grp_id[future]
            try:
                hosts = future.result()  # this is a list
            except Exception as exp:
                pass  # logging etc.
            else:
                fetch_property(hosts)


def fetch_property(hosts):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_host = {executor.submit(call_ws_2, host): host for host in hosts}
        for future in concurrent.futures.as_completed(future_to_host):
            host = future_to_host[future]
            try:
                host_prop = future.result()  # a string
            except Exception as exp:
                pass  # logging etc.
            else:
                pass  # save host and property to DB

  1. Will there be any advantage to using ProcessPoolExecutor?
  2. How about fetching all the hosts first (around 40,000 of them) and then calling the web service to fetch the property for each?
  3. Any other suggestions to improve this design?

Solution

    1. ProcessPoolExecutor has the advantage of not being affected by the GIL. With ThreadPoolExecutor, the GIL will prevent more than one thread from running at a time, unless you're doing I/O. The good news is it looks like both of your threads will primarily be doing I/O, but any kind of processing that occurs in each thread before or after making their web services calls will not truly happen concurrently, which will hurt your performance. ProcessPoolExecutor will not have this restriction, but it has the added overhead of sending the group_id and host data between processes. If you've got tens of thousands of hosts, sending those one at a time between processes is going to have fairly significant overhead.

    2. I don't think this change alone will change performance too much, since in the end you're still sending each host, one at a time, to a thread for processing.

    As for number 3, if your worker threads are really doing almost nothing but I/O, this approach may work fine. But with threads, any CPU-bound work going on in the workers is going to murder your performance. I took your exact program layout and implemented your two workers like this:

    def call_ws_1(group_id):
        return list(range(20))
    
    def call_ws_2(host):
        sum(range(33000000))  # CPU-bound
        #time.sleep(1)  # I/O-bound
        return "{} property".format(host)
    

    And executed everything like this:

    if __name__ == "__main__":
        start = time.time()
        fetch_hosts(['a', 'b', 'c', 'd', 'e'])
        end = time.time()
        print("Total time: {}".format(end-start))
    

    Using time.sleep, the output is:

    Fetching hosts for d
    Fetching hosts for a
    Fetching hosts for c
    Fetching hosts for b
    Fetching hosts for e
    Total time: 25.051292896270752
    

    Using the sum(range(33000000)) calculation, the performance is much worse:

    Fetching hosts for d
    Fetching hosts for a
    Fetching hosts for c
    Fetching hosts for b
    Fetching hosts for e
    Total time: 75.81612730026245
    

    Note that the calculation takes about one second on my laptop:

    >>> timeit.timeit("sum(range(33000000))", number=1)
    1.023313045501709
    >>> timeit.timeit("sum(range(33000000))", number=1)
    1.029937982559204
    

    So each worker takes about a second. But because one is CPU-bound, and therefore affected by the GIL, threads perform horribly.
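    The GIL effect is easy to reproduce in isolation. Here's a minimal sketch (the task size and timings are illustrative and will vary by machine) showing that spreading pure-Python CPU-bound work over threads barely improves wall time:

```python
import time
import concurrent.futures

def cpu_task(n):
    # pure-Python, CPU-bound work that holds the GIL while it runs
    return sum(range(n))

N = 5_000_000

# Baseline: run four tasks one after another
start = time.time()
expected = [cpu_task(N) for _ in range(4)]
serial = time.time() - start

# The same four tasks across four threads: the GIL lets only one thread
# execute Python bytecode at a time, so wall time barely improves
start = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(cpu_task, [N] * 4))
threaded = time.time() - start

print("serial: {:.2f}s  threaded: {:.2f}s".format(serial, threaded))
```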

    Here's a ProcessPoolExecutor using time.sleep:

    Fetching hosts for a
    Fetching hosts for b
    Fetching hosts for c
    Fetching hosts for d
    Fetching hosts for e
    Total time: 25.169482469558716
    

    Now using sum(range(33000000)):

    Fetching hosts for a
    Fetching hosts for b
    Fetching hosts for c
    Fetching hosts for d
    Fetching hosts for e
    Total time: 43.54587936401367
    

    As you can see, while performance is still worse than time.sleep (probably because the calculation takes a bit longer than a second, and CPU-bound work has to compete with everything else running on the laptop), it still greatly outperforms the threaded version.

    However, I suspect that as the number of hosts goes up, the cost of IPC will slow you down quite a bit. Here's how the ThreadPoolExecutor does with 10000 hosts, but a worker that does nothing (it just returns):

    Fetching hosts for c
    Fetching hosts for b
    Fetching hosts for d
    Fetching hosts for a
    Fetching hosts for e
    Total time: 9.535644769668579
    

    Compare to ProcessPoolExecutor:

    Fetching hosts for c
    Fetching hosts for b
    Fetching hosts for a
    Fetching hosts for d
    Fetching hosts for e
    Total time: 36.59257411956787
    

    So it's about 4x slower with ProcessPoolExecutor, all caused by the cost of IPC.
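
    For reference, the do-nothing worker used in those two runs might look like this (a sketch reconstructing the setup; the harness is the same fetch_hosts/fetch_property layout as above):

```python
import concurrent.futures

def call_ws_1(group_id):
    return list(range(10000))  # 10000 "hosts" per group

def call_ws_2(host):
    return host  # does no work; any remaining cost is scheduling (or IPC for processes)

def fetch_property(hosts):
    # swap in ProcessPoolExecutor here to measure the IPC overhead instead
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futs = {executor.submit(call_ws_2, host): host for host in hosts}
        return [f.result() for f in concurrent.futures.as_completed(futs)]
```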

    So, what does all this mean? I think your best possible performance would come from using ProcessPoolExecutor, but additionally batching your IPC, so that you send large chunks of hosts into a child process, rather than just sending one host at a time.

    Something like this (untested, but gives you the idea):

    import time
    import itertools
    import concurrent.futures
    from concurrent.futures import ProcessPoolExecutor as Pool
    
    def call_ws_1(group_id):
        return list(range(10000))
    
    def call_ws_2(hosts):  # This worker now works on a list of hosts
        host_results = []
        for host in hosts:
            host_results.append((host, "{} property".format(host)))  # returns a list of (host, property) tuples
        return host_results
    
    def chunk_list(l):
        chunksize = max(1, len(l) // 16)  # break the list into ~16 smaller pieces
        it = [iter(l)] * chunksize
        for item in itertools.zip_longest(*it):
            # drop only the zip_longest fill values, not falsy hosts such as 0
            yield tuple(x for x in item if x is not None)
    
    def fetch_property(hosts):
        with Pool(max_workers=4) as executor:
            futs = []
            for chunk in chunk_list(hosts):
                futs.append(executor.submit(call_ws_2, chunk))
            for future in concurrent.futures.as_completed(futs):
                try:
                     results = future.result()
                except Exception as exp:
                    print("Got %s" % exp)
                else:
                    for host, prop in results:
                        pass  # save host and property to DB
    
    def fetch_hosts(group_ids):
        with Pool(max_workers=4) as executor:
            future_to_grp_id = {executor.submit(call_ws_1, group_id): group_id for group_id in group_ids}
            for future in concurrent.futures.as_completed(future_to_grp_id):
                group_id = future_to_grp_id[future]
                try:
                    hosts = future.result()#this is a list
                except Exception as exp:
                    print("Got %s" % exp)
                else:
                    print("Fetching hosts for {}".format(group_id))
                    fetch_property(hosts)
    
    if __name__ == "__main__":
        start = time.time()
        fetch_hosts(['a', 'b', 'c', 'd', 'e'])
        end = time.time()
        print("Total time: {}".format(end-start))
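
    As a footnote: since Python 3.5, ProcessPoolExecutor.map accepts a chunksize argument that does this batching for you, so the hand-rolled chunk_list can be dropped entirely. A minimal sketch (call_ws_2 here is a hypothetical stand-in for the real web-service call):

```python
import concurrent.futures

def call_ws_2(host):
    # hypothetical stand-in for the per-host web-service call
    return "{} property".format(host)

def fetch_property(hosts):
    # map() pickles arguments over to the workers in batches of `chunksize`,
    # amortizing the IPC cost the same way chunk_list does
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        chunksize = max(1, len(hosts) // 16)
        return list(executor.map(call_ws_2, hosts, chunksize=chunksize))

if __name__ == "__main__":
    props = fetch_property(list(range(100)))
    print(props[0])
```

Unlike submit plus as_completed, map also returns results in input order, which removes the need to carry a future-to-host mapping around.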