
concurrent.futures.Executor.map: calculate function inputs lazily


I have some code similar to the following:

import concurrent.futures

def worker(l: list):
    ...

really_big_list = [......]
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(worker, really_big_list[i:i + 100])
               for i in range(0, len(really_big_list), 10)}
    for future in concurrent.futures.as_completed(futures):
        data = future.result()

Note that the workers take slices of size 100, but the start index only advances by 10 at a time, so the slices overlap. Even though I am only using 5 workers, building all the inputs up front takes roughly 10 times the memory of really_big_list. This is simply too much memory for what I am doing. Is there a way to defer computing really_big_list[i:i+100] until that worker is started?


Solution

  • I suggest an alternative: use multiprocessing.Pool with .imap or .imap_unordered, which consume the input iterable lazily instead of materializing every task up front, e.g.:

    from itertools import islice
    from multiprocessing import Pool
    from time import sleep
    
    
    def worker(l: list):
        sleep(1)
        return sum(l)
    
    
    # from https://docs.python.org/3.11/library/itertools.html
    def batched(iterable, n):
        "Batch data into tuples of length n. The last batch may be shorter."
        # batched('ABCDEFG', 3) --> ABC DEF G
        if n < 1:
            raise ValueError("n must be at least one")
        it = iter(iterable)
        while batch := tuple(islice(it, n)):
            yield batch
    
    
    if __name__ == "__main__":
        really_big_list = list(range(10_000))
        with Pool(5) as pool:
            # use `.imap` or `.imap_unordered` here
            for result in pool.imap(worker, batched(really_big_list, 10)):
                print(result)