python, pandas, multiprocessing

What is the most efficient way to multiprocess over a very large dataframe?


I have a large DataFrame that I need to do a lot of matching operations over, and in the past I have always used the method below for this. However, the DataFrame I am currently trying to multiprocess comes from a 2 GB CSV file, and my computer is having problems processing it, even with only one partition. I am assuming this is because splitting the DataFrame into chunks for multiprocessing then doubles the amount of memory needed, and therefore my computer cannot handle it. This is my current code:

from concurrent.futures import ProcessPoolExecutor
import numpy as np
import pandas as pd
from tqdm import tqdm

def parallelize_dataframe(df, func, additional_param, num_partitions):
    # split the DataFrame into num_partitions chunks and give each to its own worker process
    df_split = np.array_split(df, num_partitions)
    results = []
    with ProcessPoolExecutor(max_workers=num_partitions) as executor:
        # submit one task per chunk; the dict maps each future to its chunk
        futures = {executor.submit(func, chunk, additional_param): chunk for chunk in df_split}
        for future in tqdm(futures, total=len(futures), desc="Overall progress"):
            results.append(future.result())
    return pd.concat(results)
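
For what it's worth, my understanding is that every chunk passed to executor.submit gets pickled and copied into the worker process, so the serialized copies sit in memory alongside the original DataFrame for a while. Something like this (with a random stand-in DataFrame, not my real data) shows roughly how much that adds:

import pickle
import numpy as np
import pandas as pd

# random stand-in DataFrame; the real data comes from a 2 GB CSV
df = pd.DataFrame(np.random.rand(1_000_000, 5), columns=list("abcde"))
print(f"DataFrame in memory: {df.memory_usage(deep=True).sum() / 2**20:.1f} MiB")

# each chunk handed to a worker is pickled before being sent, so these
# serialized copies exist in the parent alongside the original DataFrame
chunks = np.array_split(df, 4)
pickled_total = sum(len(pickle.dumps(chunk)) for chunk in chunks)
print(f"pickled chunks: {pickled_total / 2**20:.1f} MiB")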

Any help is greatly appreciated.


Solution

  • For tasks such as this, I would suggest pre-processing the CSV file to separate it into approximately equal chunks that are read in by the child processes, rather than being read by the main process and sent to the children. Sending that data from the main process to the children takes quite a bit of overhead (and memory). Here's an example:

    from multiprocessing import Pool
    from io import BytesIO
    import pandas as pd
    
    csvfile = r"c:\some\example\data.csv"
    
    chunksize = 2**20  # 1MiB chunk size (try different values depending on file size and processing speed)
    
    #example csv contents
    # colA,colB,colC
    # 1,2,3
    # 4,5,6
    # 7,8,9
    # ...
    
    def sum_cols(args):
        file_start, file_end, col_names = args  # unpack tuple args, as Pool.imap only passes a single argument to the worker
        with open(csvfile, "rb") as f:
            f.seek(file_start)
            buf = BytesIO(f.read(file_end-file_start))  # store chunk of csv in a buffer to pass to pandas
        df = pd.read_csv(buf, names=col_names)  # col_names aren't in the chunk so pass them explicitly
        return df.sum()
            
    if __name__ == "__main__":
        with open(csvfile, "rb") as f:
            firstline = f.readline()
            headers = [col_title.strip().decode() for col_title in firstline.split(b",")]  # decode so the column names are str, not bytes
            startpoints = []
            endpoints = []
            while True:  # scan the file without reading in much data to find good chunk boundaries (start and end on newline)
                startpoints.append(f.tell())
                f.seek(chunksize,1)  # skip ahead by chunksize bytes
                line = f.readline()  # find the end of the current line
                endpoints.append(f.tell())
                if not line:  # readline() returns b"" only at end of file
                    if startpoints[-1] == endpoints[-1]:  # if the previous chunk already ended exactly at EOF, this final chunk is empty and should be dropped
                        startpoints.pop()
                        endpoints.pop()
                    break
        
        arglist = [(start, end, headers) for start, end in zip(startpoints, endpoints)]
        with Pool() as pool:
            print(sum(pool.imap(sum_cols, arglist)))
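
    If each chunk needs to produce a transformed DataFrame (as in your parallelize_dataframe) rather than a per-chunk summary, the same byte-range idea works: each worker reads its own slice, does its work, and returns the partial frame for the parent to concatenate. A rough sketch reusing the imports, csvfile and arglist from the code above (the colA transformation is just a placeholder for your real matching logic):

    def process_chunk(args):
        file_start, file_end, col_names = args
        with open(csvfile, "rb") as f:
            f.seek(file_start)
            buf = BytesIO(f.read(file_end - file_start))
        df = pd.read_csv(buf, names=col_names)
        df["colA_doubled"] = df["colA"] * 2  # placeholder for the real per-chunk work
        return df  # the returned frame is pickled back to the parent, so return only what you need

    # then, inside the if __name__ == "__main__" block after building arglist:
    # with Pool() as pool:
    #     result = pd.concat(pool.imap(process_chunk, arglist), ignore_index=True)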
    

    I have not used polars and can't write an answer based on it, but my understanding is that it is very fast.
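
    From its documentation, something like this untested sketch should do the same column sums lazily; treat the exact API as an assumption on my part and check the polars docs for your version:

    import polars as pl

    csvfile = r"c:\some\example\data.csv"

    # scan_csv builds a lazy query over the file rather than loading it all up front;
    # collect() then executes it, letting polars decide how to read the data
    totals = pl.scan_csv(csvfile).select(pl.all().sum()).collect()
    print(totals)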