python, partitioning, python-polars

How to re-group parquet files using polars (in file storage)


I am using a directory of parquet files as a makeshift data warehouse accessible to Power BI and Python use cases. How can I use polars to regroup the data into parquet files of a particular target size?

from pathlib import Path
import uuid
import polars as pl


def repartition(directory_to_repartition, target_size):
    repart_dir = Path(directory_to_repartition)
    old_paths = [v for v in repart_dir.iterdir() if v.suffix == '.parquet']
    frames = [pl.read_parquet(path) for path in old_paths]
    big_frame = pl.concat(frames)  # memory usage may or may not be tolerable
    # This method is the missing link I am looking for, assuming I have
    # sufficient memory for these operations:
    new_frames = big_frame.split_partitions(partition_size=target_size)
    for frame in new_frames:
        frame.write_parquet(repart_dir / f"{uuid.uuid4()}.parquet")
    for old in old_paths:
        try:
            old.unlink()
        except FileNotFoundError:
            pass

Note: the function split_partitions is a pseudo-method of polars DataFrames; it does not exist.


Solution

  • I don't think there'd be a good reason for your split_partitions method to return a list of DataFrames. Additionally, you probably want to use scan_parquet instead of read_parquet so the data is only loaded lazily. Outside the scope of this question, you probably want to look into using some kind of hive partitioning instead of keeping all the files in a flat directory (see the sketch below).
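
    As a rough illustration of that hive idea (not part of the question): the snippet below is a minimal sketch that assumes a hypothetical 'year' column, made-up file names, and a recent polars version where scan_parquet supports hive_partitioning.

    from pathlib import Path
    import uuid
    import polars as pl

    repart_dir = Path("warehouse")                # hypothetical output root
    df = pl.read_parquet("some_table.parquet")    # hypothetical frame with a 'year' column

    # split by the partition column and write each group under key=value directories
    for part in df.partition_by('year'):
        year = part['year'][0]
        out_dir = repart_dir / f"year={year}"
        out_dir.mkdir(parents=True, exist_ok=True)
        # drop the key column; hive-aware readers recover it from the directory name
        part.drop('year').write_parquet(out_dir / f"{uuid.uuid4()}.parquet")

    # later scans can prune whole directories when filtering on 'year'
    lf = pl.scan_parquet(str(repart_dir / "**" / "*.parquet"), hive_partitioning=True)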

    Another missing piece is determining how many rows correspond to the target size; one approach is to iterate over candidate row counts, checking each with estimated_size.

    Let's start by creating a LazyFrame of all your source files and adding a row-count column, which we'll use as an index.

    frames = pl.concat([pl.scan_parquet(x) for x in old_paths]).with_row_count('i')
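
    Note that newer polars releases deprecate with_row_count in favour of with_row_index (same behaviour, different name); assuming such a version, that line would be:

    frames = pl.concat([pl.scan_parquet(x) for x in old_paths]).with_row_index('i')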
    

    Then we need to figure out how many rows it takes to reach your target size. The loop below rescales num_rows by the ratio of the target size to the estimated size until the estimate is within tolerance; I'm sure there are ways to optimize this, but those optimizations might be data dependent:

    num_rows = 1000
    while True:
        est_size = frames.filter(pl.col('i') <= num_rows).collect().estimated_size('mb')
        num_rows = round(num_rows * target_size / est_size)
        if abs(1 - est_size / target_size) < 0.01:  # adjust the tolerance to meet your needs
            break
    

    We'll assume that every chunk of rows is close enough in size that we only need to do this calibration once.

    We also need to know how many rows there are in total, so just do:

    tot_rows = frames.select(pl.col('i').max()).collect()[0, 0]
    

    Lastly, loop through the row ranges, writing a file for each chunk (remember the row count starts at 0, and the last chunk may be smaller than the others):

    row_range = (0, num_rows - 1)
    while row_range[0] <= tot_rows:
        (frames
            .filter((pl.col('i') >= row_range[0]) & (pl.col('i') <= row_range[1]))
            .select(pl.exclude('i'))
            .collect()
            .write_parquet(repart_dir / f"{uuid.uuid4()}.parquet")
        )
        row_range = tuple(x + num_rows for x in row_range)
    

    You probably want to do some checks on all the new files before you delete the old ones.
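
    Putting the pieces together, a rough sketch of the whole function might look like this (the ±1% tolerance and the row-count check before deleting are my own assumptions, not requirements):

    from pathlib import Path
    import uuid
    import polars as pl

    def repartition(directory_to_repartition, target_size_mb):
        repart_dir = Path(directory_to_repartition)
        old_paths = [v for v in repart_dir.iterdir() if v.suffix == '.parquet']
        frames = pl.concat([pl.scan_parquet(x) for x in old_paths]).with_row_count('i')

        # calibrate how many rows roughly correspond to the target size (in MB)
        num_rows = 1000
        while True:
            est_size = frames.filter(pl.col('i') <= num_rows).collect().estimated_size('mb')
            num_rows = round(num_rows * target_size_mb / est_size)
            if abs(1 - est_size / target_size_mb) < 0.01:
                break

        tot_rows = frames.select(pl.col('i').max()).collect()[0, 0]

        # write the new files, tracking how many rows went into them
        rows_written = 0
        row_range = (0, num_rows - 1)
        while row_range[0] <= tot_rows:
            chunk = (frames
                     .filter((pl.col('i') >= row_range[0]) & (pl.col('i') <= row_range[1]))
                     .select(pl.exclude('i'))
                     .collect())
            chunk.write_parquet(repart_dir / f"{uuid.uuid4()}.parquet")
            rows_written += chunk.height
            row_range = tuple(x + num_rows for x in row_range)

        # sanity check before deleting: every original row must have been rewritten
        if rows_written != tot_rows + 1:
            raise RuntimeError("row counts do not match; keeping the old files")

        for old in old_paths:
            old.unlink(missing_ok=True)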