I'm running into an issue where I'm saving different parquet files with slightly different schemas but shared partition columns. I have put together the following minimal reproducible example:
from dask import dataframe as dd
import pandas as pd
import shutil


def save_parquet():
    df1 = pd.DataFrame({"A": [1], "B": [1]})
    df2 = pd.DataFrame({"A": [2], "C": [2]})
    df1.to_parquet("test.parquet", partition_cols=["A"])
    df2.to_parquet("test.parquet", partition_cols=["A"])


def load_parquet():
    filters = [[
        ("A", "==", 2)
    ]]
    ddf = dd.read_parquet("test.parquet", columns=["A", "C"], filters=filters)


def main():
    save_parquet()
    load_parquet()


if __name__ == "__main__":
    main()
Running the above causes the following exception:
Traceback (most recent call last):
File "/home/.../.local/lib/python3.8/site-packages/dask/backends.py", line 133, in wrapper
return func(*args, **kwargs)
File "/home/.../.local/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 578, in read_parquet
meta, index, columns = set_index_columns(meta, index, columns, auto_index_allowed)
File "/home/.../.local/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 1487, in set_index_columns
raise ValueError(
ValueError: The following columns were not found in the dataset {'C'}
The following columns were found Index(['B', 'A'], dtype='object')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "mre.py", line 26, in <module>
main()
File "mre.py", line 23, in main
load_parquet()
File "mre.py", line 15, in load_parquet
ddf = dd.read_parquet("test.parquet", columns=["A", "C"], filters=filters)
File "/home/.../.local/lib/python3.8/site-packages/dask/backends.py", line 135, in wrapper
raise type(e)(
ValueError: An error occurred while calling the read_parquet method registered to the pandas backend.
Original Message: The following columns were not found in the dataset {'C'}
The following columns were found Index(['B', 'A'], dtype='object')
My expectation would be that the ("A", "==", 2) filter should stop us from loading the schema from df1, and that regardless of whether it loads df1 or not, it should be able to find the "C" column from df2. Am I missing something here?
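For reference, the on-disk layout after save_parquet() is hive-partitioned on A, so each partition directory only contains the non-partition columns of the frame that wrote it. Here is a minimal sketch for inspecting that layout (the data file names inside each partition directory are generated by pyarrow and will differ):

import glob

# Expected hive-style layout (file names will vary):
#   test.parquet/A=1/<part>.parquet  <- written from df1, carries column B
#   test.parquet/A=2/<part>.parquet  <- written from df2, carries column C
for path in sorted(glob.glob("test.parquet/A=*/*.parquet")):
    print(path)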
Changing the columns argument to columns=["A", "B"] successfully reads in the data, so it feels like what I'm trying to do should be possible somehow.
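For comparison, this is the variant that does load for me (same filters as above, only the columns list changed):

ddf = dd.read_parquet("test.parquet", columns=["A", "B"], filters=filters)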
This post indicates that read_parquet reads the schema from the first parquet file it encounters, but that you can specify a schema to avoid this.
Specifying a schema like
import pyarrow as pa
...
ddf = dd.read_parquet("test.parquet", columns=["A", "C"], filters=filters, schema=pa.schema({"A": pa.int64(), "C": pa.int64()}))
still triggers the exception.
Specifying a schema without specifying columns doesn't trigger the exception, but returns a dask dataframe without the "C" column (seemingly regardless of what was in the schema):
>>> print(ddf.columns)
Index(['B', 'A'], dtype='object')
Is there a way to prevent read_parquet from using the .parquet files that should be filtered out?
Found a somewhat silly workaround: dask.dataframe.read_parquet can take a list of parquet files from within the partition directories (rather than the top-level parquet folder). I used path globbing to filter down to only the relevant parquet files (based on partition folder) and passed that list in directly.
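A sketch of that workaround, assuming the hive-style test.parquet/A=.../ layout produced by the example above (the data file names inside the partition directories are generated, hence the glob):

import glob

from dask import dataframe as dd

# Glob only the data files under the partition(s) of interest (A=2 here),
# instead of pointing read_parquet at the top-level "test.parquet" folder.
paths = sorted(glob.glob("test.parquet/A=2/*.parquet"))

# Since the A=1 files are never part of the dataset, their schema (which
# lacks "C") can no longer hide the "C" column.
ddf = dd.read_parquet(paths)

Note that, depending on the dask/pyarrow version, the partition column A may or may not be reconstructed from the directory names when you pass explicit file paths, so it is worth checking ddf.columns and re-deriving A yourself if you need it.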