I have a partitioned parquet dataset directory on my local machine which looks something like this:
dataset
|
├───_common_metadata
├───_metadata
├───travel=land
| |
| ├───direction=arrivals
| | |
| | ├───dayofweek=Monday
| | ├───dayofweek=Tuesday
| | ├───dayofweek=Wednesday
| | ├───...
| | └───dayofweek=Sunday
| |
| └───direction=departures
| |
| ├───dayofweek=Monday
| ├───...
| └───dayofweek=Sunday
|
├───travel=sea
| |
| ├───direction=arrivals
| | |
| | ├───dayofweek=Monday
| | ...
| |
| └───direction=departures
| |
| ...
|
└───travel=air
|
...
So the full path to a particular parquet file looks like:
/dataset/travel=air/direction=departures/dayofweek=Sunday/data.part1.parquet
/dataset/travel=air/direction=departures/dayofweek=Sunday/data.part2.parquet
...and so on.
The data is tabular with headers like "registration", "delayed", "passenger_count", etc.
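(For reference, a dataset with this kind of hive-style folder layout, minus the _metadata / _common_metadata files, can be produced with pyarrow's write_to_dataset and partition_cols; the sketch below uses made-up sample values, not my real data:)
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Illustrative sample only -- the real dataset has ~127 columns
sample = pd.DataFrame({
    "travel": ["air", "land", "sea"],
    "direction": ["departures", "arrivals", "departures"],
    "dayofweek": ["Monday", "Sunday", "Wednesday"],
    "registration": ["AB-123", "CD-456", "EF-789"],
    "delayed": ["T", "F", "T"],
    "passenger_count": [150, 80, 210],
})

# partition_cols creates the travel=/direction=/dayofweek= directory hierarchy
pq.write_to_dataset(
    pa.Table.from_pandas(sample),
    root_path="/path/to/dataset",
    partition_cols=["travel", "direction", "dayofweek"],
)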
I can load this data into a pandas dataframe with pyarrow.parquet.ParquetDataset and filter it like this, and it works fine:
import pyarrow.parquet as pq
import pandas as pd
filters = [
    ("travel", "in", ["land", "air"]),
    ("direction", "in", ["departures"]),
    ("dayofweek", "in", ["Monday", "Wednesday"]),
    ("delayed", "=", "T"),
    ("passenger_count", ">=", 100),
]
pds = pq.ParquetDataset("/path/to/dataset", filters=filters, use_legacy_dataset=False)
df = pds.read_pandas().to_pandas()
However, the data I have is very large and does not fit in memory, so I am trying to use Dask instead of pandas. Here is my converted code for doing the same thing as above:
import pyarrow.parquet as pq
import dask.dataframe as dd
filters = [
    ("travel", "in", ["land", "air"]),
    ("direction", "in", ["departures"]),
    ("dayofweek", "in", ["Monday", "Wednesday"]),
    ("delayed", "=", "T"),
    ("passenger_count", ">=", 100),
]
df = dd.read_parquet(
    "/path/to/dataset",
    filters=filters,
    engine="pyarrow",
    use_legacy_dataset=False,
)
Running df.info() and df.head(5) after loading with Dask returns the correct column names and dtypes, but an empty dataframe:
Columns: 127 entries, sequence to expired
Empty DataFrame  <-- BAD!
dtypes: object(74), float32(9), int64(44)
None

Empty DataFrame
Columns: [registration, delayed, passenger_count, ...]
Index: []  <-- BAD!
I'm trying to understand what I'm doing wrong here. Are the filters formatted incorrectly for dd.read_parquet()? Are there input arguments I should be passing to dd.read_parquet() that I am missing? Your help is much appreciated.
It is hard to say for sure, but it is possible that nothing is wrong at all. Since Dask produces lazy objects until you explicitly reduce or compute, it only holds the minimum of metadata, and when filtering there may be partitions with no data inside them. df.head() only fetches data from the first partition by default, so you might want to perform an operation guaranteed to read some of the data:
len(df) # explicitly scan dataframe and count valid rows
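Another quick check, sketched here against the df from above, is to count the rows in each partition; this will show whether only the leading partitions that head() looks at happen to be empty:
# One row count per Dask partition; head() only reads the first of these
partition_sizes = df.map_partitions(len).compute()
print(partition_sizes)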
If you still get a value of 0, you may want to try the fastparquet backend; by default it only performs partition-level filtering rather than row-level filtering.
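If you do try fastparquet, the call is essentially the same apart from the engine argument (a sketch, assuming fastparquet is installed):
df = dd.read_parquet(
    "/path/to/dataset",
    filters=filters,
    engine="fastparquet",
)
len(df)  # force a full scan again and see whether any rows come back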