There is a problem with my code that I haven't been able to solve for a while now.
I'm trying to convert a tar.gz-compressed CSV file to Parquet. The file itself, when uncompressed, is about 700 MB. The processing is done on a memory-restricted system, so I have to process the file in batches.
I figured out how to read the tar.gz as a stream, extract the file I need, and use pyarrow's open_csv()
to read batches. From there, I want to save the data to a Parquet file by writing in batches.
This is where the problem appears. The file has lots of columns that don't contain any values, but once in a while a single value shows up somewhere around row 500,000, so pyarrow does not infer the dtype properly. Most of these columns therefore end up with dtype null.
My idea is to modify the schema and cast these columns to string, so that any value is valid. Modifying the schema works fine, but when I run the code, I get this error:
Traceback (most recent call last):
  File "b:\snippets\tar_converter.py", line 38, in <module>
    batch = reader.read_next_batch()
  File "pyarrow\ipc.pxi", line 682, in pyarrow.lib.RecordBatchReader.read_next_batch
  File "pyarrow\error.pxi", line 100, in pyarrow.lib.check_status
pyarrow.lib.ArrowInvalid: In CSV column #49: CSV conversion error to null: invalid value '0.0000'
Line 38 is this one:
batch = reader.read_next_batch()
Does anyone have any idea how to enforce the schema on the batches so this works? Here is my code.
import io
import os
import tarfile
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.csv as csv
import logging

srcs = list()
path = "C:\\data"
for root, dirs, files in os.walk(path):
    for name in files:
        if name.endswith("tar.gz"):
            srcs.append(os.path.join(root, name))

for source_file_name in srcs:
    file_name: str = source_file_name.replace(".tar.gz", "")
    target_file_name: str = source_file_name.replace(".tar.gz", ".parquet")
    clean_file_name: str = os.path.basename(source_file_name.replace(".tar.gz", ""))
    # download CSV file, preserving folder structure
    logging.info(f"Processing '{source_file_name}'.")
    with io.open(source_file_name, "rb") as file_obj_in:
        # unpack all files to temp_path
        file_obj_in.seek(0)
        with tarfile.open(fileobj=file_obj_in, mode="r") as tf:
            file_obj = tf.extractfile(f"{clean_file_name}.csv")
            file_obj.seek(0)
            reader = csv.open_csv(file_obj, read_options=csv.ReadOptions(block_size=25*1024*1024))
            schema = reader.schema
            null_cols = list()
            for index, entry in enumerate(schema.types):
                if entry.equals(pa.null()):
                    schema = schema.set(index, schema.field(index).with_type(pa.string()))
                    null_cols.append(index)
            with pq.ParquetWriter(target_file_name, schema) as writer:
                while True:
                    try:
                        batch = reader.read_next_batch()
                        table = pa.Table.from_batches(batches=[batch]).cast(target_schema=schema)
                        batch = table.to_batches()[0]
                        writer.write_batch(batch)
                    except StopIteration:
                        break
Also, I could leave out this part:
batch = reader.read_next_batch()
table = pa.Table.from_batches(batches=[batch]).cast(target_schema=schema)
batch = table.to_batches()[0]
But then I get the following error (shortened), which at least shows that the schema change works.
Traceback (most recent call last):
  File "b:\snippets\tar_converter.py", line 39, in <module>
    writer.write_batch(batch)
  File "C:\Users\me\AppData\Roaming\Python\Python39\site-packages\pyarrow\parquet\__init__.py", line 981, in write_batch
    self.write_table(table, row_group_size)
  File "C:\Users\me\AppData\Roaming\Python\Python39\site-packages\pyarrow\parquet\__init__.py", line 1004, in write_table
    raise ValueError(msg)
ValueError: Table schema does not match schema used to create file:
table:
ACCOUNT_NAME: string
BOOK_VALUE: double
ESTIMATED_TO_REALISE: double
VAT_PAYABLE_ID: null
VAT_RECEIVABLE_ID: null
MONTHLY_AMOUNT_EFFECTIVE_DATE: null vs.
file:
ACCOUNT_NAME: string
BOOK_VALUE: double
ESTIMATED_TO_REALISE: double
VAT_PAYABLE_ID: string
VAT_RECEIVABLE_ID: string
MONTHLY_AMOUNT_EFFECTIVE_DATE: string
Thank you!
So I think I figured it out. I wanted to post it for those who run into similar issues. Also, thanks to all who had a look and helped!
I worked around this by reading the file twice. In the first pass I only open the reader to get the inferred schema, convert the null columns to string, and close the stream (this is important if you reuse the same variable name). Then I read the file again, this time passing the modified schema to the reader via ConvertOptions(column_types=...). Thanks to @0x26res, whose comment gave me the idea.
# get initial schema by reading one batch
initial_reader = csv.open_csv(file_obj, read_options=csv.ReadOptions(block_size=16*1024*1024))
schema = initial_reader.schema
for index, entry in enumerate(schema.types):
    if entry.equals(pa.null()):
        schema = schema.set(index, schema.field(index).with_type(pa.string()))
# now use the modified schema for reader
# must close old reader first, otherwise wrong data is loaded
file_obj.close()
file_obj = tf.extractfile(f"{file_name}.csv")
file_obj.seek(0)
reader = csv.open_csv(file_obj,
                      read_options=csv.ReadOptions(block_size=16*1024*1024),
                      convert_options=csv.ConvertOptions(column_types=schema))
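For completeness, the writing part then stays basically the same as in the question, just without the per-batch cast, since the reader already yields batches in the modified schema. A minimal sketch, reusing target_file_name, schema and reader from the snippets above:

# sketch: write the batches from the second reader straight to Parquet;
# the null columns already come back as string, so no cast is needed
with pq.ParquetWriter(target_file_name, schema) as writer:
    while True:
        try:
            batch = reader.read_next_batch()
            writer.write_batch(batch)
        except StopIteration:
            break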