Tags: python-3.x, python-polars, pyarrow, parquet-dataset

How to read data from a partitioned parquet dataset into Polars


I have a large dataset already written to disk as a partitioned parquet dataset.

How can I read this data directly into Polars to compute some aggregate results? I want to avoid converting the parquet data to pandas (pq_df.to_pandas()), as the data is larger than my computer's memory.

Here is a reproducible example; I appreciate your input.

import polars as pl    # Version 0.20.3
import pyarrow as pa   # Version 11.0.0
import pyarrow.parquet as pq
 
 
pl_df = pl.DataFrame({
    "Name": ["ABC", "DEF", "GHI", "JKL"],
    "date": ["2024-01-01", "2024-01-10", "2023-01-29", "2023-01-29"],
    "price": [1000, 1500, 1800, 2100],
})

pl_df = pl_df.with_columns(date=pl.col("date").cast(pl.Date))
 
# write the Polars DataFrame to disk as a partitioned parquet dataset
pq.write_to_dataset(pl_df.to_arrow(),
                    root_path=r"C:\Users\desktop PC\Downloads\test_pl",
                    partition_cols=["date"],
                    compression="gzip",
                    existing_data_behavior="overwrite_or_ignore")
                        
# Build a schema object for the data written to the parquet dataset
pd_df_schema = pa.Schema.from_pandas(pl_df.to_pandas())
 
# Read data written to parquet dataset
pq_df = pq.read_table(r"C:\Users\desktop PC\Downloads\test_pl",
                      schema=pd_df_schema,
                      )
 
# I want to use this parquet object to create an aggregate result via Polars without using the "pq_df.to_pandas()" method.
 
df = (
    pl.from_pandas(pq_df.to_pandas())
    .lazy()
    .group_by(["date"])
    .agg(
        pl.col("price").sum().alias("grouped_sum"),
        pl.col("price").count().alias("grouped_count"),
    )
    .collect(streaming=True)
)

Solution

  • You can use the from_arrow() method to turn the pyarrow Table into a Polars DataFrame. pl.from_arrow(pq_df) gives:

    ┌──────┬─────────────────────┬───────┐
    │ Name ┆ date                ┆ price │
    │ ---  ┆ ---                 ┆ ---   │
    │ str  ┆ datetime[ms]        ┆ i64   │
    ╞══════╪═════════════════════╪═══════╡
    │ GHI  ┆ 2023-01-29 00:00:00 ┆ 1800  │
    │ JKL  ┆ 2023-01-29 00:00:00 ┆ 2100  │
    │ ABC  ┆ 2024-01-01 00:00:00 ┆ 1000  │
    │ DEF  ┆ 2024-01-10 00:00:00 ┆ 1500  │
    └──────┴─────────────────────┴───────┘

    and the aggregation can then be run on it lazily:

    (
        pl.from_arrow(pq_df).lazy()
        .group_by("date")
        .agg(
            pl.col("price").sum().alias("grouped_sum"),
            pl.col("price").count().alias("grouped_count")
        ).collect(streaming=True)
    )

    Note, however, that pq.read_table() has already loaded the whole dataset into memory at this point, so this only avoids the pandas conversion, not the memory cost.
    

    But probably the proper way to do this is to use scan_parquet(), which allows you to scan the path lazily without reading everything up front:

    (
        pl.scan_parquet(r"test_pl/*/*.parquet")
        .group_by("date")
        .agg(
            pl.col("price").sum().alias("grouped_sum"),
            pl.col("price").count().alias("grouped_count")
        ).collect(streaming=True)
    )
    
    ┌────────────┬─────────────┬───────────────┐
    │ date       ┆ grouped_sum ┆ grouped_count │
    │ ---        ┆ ---         ┆ ---           │
    │ str        ┆ i64         ┆ u32           │
    ╞════════════╪═════════════╪═══════════════╡
    │ 2024-01-01 ┆ 1000        ┆ 1             │
    │ 2024-01-10 ┆ 1500        ┆ 1             │
    │ 2023-01-29 ┆ 3900        ┆ 2             │
    └────────────┴─────────────┴───────────────┘
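
    Because scan_parquet() returns a LazyFrame, you can also filter on the partition column before collecting; depending on your Polars version, that predicate may be pushed down so non-matching partition directories are skipped. A minimal sketch, reusing the same relative path and assuming the partition values come back as strings (as in the output above):

    (
        pl.scan_parquet(r"test_pl/*/*.parquet")
        # keep only one partition before aggregating
        .filter(pl.col("date") == "2023-01-29")
        .group_by("date")
        .agg(
            pl.col("price").sum().alias("grouped_sum"),
            pl.col("price").count().alias("grouped_count")
        ).collect(streaming=True)
    )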
    

    Additionally, you can always use DuckDB for that:

    import duckdb

    duckdb.sql("""
        select
            a.date,
            sum(a.price) as grouped_sum,
            count(a.price) as grouped_count
        from read_parquet('test_pl/*/*.parquet') as a
        group by
            a.date
    """).pl()
    
    ┌────────────┬─────────────┬───────────────┐
    │ date       ┆ grouped_sum ┆ grouped_count │
    │ ---        ┆ ---         ┆ ---           │
    │ date       ┆ f64         ┆ i64           │
    ╞════════════╪═════════════╪═══════════════╡
    │ 2024-01-01 ┆ 1000.0      ┆ 1             │
    │ 2024-01-10 ┆ 1500.0      ┆ 1             │
    │ 2023-01-29 ┆ 3900.0      ┆ 2             │
    └────────────┴─────────────┴───────────────┘
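
    Finally, if you would rather let pyarrow handle the hive partitioning, Polars can also scan a pyarrow dataset lazily. This is only a sketch, assuming pl.scan_pyarrow_dataset() and pyarrow.dataset are available in your installed versions and that test_pl is the dataset root:

    import pyarrow.dataset as ds

    # build a pyarrow dataset over the partitioned directory,
    # then let Polars scan it lazily
    dset = ds.dataset("test_pl", format="parquet", partitioning="hive")
    (
        pl.scan_pyarrow_dataset(dset)
        .group_by("date")
        .agg(
            pl.col("price").sum().alias("grouped_sum"),
            pl.col("price").count().alias("grouped_count")
        ).collect(streaming=True)
    )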