
Using concurrent.futures with infinite iterator and stopping criteria


I am trying to parallelize a loop which uses an infinite generator as input to collect some data and stops when a certain amount of data has been received.

My implementation is something like this.

import random
from concurrent.futures import ProcessPoolExecutor


class A:
    def __iter__(self):
        i = 0
        while True:
            yield i
            i += 1


def procpar(x):
    r = random.random()
    print('Computing x =', x)

    if r > 0.5:
        return [2 * x]
    else:
        return [2 * x, x ** 2]


with ProcessPoolExecutor(4) as pool:
    out = []
    x = A()

    # Collect results until more than 100 values have been gathered.
    for res in pool.map(procpar, x):
        out.extend(res)
        if len(out) > 100:
            break

When I run it, I get the following output, after which it just hangs and nothing happens.

Computing x = 1
Computing x = 6
Computing x = 2
Computing x = 3
Computing x = 4
Computing x = 5

As far as I can tell, the map method tries to consume the whole iterator x = A() up front to create its tasks, so it is stuck in an infinite loop.
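
The eager consumption described above can be checked with a small finite experiment. This is only a minimal sketch: counting_source, double and consumed_before_first_result are hypothetical names, and ThreadPoolExecutor is used just to keep the demo light, on the assumption that ProcessPoolExecutor.map submits its inputs the same way when called with default arguments.

```python
from concurrent.futures import ThreadPoolExecutor


def counting_source(n, consumed):
    """Yield 0..n-1, recording each value in `consumed` as it is pulled."""
    for i in range(n):
        consumed.append(i)
        yield i


def double(x):
    return 2 * x


def consumed_before_first_result(n=1000, workers=4):
    """Return how many inputs map() pulled before any result was requested."""
    consumed = []
    with ThreadPoolExecutor(workers) as pool:
        results = pool.map(double, counting_source(n, consumed))
        # map() has returned, but no result has been requested yet.
        pulled = len(consumed)
        first = next(results)
    return pulled, first


if __name__ == '__main__':
    pulled, first = consumed_before_first_result()
    print('items pulled before the first result was requested:', pulled)
    print('first result:', first)
```

With a finite source the whole input is pulled before the first result is read, which is why an infinite source never gets as far as the for loop.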

Any suggestions on how to avoid getting stuck in the infinite loop? Of course, I could pull items from the iterator x in chunks before feeding them to the process pool, but I am wondering whether someone has a better or more straightforward solution.


Solution

  • Try using multiprocessing.Pool.imap instead. It is a lazier version of map, so the loop can break as soon as enough results have arrived:

    from multiprocessing import Pool
    import random
    
    
    class A:
        def __iter__(self):
            i = 0
            while True:
                yield i
                i += 1
    
    
    def procpar(x):
        r = random.random()
        print('Computing x =', x)
    
        if r > 0.5:
            return [2 * x]
        else:
            return [2 * x, x ** 2]
    
    
    # Required for Windows:
    if __name__ == '__main__':
        with Pool(4) as pool:
            out = []
            x = A()
    
            for res in pool.imap(procpar, x):
                out.extend(res)
                if len(out) > 100:
                    break
        print(out)
    

    Prints (the exact values vary between runs because procpar uses random.random()):

    Computing x = 0
    Computing x = 1
    Computing x = 2
    Computing x = 3
    Computing x = 4
    Computing x = 5
    Computing x = 6
    Computing x = 7
    Computing x = 8
    Computing x = 9
    Computing x = 10
    Computing x = 11
    Computing x = 12
    Computing x = 13
    Computing x = 14
    Computing x = 15
    Computing x = 16
    Computing x = 17
    Computing x = 18
    Computing x = 19
    Computing x = 20
    Computing x = 21
    Computing x = 22
    Computing x = 23
    Computing x = 24
    Computing x = 25
    Computing x = 26
    Computing x = 27
    Computing x = 28
    Computing x = 29
    Computing x = 30
    Computing x = 31
    Computing x = 32
    Computing x = 33
    Computing x = 34
    Computing x = 35
    Computing x = 36
    Computing x = 37
    Computing x = 38
    Computing x = 39
    Computing x = 40
    Computing x = 41
    Computing x = 42
    Computing x = 43
    Computing x = 44
    Computing x = 45
    Computing x = 46
    Computing x = 47
    Computing x = 48
    Computing x = 49
    Computing x = 50
    Computing x = 51
    Computing x = 52
    Computing x = 53
    Computing x = 54
    Computing x = 55
    Computing x = 56
    Computing x = 57
    Computing x = 58
    Computing x = 59
    Computing x = 60
    Computing x = 61
    Computing x = 62
    Computing x = 63
    Computing x = 64
    [0, 2, 1, 4, 4, 6, 8, 16, 10, 12, 14, 49, 16, 64, 18, 20, 22, 24, 144, 26, 28, 196, 30, 225, 32, 256, 34, 289, 36, 38, 361, 40, 400, 42, 441, 44, 484, 46, 529, 48, 576, 50, 625, 52, 54, 56, 58, 60, 900, 62, 961, 64, 66, 1089, 68, 1156, 70, 1225, 72, 1296, 74, 1369, 76, 78, 1521, 80, 1600, 82, 1681, 84, 1764, 86, 88, 90, 2025, 92, 2116, 94, 96, 2304, 98, 100, 102, 2601, 104, 2704, 106, 2809, 108, 110, 3025, 112, 3136, 114, 116, 3364, 118, 3481, 120, 3600, 122]
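
If you would rather stay with concurrent.futures, one possible approach is to submit only a bounded number of tasks at a time and top the window up as tasks finish. The following is a rough sketch under stated assumptions, not a drop-in replacement: collect, naturals, target and max_pending are hypothetical names introduced here, and results arrive in completion order rather than input order.

```python
import itertools
import random
from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait


def naturals():
    """Infinite generator 0, 1, 2, ... (stands in for class A above)."""
    i = 0
    while True:
        yield i
        i += 1


def procpar(x):
    r = random.random()
    if r > 0.5:
        return [2 * x]
    return [2 * x, x ** 2]


def collect(executor, fn, iterable, target, max_pending=8):
    """Gather results until more than `target` values have been collected.

    At most `max_pending` tasks are in flight, so the infinite input
    is only consumed as fast as results come back.
    """
    source = iter(iterable)
    out = []
    pending = {executor.submit(fn, x)
               for x in itertools.islice(source, max_pending)}
    while pending:
        done, pending = wait(pending, return_when=FIRST_COMPLETED)
        for fut in done:
            out.extend(fut.result())
        if len(out) > target:
            break
        # Replace each finished task with a fresh one from the source.
        for x in itertools.islice(source, len(done)):
            pending.add(executor.submit(fn, x))
    # Best effort: tasks that have already started cannot be cancelled.
    for fut in pending:
        fut.cancel()
    return out


if __name__ == '__main__':
    with ProcessPoolExecutor(4) as pool:
        out = collect(pool, procpar, naturals(), target=100)
    print(len(out))
```

Leaving the with block still waits for tasks that are already running, but because only a handful are ever pending, that wait is short.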