Tags: python, python-3.x, multithreading, multiprocessing, multiprocess

Python multiprocessing.Pool hangs with imap_unordered


I have a multiprocessing script that uses "pool.imap_unordered".

The script sometimes hangs, but when I check CPU usage (with the "top" command on Ubuntu), nothing is happening.

When I attach to the screen session, I see a process stuck mid-execution.

As far as I understand, I am not using the problematic fork() method.

The freezes do not occur when the script returns only a small amount of data, but they do occur with amounts that are still relatively small (a table under 5 MB as CSV).

Can someone suggest what exactly might help? A semaphore, a lock, or something else? I tried more processes with less data each, and that didn't help. It may be easier to switch to python parallel...


import multiprocessing
import pandas as pd

def my_func(df):

   # modify df here
   # df = df.head(1)
   return df

if __name__ == "__main__":
    df = pd.DataFrame({'a': [2, 2, 1, 1, 3, 3], 'b': [4, 5, 6, 4, 5, 6], 'c': [4, 5, 6, 4, 5, 6]})
    with multiprocessing.Pool(processes = (multiprocessing.cpu_count() - 1)) as pool:
        groups = (g for _, g in df.groupby("a"))
        print(df)
        print(groups)
        out = []
        for res in pool.imap_unordered(my_func, groups):
            out.append(res)
    final_df = pd.concat(out)

I also tried the "spawn" start method:

import multiprocessing
import pandas as pd

def my_func(df):

   # modify df here
   # df = df.head(1)
   return df

if __name__ == "__main__":
    df = pd.DataFrame({'a': [2, 2, 1, 1, 3, 3], 'b': [4, 5, 6, 4, 5, 6], 'c': [4, 5, 6, 4, 5, 6]})
    with multiprocessing.get_context("spawn").Pool(processes = (multiprocessing.cpu_count() - 1)) as pool:
        groups = (g for _, g in df.groupby("a"))
        print(df)
        print(groups)
        out = []
        for res in pool.imap_unordered(my_func, groups):
            out.append(res)
    final_df = pd.concat(out)

Solution

  • You say your code doesn't work for big data. That description doesn't give me much to go on, so I will assume that running out of memory is the cause of your problem. If so, how large is your dataframe, and how much memory is available to user applications? There may be no solution other than getting more memory. But before doing that, we can ask: what can be done to limit memory usage in your current processing, and will it be enough for the program to run? What follows is a rather lengthy analysis of the situation, along with a couple of changes that might help if my analysis is correct.

    One issue with imap_unordered is that you have little control over how many tasks are initially submitted to the pool's task queue, or over how quickly further tasks are submitted as earlier ones complete. If you had (1) a large dataframe, (2) a large number of tasks to submit, and (3) the entire dataframe being passed to your worker function for each task, then that dataframe would be replicated on the task queue a number of times over which you have very little control, and you could quickly run out of memory. The solution would then be to somehow limit how many tasks can sit on the task queue waiting to be processed at any point in time.

    But that is not exactly your situation, because you are not passing the entire df dataframe to my_func for each submitted task, only a subset. Even if all the subsets created by the groupby method were sitting on the task queue together, the storage requirements of the task queue would be approximately equal to the size of the entire dataframe. As these groups are processed and results are returned, the storage taken up by the task queue decreases while the storage required for your out list increases, so the total probably does not change much while the tasks are being processed. But when you create final_df, you simultaneously need storage for df, out, and final_df. So you currently need to be able to hold essentially three instances of your dataframe in memory.

    The simplest thing you can do is to delete the initial df dataframe before you concatenate your out list. Will this resolve your memory problem? I don't know, but now we only need enough memory to hold two copies of your large dataframe.
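
    To check whether the "three copies" estimate holds for your data, a rough sketch like the following could be used. It measures df, the out list, and final_df with pandas' memory_usage(deep=True). The helper name frame_bytes and the sizes dictionary are only illustrative, and the list of groups stands in for whatever your workers actually return.

```python
import pandas as pd


def frame_bytes(frame):
    """Approximate in-memory size of a DataFrame in bytes (index included)."""
    return int(frame.memory_usage(deep=True).sum())


df = pd.DataFrame({'a': [2, 2, 1, 1, 3, 3], 'b': [4, 5, 6, 4, 5, 6], 'c': [4, 5, 6, 4, 5, 6]})

# Stand-in for the results collected from the pool (assumption: the
# workers return frames about as large as the groups they receive).
out = [group for _, group in df.groupby("a")]

sizes = {"df": frame_bytes(df), "out": sum(frame_bytes(g) for g in out)}

del df  # drop the original before building the concatenated copy
final_df = pd.concat(out)
sizes["final_df"] = frame_bytes(final_df)

print(sizes)
```

    Note that del only removes the name; the memory can be reclaimed only if nothing else still references the original dataframe.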

    The other thing we can do is control the rate at which tasks are queued up, and thus limit the storage required by the task queue. The code below shows how to do this by replacing imap_unordered with apply_async and a callback. We use a multiprocessing.BoundedSemaphore initialized to N, where N is the maximum number of tasks we want queued up at any time. Before apply_async can be called to submit the next task, the main process must first acquire the semaphore. It can do this N times without blocking. As tasks complete, our callback function releases the semaphore, allowing a new task to be submitted. Even with this change you still need enough storage to hold two copies of your dataframe, so it will probably not help in your case as posted. But if your actual my_func returns something smaller than the df it is passed, then it should help. If your code now works, you can simply comment out the calls to semaphore.acquire() and semaphore.release() to remove this change and see whether it still works.

    import multiprocessing
    import pandas as pd
    
    def my_func(df):
    
       # modify df here
       # df = df.head(1)
       return df
    
    if __name__ == "__main__":
        # Use number of cores or possibly a smaller number if
        # we still have memory issues:
        POOL_SIZE = multiprocessing.cpu_count()
    
        # So that when a task completes there is always another task
        # on the input queue ready to run. If memory is still an issue,
        # then set SEMAPHORE_SIZE = POOL_SIZE
        SEMAPHORE_SIZE = 2 * POOL_SIZE
    
        semaphore = multiprocessing.BoundedSemaphore(SEMAPHORE_SIZE)
        out = []
    
        def my_callback(result_df):
            out.append(result_df)
            # Allow another task to be submitted:
            semaphore.release()
    
        df = pd.DataFrame({'a': [2, 2, 1, 1, 3, 3], 'b': [4, 5, 6, 4, 5, 6], 'c': [4, 5, 6, 4, 5, 6]})
        pool = multiprocessing.Pool(processes=POOL_SIZE)
        for _, group in df.groupby("a"):
            semaphore.acquire()
            pool.apply_async(my_func, args=(group,), callback=my_callback)
        # Wait for all tasks to complete:
        pool.close()
        pool.join()
    
        del df # reclaim storage we do not need any more
        final_df = pd.concat(out)
        print(final_df)
    

    Prints:

       a  b  c
    2  1  6  6
    3  1  4  4
    0  2  4  4
    1  2  5  5
    4  3  5  5
    5  3  6  6
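
    One caveat with the semaphore approach: the callback passed to apply_async only runs when a task succeeds. If my_func raises, the slot is never released, and once enough slots have leaked the submitting loop blocks forever. Below is a minimal sketch of the same idea with an error_callback added. The names submit_bounded and risky_double are hypothetical, not part of the original code. It also assumes that a threading.BoundedSemaphore is sufficient, since both the acquire and the callbacks run in the main process.

```python
import multiprocessing
import threading


def submit_bounded(pool, func, items, max_pending):
    """Run func(item) for every item with at most max_pending tasks in flight.

    Returns (results, errors) in completion order, not submission order.
    """
    semaphore = threading.BoundedSemaphore(max_pending)
    results, errors = [], []

    def on_success(value):
        results.append(value)
        semaphore.release()  # free a slot for the next submission

    def on_error(exc):
        # A failed task must free its slot too, otherwise the loop
        # below can block forever on semaphore.acquire().
        errors.append(exc)
        semaphore.release()

    for item in items:
        semaphore.acquire()
        pool.apply_async(func, args=(item,),
                         callback=on_success, error_callback=on_error)

    # Wait for all outstanding tasks and their callbacks to finish.
    pool.close()
    pool.join()
    return results, errors


def risky_double(x):
    # Hypothetical worker that fails on one particular input.
    if x == 3:
        raise ValueError("bad input: 3")
    return 2 * x


if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)
    results, errors = submit_bounded(pool, risky_double, range(6), max_pending=4)
    print(sorted(results))
    print(errors)
```

    This still does not cover a worker process that is killed outright (for example by the operating system when memory runs out): in that case neither callback fires, so the slot is not released.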