Tags: python, multithreading, parallel-processing, iterator, generator

How to zip iterators in parallel using threading?


Say I have N generators, each producing a stream of items: gs = [..]  # list of generators.

I can easily zip them together to get a generator of tuples from each respective generator in gs: tuple_gen = zip(*gs).

zip calls next(g) on each g in gs in turn and gathers the results into a tuple. If each item is costly to produce, though, we may want to run the next(g) calls in parallel on multiple threads.
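To make the sequential behaviour concrete, here is a rough pure-Python sketch of what zip does. The name seq_zip is made up for illustration; the real zip is implemented in C, but it advances its inputs one after another in the same way.

```python
def seq_zip(*iterables):
    """Rough equivalent of zip: advance each iterator in turn, one at a time."""
    iterators = [iter(it) for it in iterables]
    if not iterators:
        return
    while True:
        row = []
        for it in iterators:
            try:
                # Blocks until this item is produced; the next iterator
                # is not touched before this call returns.
                row.append(next(it))
            except StopIteration:
                return  # shortest input exhausted: stop, like zip
        yield tuple(row)
```

With N costly generators, each output tuple therefore takes roughly the sum of the N individual next(g) times.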

How can I implement a pzip(..) that does this?


Solution

  • You can do this with a generator that submits next(g) for every g to a ThreadPool via apply_async, waits for the results, and yields them together.

    FYI, I benchmarked this approach using the iterators that pandas.read_csv returns when you specify the chunksize parameter. I created eight copies of a CSV file with 1M rows and set chunksize=100_000.

    Four of the files were read with the sequential method you provided, four with the mt_gen function below, using a pool of four threads:

    • single threaded ~ 3.68 s
    • multi-threaded ~ 1.21 s

    That doesn't mean it will improve results for every hardware and data setup, though.

    import time
    import threading
    from multiprocessing.dummy import Pool  # dummy uses threads
    
    
    def _load_sim(x=10e6):
        """Simulate a costly item: a CPU-bound loop followed by a blocking wait."""
        for _ in range(int(x)):
            x -= 1
        time.sleep(1)
    
    
    def gen(start, stop):
        for i in range(start, stop):
            _load_sim()
            print(f'{threading.current_thread().name} yielding {i}')
            yield i
    
    
    def multi_threaded(gens):
        combi_g = mt_gen(gens)
        for item in combi_g:
            print(item)
    
    
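    def mt_gen(gens):
        """Yield one list per round, holding the next item of every generator."""
        # N_WORKERS is a module-level global set in the __main__ block below.
        with Pool(N_WORKERS) as pool:
            while True:
                # Submit next(g) for every generator; the calls run on pool threads.
                async_results = [pool.apply_async(next, args=(g,)) for g in gens]
                try:
                    # get() blocks until the result is ready and re-raises any
                    # exception from the worker, including StopIteration.
                    results = [r.get() for r in async_results]
                except StopIteration:  # needed for Python 3.7+, PEP 479, bpo-32670
                    return
                yield results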
    
    
    if __name__ == '__main__':
    
        N_GENS = 10
        N_WORKERS = 4
        GEN_LENGTH = 3
    
        gens = [gen(x * GEN_LENGTH, (x + 1) * GEN_LENGTH) for x in range(N_GENS)]
        multi_threaded(gens)
    

    Output:

    Thread-1 yielding 0
    Thread-2 yielding 3
    Thread-4 yielding 6
    Thread-3 yielding 9
    Thread-1 yielding 12
    Thread-2 yielding 15
    Thread-4 yielding 18
    Thread-3 yielding 21
    Thread-1 yielding 24
    Thread-2 yielding 27
    [0, 3, 6, 9, 12, 15, 18, 21, 24, 27]
    Thread-3 yielding 7
    Thread-1 yielding 10
    Thread-2 yielding 4
    Thread-4 yielding 1
    Thread-3 yielding 13
    Thread-1 yielding 16
    Thread-4 yielding 22
    Thread-2 yielding 19
    Thread-3 yielding 25
    Thread-1 yielding 28
    [1, 4, 7, 10, 13, 16, 19, 22, 25, 28]
    Thread-1 yielding 8
    Thread-4 yielding 2
    Thread-3 yielding 11
    Thread-2 yielding 5
    Thread-1 yielding 14
    Thread-4 yielding 17
    Thread-3 yielding 20
    Thread-2 yielding 23
    Thread-1 yielding 26
    Thread-4 yielding 29
    [2, 5, 8, 11, 14, 17, 20, 23, 26, 29]
    
    Process finished with exit code 0
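For a closer drop-in for zip, the same idea can be sketched with concurrent.futures instead of multiprocessing.dummy. This is only one possible variant, not part of the benchmark above: the names pzip, _next_or_done and _DONE are made up for illustration, and the sketch assumes that each next(g) call releases the GIL for most of its run time (I/O, sleeping, many C extensions), since otherwise threads will not speed it up.

```python
from concurrent.futures import ThreadPoolExecutor

_DONE = object()  # sentinel marking an exhausted iterator (illustrative name)


def _next_or_done(it):
    # Runs in a worker thread; returns the sentinel instead of raising
    # StopIteration, so no exception has to cross the thread boundary.
    return next(it, _DONE)


def pzip(*iterables, max_workers=None):
    """Like zip, but advances all iterators concurrently for each tuple."""
    iterators = [iter(it) for it in iterables]
    if not iterators:
        return
    # Default assumption: one worker per iterator.
    with ThreadPoolExecutor(max_workers=max_workers or len(iterators)) as pool:
        while True:
            # pool.map submits all calls at once and returns results in
            # input order; list() waits for the whole round to finish.
            row = list(pool.map(_next_or_done, iterators))
            if any(item is _DONE for item in row):
                return
            yield tuple(row)
```

Usage mirrors zip: for items in pzip(*gs): .... One difference to keep in mind is that in the final round every iterator is advanced once more before exhaustion is detected, whereas zip stops at the first exhausted input, so a longer iterator may lose one extra item.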