pandas, numpy, apache-spark, pyspark, parquet

How can I process a large parquet file from spark in numpy/pandas?


I'm posting this with the pandas, numpy and spark tags because I'm not really sure of the best approach to solving this problem across those three systems.

I have a large parquet file that a downstream process is having trouble opening because it exceeds the system's memory (~63 GB in memory if opened all at once). I was writing the file like this:

    FULL_MAIN.write.mode("overwrite").parquet(PATH + "/FULL_MAIN.parquet")

but the file was too big, so I tried to break it into smaller chunks:

    split_factor = [.1, .1, .1, .1, .1, .1, .1, .1, .1, .1]
    (FULL_MAIN_RDD1, FULL_MAIN_RDD2, FULL_MAIN_RDD3, FULL_MAIN_RDD4, FULL_MAIN_RDD5,
     FULL_MAIN_RDD6, FULL_MAIN_RDD7, FULL_MAIN_RDD8, FULL_MAIN_RDD9,
     FULL_MAIN_RDD10) = FULL_MAIN.randomSplit(split_factor)
    FULL_MAIN_RDD1.write.mode("overwrite").parquet(PATH + "/FULL_MAIN_RDD1.parquet")
    FULL_MAIN_RDD2.write.mode("overwrite").parquet(PATH + "/FULL_MAIN_RDD2.parquet")
    ...

The problem with this approach is that there are other dataframes whose rows I need to keep aligned with this one, and the random split breaks that alignment.

So my two questions are:

  1. Is there a way to split multiple dataframes into relatively equal parts, keeping their rows aligned, when I don't have any row number or numeric counter for each row in my dataset?
  2. Is there a way to read parquet files in batches in pandas or numpy? That would basically solve my problem on the downstream system. I can't figure out how to open the parquet file in batches (I've tried opening it in pandas, splitting the rows, and saving each chunk to its own file, but loading the dataframe crashes my system), and I'm not sure it's even possible without exceeding memory.

Solution

  • The Parquet file format supports row groups. Install pyarrow and set row_group_size when creating the parquet file:

    df.to_parquet("filename.parquet", row_group_size=10000, engine="pyarrow")
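
    The file in the question is written from Spark rather than pandas; there the row-group size is set in bytes rather than rows, via the Parquet writer's block size. A minimal sketch, assuming the standard parquet.block.size Hadoop property is honored by your Spark/Parquet versions:

    # Assumption: "spark" is the usual SparkSession; _jsc reaches the JVM SparkContext,
    # which is the common way to get at the Hadoop configuration from PySpark.
    # Ask the Parquet writer for ~64 MB row groups (the size is in bytes, not rows).
    spark.sparkContext._jsc.hadoopConfiguration().setInt("parquet.block.size", 64 * 1024 * 1024)
    FULL_MAIN.write.mode("overwrite").parquet(PATH + "/FULL_MAIN.parquet")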
    

    Then you can read it group by group (or even just a specific group):

    import pyarrow.parquet as pq

    pq_file = pq.ParquetFile("filename.parquet")
    n_groups = pq_file.num_row_groups
    for grp_idx in range(n_groups):
        # Only one row group is read into memory per iteration.
        df = pq_file.read_row_group(grp_idx, use_pandas_metadata=True).to_pandas()
        process(df)  # process() stands for whatever the downstream step does
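
    If memory is still tight, read_row_group (and iter_batches below) also accept a columns argument, so only the fields the downstream step actually needs get loaded; the column names here are hypothetical:

    # Read just two columns of a single row group; other columns are never touched.
    df = pq_file.read_row_group(0, columns=["col_a", "col_b"]).to_pandas()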
    

    If you don't have control over the creation of the parquet file, you are still able to read only part of it at a time:

    pq_file = pq.ParquetFile("filename.parquet")
    batch_size = 10000 # records
    
    batches = pq_file.iter_batches(batch_size, use_pandas_metadata=True) # batches will be a generator    
    for batch in batches:
        df = batch.to_pandas()
        process(df)
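
  • For the first question (splitting several dataframes into roughly equal pieces while keeping their rows aligned), one possible approach is to derive a deterministic bucket number from a key column that the dataframes share and split on that instead of using randomSplit. The sketch below assumes such a shared column exists and calls it id purely for illustration:

    from pyspark.sql import functions as F

    n_buckets = 10

    # hash(id) is deterministic, so a given id lands in the same bucket in every
    # dataframe that carries it; abs(...) % n_buckets keeps the bucket in [0, n_buckets).
    def add_bucket(df):
        return df.withColumn("bucket", F.abs(F.hash(F.col("id"))) % n_buckets)

    FULL_MAIN_B = add_bucket(FULL_MAIN)
    OTHER_DF_B = add_bucket(OTHER_DF)  # OTHER_DF stands in for any dataframe to keep aligned

    for b in range(n_buckets):
        FULL_MAIN_B.filter(F.col("bucket") == b).drop("bucket") \
            .write.mode("overwrite").parquet(PATH + f"/FULL_MAIN_part{b}.parquet")
        OTHER_DF_B.filter(F.col("bucket") == b).drop("bucket") \
            .write.mode("overwrite").parquet(PATH + f"/OTHER_DF_part{b}.parquet")

    The buckets come out only approximately equal (hashing is uniform on average), but matching rows from the different dataframes always land in the same bucket, which randomSplit cannot guarantee.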