python-polars

Polars scan_ndjson does not work with streaming?


I am attempting to read data from a large (300GB) newline-delimited JSON file, extract specific fields of interest and write them to a Parquet file. This question is a follow-up to my previous SO question, which has more background and describes the structure of the data I'm working with.

Each line/JSON object is independent of the others, so I would have imagined this could be handled in a streaming fashion, processing the file (which is too large to fit in memory) in chunks.

The code that does the actual scan, collect and write is very simple:

import polars as pl

# define the schema...

pl.scan_ndjson(
        'data/input/myjson.jsonl',
        schema=prschema)\
    .collect(streaming=True)\
    .write_parquet('data/output/myparquet.parquet',
        compression='snappy',
        use_pyarrow=True
)

However, as I work with increasingly large subsets of my final file, I see that memory consumption grows linearly with the input file size.

If I check the query plan using explain(streaming=True), I see that streaming is NOT being used:

Anonymous SCAN 
  PROJECT */6 COLUMNS
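
For reference, this is roughly how the plan above was obtained (a minimal sketch, assuming the same prschema and input path as before):

import polars as pl

# Print the query plan; if the streaming engine could be used, the plan
# would typically wrap the scan in a STREAMING section, which is absent here.
lf = pl.scan_ndjson('data/input/myjson.jsonl', schema=prschema)
print(lf.explain(streaming=True))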

So my question is, why does streaming not appear to work for this seemingly straightforward read/write use case?


UPDATED

Using sink_parquet instead of write_parquet does not work (in fact it's what I had originally tried). To be sure that it wasn't due to the complex nature of my JSON files, I even tried a very simplified version that just attempts to write two scalar (no nested objects) fields:

pl.scan_ndjson('data/input/myjson.jsonl')\
    .select('id', 'standing')\
    .sink_parquet(
        'data/output/myparquet.parquet',
        compression='snappy'
    )

This throws:

InvalidOperationError: sink_Parquet(ParquetWriteOptions { compression: Snappy, statistics: false, row_group_size: None, data_pagesize_limit: None, maintain_order: true }) not yet supported in standard engine. Use 'collect().write_parquet()'


Solution

  • Streaming only means that the input is pulled in batches. This is useful if you're doing something like a filter or aggregation, where it's possible to compute results without having all of the data in memory ahead of time. But the result of collecting is still a DataFrame that resides entirely in memory.

    I believe what you want is sink_parquet, which is the lazy version of write_parquet.

    pl.scan_ndjson(...).sink_parquet(...)
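
    To make the contrast concrete, here is a minimal sketch using the question's id/standing columns and file paths (illustrative only; as the update above shows, whether sink_parquet is supported for a given scan can depend on the Polars version):

    import polars as pl

    # 1) collect(streaming=True): the input is scanned in batches, but the
    #    collected result is still an in-memory DataFrame. That is fine here
    #    because the aggregate is small.
    standing_counts = (
        pl.scan_ndjson('data/input/myjson.jsonl')
        .group_by('standing')
        .agg(pl.col('id').count().alias('n'))
        .collect(streaming=True)
    )

    # 2) sink_parquet: batches are written straight to the Parquet file, so
    #    the full result never has to fit in memory at once.
    (
        pl.scan_ndjson('data/input/myjson.jsonl')
        .select('id', 'standing')
        .sink_parquet('data/output/myparquet.parquet', compression='snappy')
    )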