I have begun to learn and implement Polars because of (1) the potential speed improvements and (2) for the promise of being able to process larger-than-memory datasets. However, I'm struggling to see how the second promise is actually delivered in specific scenarios that my use case requires.
One specific example I'm struggling with is how to read a multi-GB JSONL file from S3, apply a few transformations, and send the modified records to STDOUT
.
As I just raised in GitHub, the sink_*()
methods do not support writing to a buffer or file-like - only to a named file path. Otherwise, it seems the simple solution would be something like sink_ndjson(sys.stdout, ...)
DataFrame
or LazyFrame
into smaller data frames.The next thing I tried was to get smaller batches or dataframes (for instance 100K rows at a time) which I could process in memory and write with write_ndjson(sys.stdout, ...)
one at a time until I reach the end of the stream.
The closest I could find is LazyFrame.slice(offset, batch_size).collect()
- except in practice, this seems to hang/crash on the first invocation rather than reading just the first n records and then proceeding. Even when I set a low number of records in the LazyFrame's schema scan limit. Perhaps this is a bug - but even still, the slice()
method does not seem specifically designed for getting incremental batches from the lazy frame.
Any help is much appreciated!
As was mentioned in comments, the streaming engine is undergoing a significant revamp to address the shortcomings of the current implementation. The details of that revamp, as far as I'm aware, haven't been documented so I can't say that this exact use case will be addressed in that revamp. It's not clear, to me, what the benefit is of saving data to stdout (in memory) if your overall data is too big for memory so that enhancement request may be too niche for it to be picked up in general. It seems a more in-mission ask would be for a read_ndjson_batched
function similar to the read_csv_batched
function.
In the interim, you can implement this in python using s3's fsspec handler to read the file in lines and form your own batches.
import polars as pl
import fsspec # or the s3 fsspec implementation lib
from io import BytesIO
s3fs = fsspec.filesystem() # or aforementioned s3 class
batch_size = 100_000
with s3fs.open(path, 'rb') as ff:
while True:
batch = []
dbl_break=False
for _ in range(batch_size):
line = ff.readline()
if not line:
dbl_break=True
break
batch.append(line)
if len(batch)==0:
break
df_batch = pl.read_ndjson(b"\n".join(batch))
do_batch_process(df_batch)
if dbl_break:
break