Tags: parquet, python-polars

Add date column on per-file basis with Polars when aggregating over multiple Parquet files


I have a very large number of Parquet data files that I can nicely join and aggregate with Polars by doing something like this (note the glob in the filename):

(
    pl.scan_parquet('data/data-16828*.parquet')
    .groupby(['type_id', 'location_id'])
    .agg([
        pl.min('n').alias('n_min'),
        pl.max('n').alias('n_max')
    ])
    .collect()
)

Each file is the output of a script that runs every five minutes, and my goal is to make a single timeseries DataFrame out of them. There is a date column of type datetime[μs, UTC]. However, I discovered that the values of this column are not equal within a single file; rather, they reflect the exact time during the run when each row was created. Because of this, the date column, as it is, is useless for grouping.

The way I see it, I should probably add a new column and populate it with the date value of the first row on a per-file basis. Is it possible to achieve this with Polars' lazy API, or am I going to have to fix the files first before running the aggregation with Polars?

Please note that I need to use the lazy API as the dataset is way larger than memory.


Solution

  • The LazyFrame doesn't have any information about the file it came from. For that reason you'll need to move the iteration out of Polars so that you can feed the file info to the LazyFrame yourself.

    Something like this:

    import polars as pl
    from pathlib import Path

    basepath = Path('data/')
    lazydf = []
    for myfile in basepath.iterdir():
        # Only pick up the Parquet files matching the original glob.
        if "data-16828" not in myfile.name or myfile.suffix != '.parquet':
            continue
        lazydf.append(
            pl.scan_parquet(myfile)
            .groupby(['type_id', 'location_id'])
            .agg([
                pl.min('n').alias('n_min'),
                pl.max('n').alias('n_max')
            ])
            # Record which file each aggregated row came from.
            .with_columns(source_file=pl.lit(myfile.name))
        )
    pl.concat(lazydf)
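
    As a usage sketch (assuming lazydf has been populated by the loop above), the concatenation can then be materialized with a plain collect; the collected result contains only the aggregated rows plus the source_file column, not the raw per-file data:

    # Minimal usage sketch: materialize the concatenated per-file aggregations.
    summary = pl.concat(lazydf).collect()
    print(summary.head())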
    

    This doesn't capture the first-row aspect; for that you'd need to move away from the groupby/agg model and use a window function so that each column gets its own grouping, like this:

    import polars as pl
    from pathlib import Path

    basepath = Path('data/')
    lazydf = []
    for myfile in basepath.iterdir():
        # Only pick up the Parquet files matching the original glob.
        if "data-16828" not in myfile.name or myfile.suffix != '.parquet':
            continue
        lazydf.append(
            pl.scan_parquet(myfile)
            .select(
                'type_id',
                'location_id',
                # Window functions: min/max of n per (type_id, location_id).
                n_min=pl.col('n').min().over(['type_id', 'location_id']),
                n_max=pl.col('n').max().over(['type_id', 'location_id']),
                # Date of the file's first row, broadcast to every row.
                date=pl.col('date').first()
            )
            # Collapse the per-row window results to one row per group.
            .unique(subset=['type_id', 'location_id', 'n_min', 'n_max', 'date'])
        )
    pl.concat(lazydf)
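
    To get the single timeseries DataFrame the question is after, you can collect the concatenation and sort on the per-file date column. A minimal sketch, assuming lazydf was built by the loop above:

    # Minimal sketch: collect the per-file results and order them by the
    # per-file date so the output reads as a timeseries.
    timeseries = pl.concat(lazydf).collect().sort('date')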