Tags: python, azure, azure-data-lake, pyarrow, apache-arrow

Pyarrow slice pushdown for Azure data lake


I want to access Parquet files on an Azure data lake, and only retrieve some rows.

Here is a reproducible example, using a public dataset:

import pyarrow.dataset as ds
from adlfs import AzureBlobFileSystem

abfs_public = AzureBlobFileSystem(
    account_name="azureopendatastorage")

dataset_public = ds.dataset('az://nyctlc/yellow/puYear=2010/puMonth=1/part-00000-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426339-18.c000.snappy.parquet', filesystem=abfs_public)

Collecting 5 rows takes as long as collecting the full dataset. Is there a way to achieve slice pushdown using Pyarrow?

Here are my tests:

dataset_public.to_table()
# 5min 30s

dataset_public.head(5)
# 5min 11s

dataset_public.scanner().head(5)
# 5min 43s

I'm not sure whether there is a difference between .head() and .scanner().head().


Solution

  • With a few tweaks, I think I got what you were looking for. First, let's look at the original code you posted:

    import pyarrow.dataset as ds
    from adlfs import AzureBlobFileSystem
    
    abfs_public = AzureBlobFileSystem(
        account_name="azureopendatastorage")
    
    dataset_public = ds.dataset('az://nyctlc/yellow/puYear=2010/puMonth=1/part-00000-tid-8898858832658823408-a1de80bd-eed3-4d11-b9d4-fa74bfbd47bc-426339-18.c000.snappy.parquet', filesystem=abfs_public)
    

    Looking at the path you provided, you're pointing it at a single file instead of the whole dataset. With a couple of tweaks:

    import pyarrow.dataset as ds
    from adlfs import AzureBlobFileSystem
    
    abfs_public = AzureBlobFileSystem(account_name="azureopendatastorage")
    dataset_public = ds.dataset('nyctlc/yellow/', filesystem=abfs_public, partitioning='hive')
    

    Now, using dataset_public.head(5) I get:

    [Screenshot: VS Code Jupyter output of dataset_public.head(5)]

    Since I didn't give it a sort order, it just grabbed the first 5 rows it could get from whatever file happened to be the first fragment (most likely).
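    If you're curious which file that is, you can peek at the dataset's fragments, something like:

    # Peek at the first fragment (file) a scan would read from
    first_fragment = next(iter(dataset_public.get_fragments()))
    print(first_fragment.path)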

    In your original code example, the path you gave was using puYear=2010 and puMonth=1, so we can use those. Because we told it to use hive partitioning, we can confirm that it picked up that the dataset is partitioned on those columns:

    print(dataset_public.partitioning.schema)
    # prints:
    # puYear: int32
    # puMonth: int32
    # -- schema metadata --
    # org.apache.spark.sql.parquet.row.metadata: '{"type":"struct","fields":[{"' + 1456
    

    And if we get the first 5 rows using those fields as a filter:

    [Screenshot: head(5) with a filter on puYear and puMonth]
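    In code, that looks roughly like this (a sketch; the exact expression in the screenshot may differ slightly):

    import pyarrow.dataset as ds

    # Filter on the partition columns so only the matching fragment(s) get read
    dataset_public.head(
        5,
        filter=(ds.field("puYear") == 2010) & (ds.field("puMonth") == 1),
    )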

    So that took 1min and 31s. But we can do better than that!

    Adding batch_size=5 to the same call:

    W00t! 1.12s

    See, the default batch_size is pretty large (I forget what it is offhand right now). But if you only want to grab a small number of rows, you can adjust the batch_size, fragment readahead size, etc. to better fit your use case.
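    For example, something along these lines (the fragment_readahead value here is just illustrative; head() forwards these keyword arguments to the scanner):

    # Small batches and less readahead when you only need a handful of rows
    dataset_public.head(
        5,
        filter=(ds.field("puYear") == 2010) & (ds.field("puMonth") == 1),
        batch_size=5,
        fragment_readahead=1,
    )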

    If you look at the base API documentation for the head() method, it has **kwargs with a note that says "see scanner() method for full parameter description". And if you go to the scanner() method, it points you here: https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner.from_dataset where you can see all of the available parameters. For example, getting only a subset of columns (very efficient, because Parquet is a columnar format):

    [Screenshot: head(5) with a columns selection]
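    In code, roughly (a sketch; the column names are just examples, substitute whichever ones you need from your schema):

    # Only the requested columns are read from the Parquet files
    dataset_public.head(
        5,
        columns=["tipAmount", "totalAmount"],
        filter=(ds.field("puYear") == 2010) & (ds.field("puMonth") == 1),
    )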

    I hope this helps you understand better how to leverage the dataset APIs, and the rough edges / tricks for improving performance.