Search code examples
python-3.xbatch-processingparquetpyarrow

Non-consistent schema in apache arrow


Main question: When processing data batch per batch, how to handle changing schema in pyarrow ?

Long story:

As an example, I have the following data

| col_a | col_b |
-----------------
|  10   |  42   |
|  41   |  21   |
| 'foo' |  11   |
| 'bar' |  99   |

I'm working with python 3.7 and using pandas 1.1.0.

>>> import pandas as pd
>>> df = pd.read_csv('test.csv')
>>> df
  col_a  col_b
0    10     42
1    41     21
2   foo     11
3   bar     99
>>> df.dtypes
col_a    object
col_b     int64
dtype: object  
>>>

I need to start working with Apache Arrow using PyArrow 1.0.1 implementation. In my application, we are working batch per batch. This means that we see part of the data, thus part of data types.

>>> dfi = pd.read_csv('test.csv', iterator=True, chunksize=2)
>>> dfi
<pandas.io.parsers.TextFileReader object at 0x7fabae915c50>
>>> dfg = next(dfi)
>>> dfg
   col_a  col_b
0     10     42
1     41     21
>>> sub_1 = next(dfi)
>>> sub_2 = next(dfi)
>>> sub_1
  col_a  col_b
2   foo     11
3   bar     99
>>> dfg2
  col_a  col_b
2   foo     11
3   bar     99
>>> sub_1.dtypes
col_a    int64
col_b    int64
dtype: object 
>>> sub_2.dtypes
col_a    object
col_b     int64
dtype: object  
>>>

My goal is to persist this whole dataframe using parquet format of Apache Arrow in the constraint of working batch per batch. It requires us to correctly fill the schema. How does one handle dtypes that change over batchs ? Here's the full code to reproduce the problem using above data.

from pyarrow import RecordBatch, RecordBatchFileWriter, RecordBatchFileReader
import pandas as pd

pd.DataFrame([['10', 42], ['41', 21], ['foo', 11], ['bar', 99]], columns=['col_a', 'col_b']).to_csv('test.csv')
dfi = pd.read_csv('test.csv', iterator=True, chunksize=2)
sub_1 = next(dfi)
sub_2 = next(dfi)

# No schema provided here. Pyarrow should infer the schema from data. The first column is identified as a col of int.
batch_to_write_1 = RecordBatch.from_pandas(sub_1)
schema = batch_to_write_1.schema
writer = RecordBatchFileWriter('test.parquet', schema)
writer.write(batch_to_write_1)

# We expect to keep the same schema but that is not true, the schema does not match sub_2 data. So the
# following line launch an exception.
batch_to_write_2 = RecordBatch.from_pandas(sub_2, schema)
# writer.write(batch_to_write_2)  # This will fail bcs batch_to_write_2 is not defined

We get the following exception

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "pyarrow/table.pxi", line 858, in pyarrow.lib.RecordBatch.from_pandas
  File "/mnt/e/miniconda/envs/pandas/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 579, in dataframe_to_arrays
    for c, f in zip(columns_to_convert, convert_fields)]
  File "/mnt/e/miniconda/envs/pandas/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 579, in <listcomp>
    for c, f in zip(columns_to_convert, convert_fields)]
  File "/mnt/e/miniconda/envs/pandas/lib/python3.7/site-packages/pyarrow/pandas_compat.py", line 559, in convert_column     
    result = pa.array(col, type=type_, from_pandas=True, safe=safe)
  File "pyarrow/array.pxi", line 265, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 80, in pyarrow.lib._ndarray_to_array
TypeError: an integer is required (got type str)

Solution

  • This behavior is intended. Some alternatives to try (I believe they should work but I haven't tested all of them):

    1. If you know the final schema up front construct it by hand in pyarrow instead of relying on inferred one from the first record batch.
    2. Go through all the data and compute a final schema. Then reprocess the data with the new schema.
    3. Detect a schema change and recast previous record batches.
    4. Detect the schema change and start a new table (you will would then end up with one parquet file per schema and you would need another process to unify the schemas).

    Lastly if it works, and you are trying to tranform CSV data, you might consider using the built in Arrow CSV parser.