I want to access Parquet files on an Azure data lake, and only retrieve some rows.
Here is a reproducible example, using a public dataset:
import pyarrow.dataset as ds
from adlfs import AzureBlobFileSystem
abfs_public = AzureBlobFileSystem(
account_name="azureopendatastorage")
dataset_public = ds.dataset('az://nyctlc/yellow/puYear=2010/puMonth=1/part-00000-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426339-18.c000.snappy.parquet', filesystem=abfs_public)
The processing time is the same whether I collect 5 rows or the full dataset. Is there a way to achieve slice pushdown using PyArrow?
Here are my tests:
dataset_public.to_table()
# 5min 30s
dataset_public.head(5)
# 5min 11s
dataset_public.scanner().head(5)
# 5min 43s
I'm not sure if there is a difference between .head() and .scanner().head().
With a few tweaks, I think I got what you were looking for. First, let's look at the original code you posted:
import pyarrow.dataset as ds
from adlfs import AzureBlobFileSystem
abfs_public = AzureBlobFileSystem(
account_name="azureopendatastorage")
dataset_public = ds.dataset('az://nyctlc/yellow/puYear=2010/puMonth=1/part-00000-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426339-18.c000.snappy.parquet', filesystem=abfs_public)
Looking at the path you provided, you're pointing it at a single file instead of the whole dataset. Adding a couple of tweaks:
import pyarrow.dataset as ds
from adlfs import AzureBlobFileSystem
abfs_public = AzureBlobFileSystem(account_name="azureopendatastorage")
dataset_public = ds.dataset('nyctlc/yellow/', filesystem=abfs_public, partitioning='hive')
Now, using dataset_public.head(5), I get 5 rows back.
Since I didn't give it a sort order, it just grabbed the first 5 rows it could get from whatever file happened to be the first fragment (most likely).
In your original code example, the path you gave was using puYear=2010 and puMonth=1, so we can use those. Because we told it to use hive partitioning, we can confirm that it picked up that the dataset is partitioned on these values:
print(dataset_public.partitioning.schema)
# prints:
# puYear: int32
# puMonth: int32
# -- schema metadata --
# org.apache.spark.sql.parquet.row.metadata: '{"type":"struct","fields":[{"' + 1456
And if we get the first 5 rows using those fields as a filter:
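Roughly like this, building the filter from ds.field() expressions on the partition columns (a sketch, not the exact call):

dataset_public.head(
    5,
    filter=(ds.field('puYear') == 2010) & (ds.field('puMonth') == 1),
)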
So that took 1min and 31s. But we can do better than that!
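The improvement comes from passing scanner tuning options through head(); a sketch of that kind of call (the parameter values here are just illustrative):

dataset_public.head(
    5,
    filter=(ds.field('puYear') == 2010) & (ds.field('puMonth') == 1),
    batch_size=5,          # don't materialize huge batches just to take 5 rows
    fragment_readahead=1,  # don't prefetch a pile of extra fragments
)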
W00t! 1.12s
See, the default batch_size is pretty large (I forget what it is offhand right now). But if you only want to grab a small number of rows, you can adjust the batch_size, the fragment readahead size, etc., to better fit your use case.
If you look at the base API documentation for the head() method, it has **kwargs that says "see scanner() method for full parameter description". And if you go to the scanner() method, it points you here: https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner.from_dataset where you can see all of the available parameters. Like getting only a subset of columns (very efficient, because Parquet is columnar):
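For example, something along these lines (the column names here are hypothetical; check dataset_public.schema for the real ones):

dataset_public.head(
    5,
    columns=['vendorID', 'fareAmount'],  # hypothetical names; project only what you need
    filter=(ds.field('puYear') == 2010) & (ds.field('puMonth') == 1),
    batch_size=5,
)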
I hope this helps you understand better how to leverage the dataset APIs, along with the rough edges and tricks for improving performance.