python · pandas · parquet · pyarrow

PyArrow dataset missing new data


I have a Python process that writes data into Parquet files, using PyArrow 6.0.0. I initialize a PyArrow dataset with:

import gcsfs
import pyarrow
import pyarrow.dataset as ds
from pyarrow.fs import PyFileSystem, FSSpecHandler

root_directory = "bucket_name_in_gcp"
fs = gcsfs.GCSFileSystem(project=project)  # project is the GCP project id
pa_fs = PyFileSystem(FSSpecHandler(fs))

# Hive-style partitioning on item_id and group
partitions = ds.partitioning(pyarrow.schema([("item_id", pyarrow.string()), ("group", pyarrow.string())]), flavor="hive")

dataset = ds.dataset(source=root_directory, filesystem=fs, partitioning=partitions, format="parquet")

Later in the code I use:

item_id_condition = ds.field("item_id") == "xxx"
group_condition = ds.field("group") == "group_1"
filters = item_id_condition & group_condition

results = dataset.to_table(filter=filters).to_pandas()

to read data from storage. I get an empty DataFrame, which is expected at this point.

Later I use:

import pyarrow.parquet as parquet

file_path = f'{root_directory}/item_id=xxx/group=group_1'

# table is the pyarrow.Table to persist
with pyarrow.BufferOutputStream() as output_buffer:
    # Serialize the table to an in-memory Parquet buffer
    parquet.write_table(table, output_buffer)
    parquet_bytes = output_buffer.getvalue().to_pybytes()
    # Write the bytes to the partition path in GCS via the PyArrow filesystem wrapper
    with pa_fs.open_output_stream(str(file_path)) as stream:
        stream.write(parquet_bytes)

to write a table of data to storage. At that point I can view the file and its contents in the bucket.

But if I call the read function (dataset.to_table) again, I still get an empty DataFrame. Why doesn't the PyArrow dataset recognize the new files? If I re-create the ds.dataset object, it recognizes all the existing data.

Am I missing anything? Is there a way to refresh the dataset, or do I need to initialize it each time?


Solution

  • I think you would need to run

    dataset = ds.dataset(source=root_directory, filesystem=fs, partitioning=partitions, format="parquet")

    again, and then it should recognize the new files. If I understand correctly, a ds.dataset discovers its files once, when it is constructed, so files written afterwards are not picked up; the dataset has to be "refreshed" by re-creating the ds.dataset, as you have said.
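
    A minimal sketch of that pattern, assuming the root_directory, fs, and partitions objects from your question (read_latest is just an illustrative helper name, not part of the PyArrow API):

    def read_latest(item_id, group):
        # Re-create the dataset so file discovery runs again and picks up
        # any Parquet files written since the previous read.
        dataset = ds.dataset(source=root_directory, filesystem=fs, partitioning=partitions, format="parquet")
        filters = (ds.field("item_id") == item_id) & (ds.field("group") == group)
        return dataset.to_table(filter=filters).to_pandas()

    results = read_latest("xxx", "group_1")

    Re-creating the dataset re-runs file discovery on every call; if that listing ever becomes expensive, you could instead cache the dataset object and rebuild it only after you know new files have been written.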