Tags: python, multiprocessing

MultiProcessing in Python: Ensuring Queue Order


I am trying to run a loess regression across multiple processors, but when my code appends each concurrent dataframe to the queue, they arrive out of order and the resulting graph looks awful.

from multiprocessing import Pool, Manager
import pandas as pd

def smooth_data_mp(data_frame):
    num_processes = 8
    chunk_size = 125000
    fraction = 125 / chunk_size
    print(data_frame.head())
    result_queue = Manager().Queue()
    with Pool(processes=num_processes) as pool:
        pool.starmap(process_data, [(data_frame, i, chunk_size, fraction, result_queue) for i in
                                    range(len(data_frame) // chunk_size)])

    # Collect results from the queue in order
    result_list = pd.DataFrame(result_queue.get())
    while not result_queue.empty():
        result_list = pd.concat([result_list, result_queue.get()])
    return result_list

def process_data(dataframe, i, chunk_size, fraction, result_queue):
    start_frame = chunk_size * i
    end_frame = min(chunk_size * (i + 1), len(dataframe))  # Ensure end_frame doesn't exceed length of sampleData
    print(f'{start_frame}, {end_frame}') # just for debugging
    new_data_frame = calculate_loess_on_subset(dataframe[start_frame:end_frame], chunk_size, fraction, i)
    result_queue.put(new_data_frame)

How do I ensure that each dataframe is added to the queue in the order its chunk occurs in the original dataset, rather than in whatever order the processes happen to finish?

I have tried different queue types, such as a regular queue and a manager queue, but only the manager queue worked, and I'm still not sure how to fix the ordering problem.


Solution

  • The problem is that you are appending each new dataframe to the queue as soon as it becomes available, so the order depends on which worker process finishes first. But you don't need a queue at all: pool.starmap already returns its results in the same order as its input arguments, so you can simply use its return value:

    with Pool(processes=num_processes) as pool:
        # starmap returns one result per argument tuple, in the same order
        # as the argument list, regardless of which worker finishes first
        results = pool.starmap(process_data,
                               [(data_frame, i, chunk_size, fraction)
                                for i in range(len(data_frame) // chunk_size)])
    return pd.concat(results)
    

    This preserves the original order of the input data_frame. The process_data function should then be modified to return its result instead of putting it on a queue:

    def process_data(dataframe, i, chunk_size, fraction):
        ...similar code as original...
        return new_data_frame
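
    Putting the two pieces together, a minimal sketch could look like the following (assuming calculate_loess_on_subset is your existing function, and keeping your original chunking parameters):

    from multiprocessing import Pool
    import pandas as pd

    def process_data(dataframe, i, chunk_size, fraction):
        # Smooth one chunk of the original frame and return it
        start_frame = chunk_size * i
        end_frame = min(chunk_size * (i + 1), len(dataframe))
        return calculate_loess_on_subset(dataframe[start_frame:end_frame],
                                         chunk_size, fraction, i)

    def smooth_data_mp(data_frame, num_processes=8, chunk_size=125000):
        fraction = 125 / chunk_size
        with Pool(processes=num_processes) as pool:
            # Blocks until every chunk is done; results come back in input order
            results = pool.starmap(process_data,
                                   [(data_frame, i, chunk_size, fraction)
                                    for i in range(len(data_frame) // chunk_size)])
        # Concatenating in that order reproduces the original row order
        return pd.concat(results)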
    

    (If for some reason you do want to keep the output queue as well, the easiest way to ensure ordering is to attach an explicit chunk or row index to each processed frame and sort the merged frames on that index at the end, as sketched below. A shared output queue really only makes sense if another thread or process needs to consume the results as they arrive, which doesn't seem to be the case here. See also the sample "Test code" in the multiprocessing.managers.SyncManager documentation.)
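
    If you do go that route, a rough sketch of the idea (still assuming your calculate_loess_on_subset) is to tag each result with its chunk index and sort on that index after draining the queue:

    def process_data(dataframe, i, chunk_size, fraction, result_queue):
        start_frame = chunk_size * i
        end_frame = min(chunk_size * (i + 1), len(dataframe))
        new_data_frame = calculate_loess_on_subset(dataframe[start_frame:end_frame],
                                                   chunk_size, fraction, i)
        # Tag the result with its chunk index so it can be re-ordered later
        result_queue.put((i, new_data_frame))

    # In smooth_data_mp, after the pool has finished all chunks:
    chunks = []
    while not result_queue.empty():
        chunks.append(result_queue.get())
    chunks.sort(key=lambda item: item[0])  # restore original chunk order
    result_list = pd.concat([frame for _, frame in chunks])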