Tags: python, filter, python-multiprocessing, lazy-evaluation

python filter + multiprocessing + iterator lazy loading


I have a 2-dimensional array that produces a huge (>300 GB) list of combinations, so I'd like to iterate lazily over the iterator produced by itertools.combinations and parallelize the operation. The problem is that I need to filter the output, and multiprocessing doesn't offer a parallel filter. My existing workaround requires loading the list of combinations into memory, which also doesn't work because of its size.


import itertools
import multiprocessing

import numpy as np
import scipy.stats

n_nodes = np.random.randn(10, 100)
cutoff = 0.3

pool = multiprocessing.Pool()

def node_combinations(nodes):
    return itertools.combinations(range(len(nodes)), 2)

def pfilter(func, candidates):
    # Loads every candidate pair into memory up front -- this is the part
    # that does not scale to >300GB of combinations.
    candidates = list(candidates)
    return np.asarray([c for c, keep in zip(candidates, pool.map(func, candidates)) if keep])

def pearsonr(xy: tuple):
    correlation_coefficient = scipy.stats.pearsonr(n_nodes[xy[0]], n_nodes[xy[1]])[0]
    if correlation_coefficient >= cutoff:
        return True
    else:
        return False


edgelist = pfilter(pearsonr, node_combinations(n_nodes))

I'm looking for a way to do lazy evaluation of a large iterator using multiprocessing with filter instead of map.


Solution

  • The following uses a Semaphore to slow down the over-eager pool feeder thread. It is not a proper solution, since it doesn't fix other issues: for example, nested loops that share the same pool and loop over the result of imap will have all of the outer loop's jobs finish before any of the inner loop's jobs even get to start. But it does limit memory usage:

    import threading

    def slowdown(n=16):
        # Allow at most n items to be "in flight" between inner() and outer().
        s = threading.Semaphore(n)
        def inner(it):
            # Wraps the iterable fed to pool.imap: blocks once n items have
            # been handed out but not yet pulled from the result iterator.
            for item in it:
                s.acquire()
                yield item
        def outer(it):
            # Wraps the result iterator: pulling a result frees a slot so
            # inner() can hand out the next input item.
            for item in it:
                s.release()
                yield item
        return outer, inner
    

    This is used to wrap pool.imap like so:

    outer, inner = slowdown()
    outer(pool.imap(func, inner(candidates)))
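
    Putting this together with the original correlation filter, a complete sketch might look like the following. The keep_edge helper, the Pool settings, and the assumption of a fork start method (so workers inherit n_nodes) are illustrative choices, not part of the original answer:

    import itertools
    import multiprocessing
    import threading

    import numpy as np
    import scipy.stats

    n_nodes = np.random.randn(10, 100)
    cutoff = 0.3

    def keep_edge(xy):
        # Hypothetical worker: return the pair itself when it passes the
        # cutoff, or None, so the parent can filter results lazily.
        i, j = xy
        return xy if scipy.stats.pearsonr(n_nodes[i], n_nodes[j])[0] >= cutoff else None

    def slowdown(n=16):
        s = threading.Semaphore(n)
        def inner(it):
            for item in it:
                s.acquire()
                yield item
        def outer(it):
            for item in it:
                s.release()
                yield item
        return outer, inner

    if __name__ == "__main__":
        candidates = itertools.combinations(range(len(n_nodes)), 2)
        outer, inner = slowdown()
        # Assumes a fork start method (Linux default) so the workers inherit
        # the n_nodes array created in the parent process.
        with multiprocessing.Pool() as pool:
            results = outer(pool.imap(keep_edge, inner(candidates)))
            # Consume lazily: at most ~16 pairs are in flight at any time,
            # and only the surviving edges are ever kept.
            for edge in (r for r in results if r is not None):
                print(edge)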