Tags: python, parquet, python-polars

Error using sink_parquet in Polars library


I’m trying to extract some features from a dataset and then write the results to a Parquet file using the Polars library in Python. Here’s the code I’m using:

import ipaddress

import numpy as np
import polars as pl


def extract_session_features(sessions: pl.LazyFrame) -> pl.LazyFrame:
    return (
        sessions.with_columns(
            (pl.col("dpkts") + pl.col("spkts")).alias("total_packets"),
            (pl.col("dbytes") + pl.col("sbytes")).alias("total_bytes"),
            (pl.col("dpkts") / pl.col("spkts")).alias("packets_ratio"),
            (pl.col("dbytes") / pl.col("sbytes")).alias("bytes_ratio"),
            (pl.col("spkts") / pl.col("dur")).alias("sent_packets_rate"),
            (pl.col("dpkts") / pl.col("dur")).alias("received_packets_rate"),
            (pl.col("sbytes") / pl.col("dur")).alias("sent_bytes_rate"),
            (pl.col("dbytes") / pl.col("dur")).alias("received_bytes_rate"),
            (pl.col("sbytes") / pl.col("spkts")).alias("mean_pkt_sent_size"),
            (pl.col("dbytes") / pl.col("dpkts")).alias("mean_pkt_recv_size"),
            (
                pl.col("Timestamp")
                .diff()
                .dt.total_seconds()
                .fill_null(0)
                .over("ID")
                .alias("time_since_last_session")
            ),
        )
        .with_columns(
            pl.when(pl.col("^.*(_ratio|_rate).*$").is_infinite())
            .then(-1)
            .otherwise(pl.col("^.*(_ratio|_rate).*$"))
            .name.keep()
        )
        .fill_nan(-1)
    )


filtered_sessions = pl.scan_parquet("./processed_merge_file_filtered.parquet")
print(filtered_sessions.head().collect(streaming=True))


sessions_features = extract_session_features(filtered_sessions)
sessions_features.sink_parquet("./sessions_features")

When I run this code, I get the following error:

thread '<unnamed>' panicked at /home/runner/work/polars/polars/crates/polars-lazy/src/physical_plan/planner/lp.rs:153:28:
sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Traceback (most recent call last):
  File "/home/cpinon/Documentos/Project/SourceCode/project/data/01_raw/sessions/test_feature_engineering.py", line 50, in <module>
    sessions_features.sink_parquet("./sessions_features")
  File "/home/cpinon/Documentos/Project/SourceCode/project/.venv/lib/python3.9/site-packages/polars/lazyframe/frame.py", line 1895, in sink_parquet
    return lf.sink_parquet(
pyo3_runtime.PanicException: sink_parquet not yet supported in standard engine. Use 'collect().write_parquet()'

The error message suggests using collect().write_parquet(), but I’m not sure why I can't use the sink_parquet() method. If I follow that suggestion, my computer runs out of memory, because the entire dataframe doesn't fit in RAM.


Solution

  • Not all Polars operations are supported by the streaming engine. In particular, my guess is that this window expression is what's keeping your query from streaming:

    (
        pl.col("Timestamp")
        .diff()
        .dt.total_seconds()
        .fill_null(0)
        .over("ID")
        .alias("time_since_last_session")
    ),
    
    

    You can verify that by printing the streaming query plan:

    print(sessions_features.explain(streaming=True))
    

    The output marks which part of the plan runs in the streaming engine. For a toy query, it looks something like this:

    print(pl.select(a=pl.lit(1)).lazy().select(pl.col('a').pow(2)).explain(streaming=True))
    --- STREAMING
     SELECT [col("a").pow([2])] FROM
      DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: "None"  --- END STREAMING
    
      DF []; PROJECT */0 COLUMNS; SELECTION: "None"
    

    Ideally, the whole plan sits between the --- STREAMING and --- END STREAMING markers. Any operation outside those markers isn't supported by the streaming engine.

    The official documentation on streaming is a bit sparse, but here's some extra info.

    To get what you need, you could do something loosely like the "Quasi sinking workaround": collect the unique values of your ID column, then loop through them, filtering the LazyFrame down to one ID at a time and writing each result separately.

    Another idea would be to use the pyarrow dataset writer or DuckDB, as they may be able to support your streaming need (I don't have specific knowledge of whether they do).