I have a large dataset already written to disk as a partitioned Parquet dataset.
How can I read such data directly into Polars to compute some aggregate results? I want to avoid converting the Parquet data to pandas (pq_df.to_pandas()), because the data is larger than my machine's memory.
Here is a reproducible example. I appreciate your input.
import polars as pl          # version 0.20.3
import pyarrow as pa         # version 11.0.0
import pyarrow.parquet as pq

pl_df = pl.DataFrame({
    "Name": ["ABC", "DEF", "GHI", "JKL"],
    "date": ["2024-01-01", "2024-01-10", "2023-01-29", "2023-01-29"],
    "price": [1000, 1500, 1800, 2100],
})
pl_df = pl_df.with_columns(date=pl.col("date").cast(pl.Date))

# Write the Polars DataFrame to disk as a partitioned parquet dataset
pq.write_to_dataset(
    pl_df.to_arrow(),
    root_path=r"C:\Users\desktop PC\Downloads\test_pl",
    partition_cols=["date"],
    compression="gzip",
    existing_data_behavior="overwrite_or_ignore",
)
# Build a schema object matching the data written to the parquet dataset
pd_df_schema = pa.Schema.from_pandas(pl_df.to_pandas())

# Read the data back from the parquet dataset
pq_df = pq.read_table(
    r"C:\Users\desktop PC\Downloads\test_pl",
    schema=pd_df_schema,
)
# I want to use this parquet table to compute an aggregate result via Polars
# without calling pq_df.to_pandas().
df = (
    pl.from_pandas(pq_df.to_pandas())
    .lazy()
    .group_by(["date"])
    .agg(
        pl.col("price").sum().alias("grouped_sum"),
        pl.col("price").count().alias("grouped_count"),
    )
    .collect(streaming=True)
)
You can use the from_arrow() method. pl.from_arrow(pq_df) turns the Arrow table you already read with pq.read_table() into a Polars DataFrame without going through pandas:

pl.from_arrow(pq_df)

┌──────┬─────────────────────┬───────┐
│ Name ┆ date                ┆ price │
│ ---  ┆ ---                 ┆ ---   │
│ str  ┆ datetime[ms]        ┆ i64   │
╞══════╪═════════════════════╪═══════╡
│ GHI  ┆ 2023-01-29 00:00:00 ┆ 1800  │
│ JKL  ┆ 2023-01-29 00:00:00 ┆ 2100  │
│ ABC  ┆ 2024-01-01 00:00:00 ┆ 1000  │
│ DEF  ┆ 2024-01-10 00:00:00 ┆ 1500  │
└──────┴─────────────────────┴───────┘

and you can then run the aggregation lazily on it:

(
    pl.from_arrow(pq_df)
    .lazy()
    .group_by("date")
    .agg(
        pl.col("price").sum().alias("grouped_sum"),
        pl.col("price").count().alias("grouped_count"),
    )
    .collect(streaming=True)
)
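Note that this route still materializes the whole Arrow table in memory via pq.read_table(), so it won't help with data larger than RAM. If you want to keep pyarrow's dataset handling (hive partitioning, schema) but stay lazy, Polars can also scan a pyarrow dataset directly. A minimal sketch, assuming pl.scan_pyarrow_dataset() is available in your Polars version:

import pyarrow.dataset as ds

# Open the partitioned dataset lazily; "hive" parses the date=... directory names
dataset = ds.dataset(
    r"C:\Users\desktop PC\Downloads\test_pl",
    format="parquet",
    partitioning="hive",
)

(
    pl.scan_pyarrow_dataset(dataset)
    .group_by("date")
    .agg(
        pl.col("price").sum().alias("grouped_sum"),
        pl.col("price").count().alias("grouped_count"),
    )
    .collect(streaming=True)
)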
But probably the proper way to do this is scan_parquet(), which lets you lazily scan the partitioned path without reading everything into memory first:
(
    pl.scan_parquet(r"test_pl/*/*.parquet")
    .group_by("date")
    .agg(
        pl.col("price").sum().alias("grouped_sum"),
        pl.col("price").count().alias("grouped_count"),
    )
    .collect(streaming=True)
)
┌────────────┬─────────────┬───────────────┐
│ date ┆ grouped_sum ┆ grouped_count │
│ --- ┆ --- ┆ --- │
│ str ┆ i64 ┆ u32 │
╞════════════╪═════════════╪═══════════════╡
│ 2024-01-01 ┆ 1000 ┆ 1 │
│ 2024-01-10 ┆ 1500 ┆ 1 │
│ 2023-01-29 ┆ 3900 ┆ 2 │
└────────────┴─────────────┴───────────────┘
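Because the query stays lazy, you can also filter on the partition column before collecting, and Polars should be able to push that predicate into the scan so only the matching partitions are read. A minimal sketch under the same layout as above (note that the hive partition column comes back as a string here, so the comparison uses a string literal):

(
    pl.scan_parquet(r"test_pl/*/*.parquet")
    .filter(pl.col("date") == "2023-01-29")  # predicate pushed down into the scan
    .group_by("date")
    .agg(pl.col("price").sum().alias("grouped_sum"))
    .collect(streaming=True)
)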
Additionally, you can always use DuckDB for that:
import duckdb

duckdb.sql("""
    select
        a.date,
        sum(a.price)   as grouped_sum,
        count(a.price) as grouped_count
    from read_parquet('test_pl/*/*.parquet') as a
    group by
        a.date
""").pl()
┌────────────┬─────────────┬───────────────┐
│ date ┆ grouped_sum ┆ grouped_count │
│ --- ┆ --- ┆ --- │
│ date ┆ f64 ┆ i64 │
╞════════════╪═════════════╪═══════════════╡
│ 2024-01-01 ┆ 1000.0 ┆ 1 │
│ 2024-01-10 ┆ 1500.0 ┆ 1 │
│ 2023-01-29 ┆ 3900.0 ┆ 2 │
└────────────┴─────────────┴───────────────┘
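As with the Polars scan, a filter on the partition column in the WHERE clause lets DuckDB skip the partition directories it doesn't need. A sketch, assuming a DuckDB version whose read_parquet() accepts the hive_partitioning flag (recent versions also auto-detect hive layouts):

duckdb.sql("""
    select
        a.date,
        sum(a.price)   as grouped_sum,
        count(a.price) as grouped_count
    from read_parquet('test_pl/*/*.parquet', hive_partitioning = true) as a
    where a.date = '2023-01-29'
    group by
        a.date
""").pl()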