I have some streaming data coming in as JSON that I transform into a polars dataframe, and then I write the data out as parquet partitioned by two columns. I am noticing that if a new record has the same partition, then instead of writing an additional file in the folder, it overwrites the old data with the new data. I want to keep the old data and write the new data into the partition folder.
Here is a reproducible example:
import polars as pl

# Create a record
df_a = pl.DataFrame(
    {
        'type': ['a'],
        'date': ['2024-08-15'],
        'value': [68]
    }
)
# Create a second record with the same type and date
df_b = pl.DataFrame(
    {
        'type': ['a'],
        'date': ['2024-08-15'],
        'value': [39]
    }
)
# Write the first record partitioned by type and date
df_a.write_parquet('./data/example.parquet', partition_by=['type', 'date'])
# Write the second record partitioned by type and date where the type and date are the same as the first record
df_b.write_parquet('./data/example.parquet', partition_by=['type', 'date'])
# Read the written data
check = pl.read_parquet('./data/example.parquet/')
You'll see that only df_b's data is retained in the read data. I want both records to be retained. I also do not want to read the previous data and append the new data to it with pl.concat() before writing it out again.
Here's the layout of a parquet file.
The data is broken up first into row groups and, within those row groups, into your columns. At the end of the file is a footer with metadata about the data. That metadata records where the various row groups/columns start and end, and it has to be at the end of the file: the writer doesn't know that information until it has written the data, and the reader couldn't find it unless it were at either the beginning or the end (in other words, it couldn't be found at an arbitrary middle point).

Once the file is written, there's no way to put more data in; the only thing you can do is rewrite the file. This is in contrast to a csv or even an ndjson file, which is just text, so there's nothing meaningful at the end of the file and no reason it can't be appended to.
Instead of appending to existing files you can just add more files to a directory like this:
from pathlib import Path
from uuid import uuid4

Path('./data/example').mkdir(parents=True, exist_ok=True)
df_a.write_parquet(f'./data/example/{uuid4()}.parquet')
df_b.write_parquet(f'./data/example/{uuid4()}.parquet')
# Read the written data
check = pl.read_parquet('./data/example/*.parquet')
If you wind up with a lot of files, you may find that you don't want to read even the metadata of all of them when you only need a subset of the data. Hive partitions help with that. Suppose you had a much bigger version of:
df_a = pl.DataFrame(
    {
        'type': ['a', 'b'],
        'date': ['2024-08-15', '2024-08-16'],
        'value': [68, 70]
    }
)
then you could save it as a hive partition with
df_a.write_parquet('./data/example/', partition_by='type')
and it'll make folders named type=a and type=b with the relevant part of the data in each. Then, when you later want to query just type a, you'd do
pl.scan_parquet("./data/example/**/*.parquet").filter(pl.col('type')=='a').collect()
and then polars will only look at the files in the type=a folder.
The polars writer with partition_by will silently overwrite existing files (as of version 1.5.0), so don't try to add files to an existing folder. I just put in a bug report on the silent overwriting.
In addition to the hive partitions I described here, there is also deltalake, which takes this approach to another level: the writer not only writes parquet files but also writes log files containing file stats, which allows the reader to look only at the log rather than at the metadata of many files. There's also Apache Iceberg, which I assume is similar, but I've never looked into it.