Search code examples
pythonmultiprocessingpython-itertoolsdask

Parallel Permutation in a DataFrame (pandas or dask)


I need to calculate all possible permutations of row differences by column in a pandas dataframe.

Using itertools permutation works but for the size problem I need to solve it takes far too long. Getting an error when using multiprocessing. Assuming the error has a solution, is "multiprocessing" an optimal way or would dask have a way to solve the problem of scale?

#My naive approach

import pandas as pd
import numpy as np
from itertools import permutations

columns = list(range(1,50))
index = list(range(1,10))
df = pd.DataFrame(index= index, columns = columns,data=np.random.randn(len(index),len(columns)))
count_perm = list(permutations(df.index,2))

comparison_df = pd.DataFrame(columns = df.columns)

for a,b in permutations(df.index,2):
    comparison_df.loc['({} {})'.format(a,b)] = df.loc[a] - df.loc[b]   

#My multiprocessing attempt

import pandas as pd
import numpy as np
from itertools import permutations
from multiprocessing.dummy import Pool as ThreadPool

columns = list(range(1,5000))
index = list(range(1,100))
df = pd.DataFrame(index= index, columns = columns,data=np.random.randn(len(index),len(columns)))
count_perm = list(permutations(df.index,2))

pool = ThreadPool(4)  # Number of threads

comparison_df = pd.DataFrame(columns = df.columns)
aux_val = [(a, b) for a,b in permutations(df.index,2)]

def op(tupx):
    comparison_df.loc["('{}', '{}')".format(tupx[0],tupx[1])]  = (df.loc[tupx[0]] - df.loc[tupx[1]])

pool.map(op, aux_val)

Error:

Traceback (most recent call last):

  File "<ipython-input-69-20c917ebefd7>", line 30, in <module>
    pool.map(op, aux_val)

  File "/home/justaguy/anaconda3/lib/python3.7/multiprocessing/pool.py", line 268, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()

  File "/home/justaguy/anaconda3/lib/python3.7/multiprocessing/pool.py", line 657, in get
    raise self._value

  File "/home/justaguy/anaconda3/lib/python3.7/multiprocessing/pool.py", line 121, in worker
    result = (True, func(*args, **kwds))

  File "/home/justaguy/anaconda3/lib/python3.7/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))

  File "<ipython-input-69-20c917ebefd7>", line 26, in op
    comparison_df.loc["('{}', '{}')".format(tupx[0],tupx[1])]  = (df.loc[tupx[0]] - df.loc[tupx[1]])

  File "/home/justaguy/anaconda3/lib/python3.7/site-packages/pandas/core/indexing.py", line 190, in __setitem__
    self._setitem_with_indexer(indexer, value)

  File "/home/justaguy/anaconda3/lib/python3.7/site-packages/pandas/core/indexing.py", line 451, in _setitem_with_indexer
    self.obj._data = self.obj.append(value)._data

  File "/home/justaguy/anaconda3/lib/python3.7/site-packages/pandas/core/frame.py", line 6692, in append
    sort=sort)

  File "/home/justaguy/anaconda3/lib/python3.7/site-packages/pandas/core/reshape/concat.py", line 229, in concat
    return op.get_result()

  File "/home/justaguy/anaconda3/lib/python3.7/site-packages/pandas/core/reshape/concat.py", line 426, in get_result
    copy=self.copy)

  File "/home/justaguy/anaconda3/lib/python3.7/site-packages/pandas/core/internals/managers.py", line 2065, in concatenate_block_managers
    return BlockManager(blocks, axes)

  File "/home/justaguy/anaconda3/lib/python3.7/site-packages/pandas/core/internals/managers.py", line 114, in __init__
    self._verify_integrity()

  File "/home/justaguy/anaconda3/lib/python3.7/site-packages/pandas/core/internals/managers.py", line 311, in _verify_integrity
    construction_error(tot_items, block.shape[1:], self.axes)

  File "/home/justaguy/anaconda3/lib/python3.7/site-packages/pandas/core/internals/managers.py", line 1691, in construction_error
    passed, implied))

ValueError: Shape of passed values is (604, 4999), indices imply (602, 4999)

Solution

  • As I suggested you in the comment you might think to use combinations instead of permutations. Doing so you can cut in half your computation. Disclaimer: My code is calculating the differences of columns instead of indices as in your example.

    import pandas as pd
    import numpy as np
    from itertools import permutations, combinations
    import os 
    import multiprocessing as mp
    
    # generate data
    columns = list(range(1,50))
    
    ## I don't think you should start index at 1
    index = list(range(1,10))
    
    df = pd.DataFrame(index=index,
                      columns=columns,
                      data=np.random.randn(len(index),len(columns)))
    

    Single Thread

    %%timeit -n 10
    df1 = pd.DataFrame()
    for a,b in permutations(df.index,2):
        df1["{}-{}".format(a,b)] = df[a]-df[b]
    # 37.1 ms ± 726 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
    
    %%timeit -n 10
    df1 = pd.DataFrame()
    for a,b in permutations(df.index,2):
        df1["{}-{}".format(a,b)] = df[a].values-df[b].values
    
    df1.index = df1.index+1
    # 25.6 ms ± 1.2 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
    

    Single Thread - Using Combination

    %%timeit -n 10
    df1 = pd.DataFrame()
    for a,b in combinations(df.index,2):
        df1["{}-{}".format(a,b)] = df[a]-df[b]
    # 18.6 ms ± 1.07 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
    
    %%timeit -n 10
    df1 = pd.DataFrame()
    for a,b in combinations(df.index,2):
        df1["{}-{}".format(a,b)] = df[a].values-df[b].values
    
    df1.index = df1.index+1
    # 13.2 ms ± 819 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
    

    Multiprocessing

    This is not going to be faster in this case but you might consider it for other applications.

    def parallelize(fun, vec, cores):
        with mp.Pool(cores) as p:
            res = p.map(fun, vec)
        return res
    
    def fun(v):
        a,b=v
        cols = ["{}-{}".format(a,b)]
        df_out = pd.DataFrame(data=df[a].values-df[b].values,
                              columns=cols)
    
        return df_out
    
    vec = [(a,b) for a,b in permutations(df.index,2)]
    cores = os.cpu_count()
    
    %%timeit -n 10
    df1 = parallelize(fun, vec, cores)
    df1 = pd.concat(df1, axis=1)
    # 260 ms ± 10.7 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)