Tags: python, dask, parquet, fastparquet

Efficiently reading only some columns from parquet file on blob storage using dask


How can I efficiently read only some of the columns of a parquet file that is hosted in cloud blob storage (e.g. S3 / Azure Blob Storage)?

The columnar structure is one of the parquet format's key advantages, so reading columns selectively can reduce I/O load. It is also natural to store data in blob storage for running large-scale workloads on the cloud. However, once a parquet file is stored as a blob, most libraries (dask, fastparquet, pyarrow) seem unable to take advantage of this, since the underlying fseek is not possible directly on a blob - meaning that regardless of which columns are selected, one would have to download the entire file to a local file system before reading it.

What is therefore the best practice if my use case is such that separate applications require different columns, and the performance cost of downloading entire files for just a few columns is unacceptable? Should I store a separate parquet file for each column along with a common index and then merge at the application level using pandas/dask etc.? Does the Apache Parquet format have any built-in support for splitting a dataset by columns - similar to how the hive format splits a dataset by partition?

Any help / concrete example using dask or fastparquet is appreciated.


Solution

  • (author of fsspec and fastparquet writing)

    Short answer: yes, Dask reading parquet will select only the columns you need from the remote store, and in certain cases can read only a subset of partitions from the whole dataset. You are best off passing the set of columns= in the call to read_parquet if you know it beforehand, but Dask will attempt to infer the right values from your graph of computations; e.g., dd.read_parquet(...).column1.compute() would only fetch "column1". Such inference can fail for more complex computations.
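    For illustration, a minimal sketch (the bucket path and column names are placeholders, not from the original question):

        import dask.dataframe as dd

        # Explicitly selecting columns means only those byte ranges are fetched
        # from the remote store.
        df = dd.read_parquet(
            "s3://my-bucket/my-dataset/",      # hypothetical path
            columns=["column1", "column2"],
        )

        # Alternatively, let Dask infer the needed column from the graph; this
        # works for simple selections but can fail for complex computations.
        mean1 = dd.read_parquet("s3://my-bucket/my-dataset/").column1.mean().compute()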

    The backend stores, even if key-value like (azure blob and datalake, s3, gcsfs), still do support range requests, which means that only the bytes of interest get downloaded by the workers.
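    As a rough illustration of what happens underneath (the path is hypothetical), an fsspec file-like object translates seek/read into range requests, so only the requested bytes are transferred:

        import fsspec

        # Read just the last 8 bytes of a remote parquet file (footer length +
        # "PAR1" magic) without downloading the rest of the blob.
        with fsspec.open("s3://my-bucket/my-dataset/part.0.parquet", "rb") as f:
            f.seek(-8, 2)
            tail = f.read(8)
            print(tail)   # ends with b"PAR1" for a valid parquet file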

    There are some subtleties, however. Remote stores have much higher latency (time-to-first-byte) than disks, so the data throughput depends heavily on the number of requests made: seeking around a file will have variable efficiency, depending on the read-ahead/caching strategy employed. You can use the storage_options argument to fine-tune this behaviour.
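    For example, a hedged sketch of tuning the caching behaviour through storage_options (the option names shown are s3fs keyword arguments; other backends expose similar but not identical options, and the path is hypothetical):

        import dask.dataframe as dd

        df = dd.read_parquet(
            "s3://my-bucket/my-dataset/",
            columns=["column1"],
            storage_options={
                "default_block_size": 5 * 2**20,    # 5 MiB read-ahead blocks
                "default_cache_type": "readahead",  # caching strategy for file reads
            },
        )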

    No, there is no particular support for storing columns separately, although joining on identical indexes should usually be efficient. However, that will not usually be necessary; other considerations matter more, such as the right data type and partition size to use. Such things are commonly case dependent, and the latency of your particular data store may be an important factor.
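    If you did store column subsets as separate datasets sharing an index, a join on that index is straightforward; a sketch with hypothetical paths and index name:

        import dask.dataframe as dd

        left = dd.read_parquet("s3://my-bucket/cols_a/", index="id")
        right = dd.read_parquet("s3://my-bucket/cols_b/", index="id")

        # Aligned on the shared "id" index; usually efficient when both sides
        # are written with the same index and partitioning.
        combined = left.join(right)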

    Please also see https://github.com/fsspec/filesystem_spec/issues/885 for an fsspec caching scheme specifically tailored for parquet data.
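    If your fsspec version includes the fsspec.parquet module (the outcome of that issue), fsspec.parquet.open_parquet_file pre-fetches only the footer and the byte ranges for the requested columns. A sketch, assuming a recent fsspec and a hypothetical path:

        import fsspec.parquet
        import pyarrow.parquet as pq

        # Pre-fetch only the metadata and the byte ranges for "column1",
        # then hand the cached file object to the reader.
        with fsspec.parquet.open_parquet_file(
            "s3://my-bucket/my-dataset/part.0.parquet",
            columns=["column1"],
            engine="pyarrow",
        ) as f:
            table = pq.read_table(f, columns=["column1"])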