I have a Python process that writes data into Parquet files, using PyArrow 6.0.0. I initialize a PyArrow Dataset with:
import gcsfs
import pyarrow
import pyarrow.dataset as ds
from pyarrow.fs import PyFileSystem, FSSpecHandler

root_directory = "bucket_name_in_gcp"
fs = gcsfs.GCSFileSystem(project=project)
pa_fs = PyFileSystem(FSSpecHandler(fs))

# hive-style partitioning: paths like item_id=xxx/group=yyy
partitions = ds.partitioning(pyarrow.schema([("item_id", pyarrow.string()), ("group", pyarrow.string())]), flavor="hive")
dataset = ds.dataset(source=root_directory, filesystem=fs, partitioning=partitions, format="parquet")
Later in the code I use:
item_id_condition = ds.field("item_id") == "xxx"
group_condition = ds.field("group") == "group_1"
filters = item_id_condition & group_condition
results = dataset.to_table(filter=filters).to_pandas()
to read data from storage, and I get an empty dataframe, which is fine (nothing has been written yet).
Later I use:
import pyarrow.parquet as parquet

file_path = f'{root_directory}/item_id=xxx/group=group_1'
with pyarrow.BufferOutputStream() as output_buffer:
    parquet.write_table(table, output_buffer)
    parquet_bytes = output_buffer.getvalue().to_pybytes()
with pa_fs.open_output_stream(str(file_path)) as stream:
    stream.write(parquet_bytes)
to write a table of data to the storage; at that point I can view the file and its content.
But if I call the read function (dataset.to_table) again, I still get an empty dataframe. Why doesn't the PyArrow dataset recognize the new files?
If I re-create the ds.dataset object, it recognizes all the existing data.
Am I missing anything? Is there a way to refresh the dataset, or do I need to initialize it each time?
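For reference, one way to check which files the dataset is actually scanning (assuming ds.dataset returned a FileSystemDataset, which it should for a filesystem source) is its files property:

# Paths the dataset discovered when it was created;
# the file written above does not appear in this list.
print(dataset.files)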
I think you would need to run
dataset = ds.dataset(source=root_directory, filesystem=fs, partitioning=partitions, format="parquet")
again, and then it should recognize the new files. If I understand correctly, the dataset discovers its files only when it is created, so its view of the gcsfs filesystem has to be "refreshed" in this way (by re-creating the ds.dataset object, as you have said).
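As a minimal sketch of that approach (the helper name refresh_dataset is just for illustration, and it reuses the variables defined in your question):

def refresh_dataset():
    # Re-creating the dataset re-runs file discovery on the bucket,
    # so files written after the previous creation are picked up.
    return ds.dataset(source=root_directory, filesystem=fs,
                      partitioning=partitions, format="parquet")

dataset = refresh_dataset()  # refresh right before reading
results = dataset.to_table(filter=filters).to_pandas()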