Tags: python, csv, dask, parquet, pyarrow

Dask DataFrame containing NumPy-array columns: conversion to Parquet error


I have a Dask DataFrame that I am trying to convert to Parquet files. Some of its columns contain NumPy arrays, but the column dtype is "object".

When I try to do this:

name_function = lambda x: f"data-{x}.parquet"
data_frame[['numpy_array_col1','numpy_array_col2']].to_parquet('train/',name_function=name_function)

I get this error:

ArrowTypeError: ("Expected bytes, got a 'numpy.ndarray' object", 'Conversion failed for column INPUT with type object')

During handling of the above exception, another exception occurred:

# Other errors

Can only convert 1-dimensional array values
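
For reference, the failure is not specific to Dask. A minimal sketch with plain pandas and PyArrow hits the same "Can only convert 1-dimensional array values" message, because PyArrow cannot infer an Arrow type for an object column whose cells are multi-dimensional NumPy arrays (the INPUT column name here just mirrors the error text above):

import numpy as np
import pandas as pd
import pyarrow as pa

# An object-dtype column whose cells are 2-D numpy arrays.
pdf = pd.DataFrame({"INPUT": [np.array([[1, 2], [3, 4]])]})

# Fails with "Can only convert 1-dimensional array values".
pa.Table.from_pandas(pdf)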

I tried writing CSVs instead and it worked, but I know it's better to use Parquet files, and I'm not sure whether reading the CSVs back will cause logical or parsing errors because of the ambiguity of the column types.

data_frame[['numpy_array_col1','numpy_array_col2']].to_csv('train/train-*.csv')

Update: when reading the CSV files back, the NumPy arrays come back as strings, so I have to convert them to NumPy arrays again.
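
For context, this is roughly the re-parsing step that reading the CSVs back requires. This is a hedged sketch, not from any docs: the exact string format depends on the array's repr (multi-dimensional arrays embed newlines, which is part of why CSV is fragile here), and the column name is just the placeholder used above:

import numpy as np
import dask.dataframe as dd

ddf = dd.read_csv('train/train-*.csv')

# Cells come back as the array's string repr, e.g. "[5 6 7 8]":
# strip the brackets and parse the whitespace-separated numbers.
def parse_array(s):
    return np.array(s.strip('[]').split(), dtype=np.int64)

ddf['numpy_array_col1'] = ddf['numpy_array_col1'].map(
    parse_array, meta=('numpy_array_col1', object))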

I also tried

df.to_parquet('somefile', engine='pyarrow')

but the same error occurs.


Solution

  • I tried this workaround. First, flatten all the NumPy arrays in the Dask DataFrame. Then write the Parquet files, specifying a schema with PyArrow's list datatypes. Finally, you can read the files back, but you have to reshape the arrays to their original shape.

    import numpy as np
    import pandas as pd
    import dask.dataframe as dd
    import pyarrow as pa
    
    # Flatten the 2-D arrays before they go into the dataframe.
    a = np.array([[5, 6], [7, 8]]).flatten()
    b = np.array([[9, 10], [11, 12]]).flatten()
    df = pd.DataFrame({"col1": [a], "col2": [b]})
    df.loc[1] = (b, a)
    
    df = dd.from_pandas(df, npartitions=2)
    
    print(df.compute())
    
    # Declare each object column as a PyArrow list type so it can be
    # serialized to Parquet.
    name_function = lambda x: f"data-{x}.parquet"
    df[['col1', 'col2']].to_parquet(
        'train/',
        name_function=name_function,
        engine='pyarrow',
        schema={"col1": pa.list_(pa.int32()), "col2": pa.list_(pa.int32())},
    )
    
    df2 = dd.read_parquet(['train/data-0.parquet', 'train/data-1.parquet'])
    print(df2.compute())
    
    # Restore the original 2x2 shape; non-array cells pass through unchanged.
    def reshape(x):
        if not isinstance(x, np.ndarray):
            return x
        return x.reshape(2, 2)
    
    df2 = df2.applymap(reshape)
    print(df2.compute())
    
                  col1             col2
    0     [5, 6, 7, 8]  [9, 10, 11, 12]
    1  [9, 10, 11, 12]     [5, 6, 7, 8]
                  col1             col2
    0     [5, 6, 7, 8]  [9, 10, 11, 12]
    1  [9, 10, 11, 12]     [5, 6, 7, 8]
                      col1                 col2
    0     [[5, 6], [7, 8]]  [[9, 10], [11, 12]]
    1  [[9, 10], [11, 12]]     [[5, 6], [7, 8]]
    

    I hope there is a simpler solution.
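
    A hedged variation on the sketch above, so the reshape is not hard-coded to (2, 2): keep the known shapes in a dict and reshape generically on read. The shapes mapping below is my own assumption (the shapes could also be persisted as extra Parquet columns); nothing here is a documented Dask or PyArrow feature beyond map_partitions and read_parquet:

    import numpy as np
    import dask.dataframe as dd
    
    # Shapes assumed known per column ahead of time.
    shapes = {"col1": (2, 2), "col2": (2, 2)}
    
    df2 = dd.read_parquet('train/')
    
    def restore(partition):
        # Reshape every cell of each listed column back to its stored shape.
        for col, shp in shapes.items():
            partition[col] = partition[col].map(
                lambda x, shp=shp: np.asarray(x).reshape(shp))
        return partition
    
    df2 = df2.map_partitions(restore)
    print(df2.compute())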