python multiprocessing overhead-minimization

How to reduce time for multiprocessing in python


I am trying to use multiprocessing in Python to reduce computation time, but after adding it the overall computation became significantly slower. I create 4 processes and split the DataFrame into 4 smaller DataFrames, one as input to each process. After timing each process, the overhead cost looks significant, and I was wondering whether there is a way to reduce it.

I am using Windows 7 and Python 3.5, and my machine has 8 cores.

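import datetime as dt
import multiprocessing
from functools import partial

import numpy as np
import pandas as pd


def doSomething(dataPassed, args):  # the row comes first: df.apply passes it positionally
    # process the data and calculate outputs
    pass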

def parallelize_dataframe(df, nestedApply):
    df_split = np.array_split(df, 4)
    pool = multiprocessing.Pool(4)
    df = pool.map(nestedApply, df_split)
    print ('finished with Simulation')
    time = float((dt.datetime.now() - startTime).total_seconds())

    pool.close()
    pool.join()

def nestedApply(df):

    func2 = partial(doSomething, args=())
    res = df.apply(func2, axis=1)
    res = [...]  # placeholder: the output tables
    return res

if __name__ == '__main__':
    data = pd.read_sql_query(query, conn)
    parallelize_dataframe(data, nestedApply)

Solution

  • I would suggest using queues instead of passing your DataFrame to the workers as chunks. Copying each chunk takes a lot of resources and quite some time, and you could run out of memory if your DataFrame is really big. With queues you can benefit from the fast iterators in pandas. Here is my approach. The overhead shrinks relative to the useful work as your workers get more complex. Unfortunately, my workers are far too simple to really show that, but sleep simulates complexity a bit.

    import pandas as pd
    import multiprocessing as mp
    import numpy as np
    import time
    
    
    def worker(in_queue, out_queue):
        for row in iter(in_queue.get, 'STOP'):
            value = (row[1] * row[2] / row[3]) + row[4]
            time.sleep(0.1)
            out_queue.put((row[0], value))
    
    if __name__ == "__main__":
        # fill a DataFrame
        df = pd.DataFrame(np.random.randn(100000, 4), columns=list('ABCD'))  # dimensions must be ints, not 1e5
    
        in_queue = mp.Queue()
        out_queue = mp.Queue()
    
        # setup workers
        numProc = 2
        process = [mp.Process(target=worker,
                              args=(in_queue, out_queue)) for x in range(numProc)]
    
        # run processes
        for p in process:
            p.start()
    
        # iterator over rows
        it = df.itertuples()
    
        # fill queue and get data
        # keep feeding rows into in_queue until a result is available in out_queue
        # put() would only block if in_queue had a maxsize and were full
        for i in range(len(df)):
            while out_queue.empty():
                # fill the queue
                try:
                    row = next(it)
                    in_queue.put((row[0], row[1], row[2], row[3], row[4]), block=True)  # row = (index, A, B, C, D) tuple
                except StopIteration:
                    break
            row_data = out_queue.get()
            df.loc[row_data[0], "Result"] = row_data[1]
    
        # signal the processes to stop
        for p in process:
            in_queue.put('STOP')
    
        # wait for processes to finish
        for p in process:
            p.join()
    

    With numProc = 2 it takes 50 s per loop; with numProc = 4 it is twice as fast.
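
To check how much of the overhead comes from copying chunks, one option is to time the serialization step on its own, since Pool.map pickles every argument before sending it to a worker. The following is only a rough sketch under stated assumptions: it uses plain lists of row tuples as a stand-in for DataFrame chunks so that it runs with the standard library alone, and the helper names (make_rows, split_rows, measure_copy_cost) are hypothetical, not part of pandas or multiprocessing.

```python
import pickle
import random
import time


def make_rows(n_rows, n_cols=4, seed=0):
    """Build a list of row tuples as a stand-in for a DataFrame (assumption)."""
    rng = random.Random(seed)
    return [tuple(rng.gauss(0.0, 1.0) for _ in range(n_cols))
            for _ in range(n_rows)]


def split_rows(rows, n_chunks):
    """Split rows into n_chunks contiguous, nearly equal chunks."""
    size, extra = divmod(len(rows), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        # the first `extra` chunks get one additional row
        stop = start + size + (1 if i < extra else 0)
        chunks.append(rows[start:stop])
        start = stop
    return chunks


def measure_copy_cost(chunks):
    """Return (total_bytes, seconds) spent pickling and unpickling all chunks."""
    start = time.perf_counter()
    total_bytes = 0
    for chunk in chunks:
        payload = pickle.dumps(chunk, protocol=pickle.HIGHEST_PROTOCOL)
        total_bytes += len(payload)
        pickle.loads(payload)  # the receiving process has to rebuild the object
    return total_bytes, time.perf_counter() - start


if __name__ == "__main__":
    rows = make_rows(100000)
    n_bytes, seconds = measure_copy_cost(split_rows(rows, 4))
    print("pickled %d bytes in %.3f s" % (n_bytes, seconds))
```

If the time reported here is small compared with the total run time, the copy is probably not the main cost, and process start-up or the work inside the workers deserves a closer look. The same measurement can be repeated on real DataFrame chunks by passing the output of np.array_split to measure_copy_cost.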