I have a 2.3GB newline-delimited JSON file in an S3 bucket. My goal is to parse the data using a specified schema and write it out to a parquet file (this question considers a single file, my actual production use case involves multiple files).
My core approach is to define a dataset against the .jsonl file with myds = pyarrow.dataset.dataset(...) (ds.dataset in the snippets below), then write it out as parquet with pyarrow.dataset.write_dataset(myds, ...).
I have tested the following three implementations (imports and other extraneous details omitted to make the code less noisy).

1. Let PyArrow handle the filesystem
myds = ds.dataset(
    srcpath,
    format='json',
    schema=schema
)
ds.write_dataset(
    myds,
    destpath,
    format='parquet',
    ... other write options
)
2. Give PyArrow the filesystem

Specify the filesystem argument for both the dataset and write_dataset calls as a separately defined s3fs.S3FileSystem object.
s3fsys = s3fs.S3FileSystem()
myds = ds.dataset(
    srcpath,
    format='json',
    schema=schema,
    filesystem=s3fsys
)
ds.write_dataset(
    myds,
    destpath,
    format='parquet',
    filesystem=s3fsys,
    ... other write options
)
3. smart_open download/upload

Download the file to local disk, process it there, and upload the result, using the smart_open library for the file transfer between S3 and local.
def transfer_file(source: str, destination: str, chunk_size: int = 10*2**20):
    # Stream the file in chunks so it is never held in memory all at once
    with (
        smart_open.open(source, 'rb') as fin,
        smart_open.open(destination, 'wb') as fout
    ):
        while True:
            chunk = fin.read(chunk_size)
            if not chunk:
                break
            fout.write(chunk)
transfer_file(
    s3src,
    localsrc
)
myds = ds.dataset(
    localsrc,
    format='json',
    schema=schema
)
ds.write_dataset(
    myds,
    localdest,
    format='parquet',
    ... other write options
)
transfer_file(
    localdestfile,
    s3dest
)
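As a side note, the chunked copy loop can be sanity-checked locally with the built-in open (smart_open delegates to it for plain local paths anyway). This is a minimal sketch with a hypothetical transfer_file_local variant that needs neither smart_open nor S3 access:

```python
import os
import tempfile

def transfer_file_local(source: str, destination: str, chunk_size: int = 10 * 2**20):
    # Same chunked loop as transfer_file above, but using the built-in
    # open so it runs without smart_open or S3 credentials.
    with open(source, 'rb') as fin, open(destination, 'wb') as fout:
        while True:
            chunk = fin.read(chunk_size)
            if not chunk:
                break
            fout.write(chunk)

# Round-trip a small payload with a deliberately tiny chunk size so the
# loop iterates more than once.
with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, 'src.bin')
    dst = os.path.join(tmp, 'dst.bin')
    payload = os.urandom(1_000_000)
    with open(src, 'wb') as f:
        f.write(payload)
    transfer_file_local(src, dst, chunk_size=64 * 1024)
    with open(dst, 'rb') as f:
        round_trip_ok = f.read() == payload
```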
I observed significant differences in the processing times for the three approaches; the numbers stayed pretty consistent over multiple runs:

[timing comparison for approaches 1, 2, and 3 (smart_open download/upload)]

Why is there such a big performance difference? Using the first approach keeps my code simpler, but for my actual dataset (a couple of TB) the difference in time would be significant. I'm not sure if I'm just missing some options I could provide to pyarrow.dataset to make it more performant out of the box.
It looks like pyarrow.dataset.Dataset.scanner (documentation) has various options to tweak how the dataset is read. I was able to get better performance with approaches 1 (let PyArrow handle the filesystem) and 2 (give PyArrow the filesystem) by creating a scanner from the dataset with higher values for batch_size and batch_readahead:
myds = ds.dataset(
    srcpath,
    format='json',
    schema=schema
)
scanner = myds.scanner(
    batch_size=8*10**6,
    batch_readahead=32
)
ds.write_dataset(
    scanner,
    destpath,
    format='parquet',
    ... other write options
)
Tweaking those parameters allows me to make the tradeoff between memory consumption and speed.
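To reason about that tradeoff, here is a back-of-the-envelope heuristic of my own (not something from the PyArrow docs): with batch_readahead batches in flight plus the batch currently being processed, the scanner can hold roughly batch_size × (batch_readahead + 1) rows at once, so multiplying by an assumed average row width gives a crude upper bound on buffer memory:

```python
def estimated_buffer_bytes(batch_size: int, batch_readahead: int,
                           avg_row_bytes: float) -> float:
    # Rough upper bound: rows the scanner may buffer at once
    # (readahead batches plus the current one), times bytes per row.
    return batch_size * (batch_readahead + 1) * avg_row_bytes

# With the values from the snippet above and an assumed ~100-byte row:
est = estimated_buffer_bytes(8 * 10**6, 32, 100)
print(f"~{est / 2**30:.1f} GiB")
```

The actual footprint depends on the decoded Arrow representation of the rows, so treat this only as a starting point for choosing the two parameters.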
Comparing approaches 1 and 2 with the same scanner parameters, approach 2 (providing PyArrow with the filesystem) still seems to outperform approach 1. I'm not sure why, but providing the filesystem doesn't overly complicate my code and the performance is good enough, so I'm not going to dig further.