Tags: pandas, apache-spark, pyspark

Read a Parquet dataset in PySpark created from a pandas DataFrame with a datetime64 datatype


How can I avoid org.apache.spark.sql.AnalysisException: Illegal Parquet type: INT64 (TIMESTAMP(NANOS,false)) when reading a Parquet dataset created from a pandas DataFrame with a datetime64[ns] column? Here is a minimal example:

import pandas as pd
from pyspark.sql import SparkSession

# pandas DataFrame with datetime64[ns] column
pdf = pd.DataFrame(data={'time': pd.date_range('10/1/23', '10/7/23', freq='D')})
pdf.to_parquet('<path>/data.parquet')

# read the parquet dataset - raises the Illegal Parquet type AnalysisException
spark = SparkSession.builder.getOrCreate()
sdf = spark.read.parquet('<path>/data.parquet')

# recover original dataframe
df = sdf.toPandas()

The goal is to read the Parquet dataset and receive the time column as a PySpark TimestampType.

There are workarounds that convert the datetime64[ns] column to object datatype, but they are not ideal. One such workaround, pdf['time'] = pd.Series(pdf['time'].dt.to_pydatetime(), dtype=object), avoids the error but raises a FutureWarning (Passing unit-less datetime64 dtype to .astype is deprecated and will raise in a future version. Pass 'datetime64[ns]' instead) when converting the Spark DataFrame back to a pandas DataFrame.
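
For context, the full round trip with that workaround looks like this (a sketch; casting to object makes pyarrow store the column as microsecond timestamps, which Spark can read, but the FutureWarning then appears on the way back):

# non-ideal workaround: cast the column to object dtype before writing
pdf['time'] = pd.Series(pdf['time'].dt.to_pydatetime(), dtype=object)
pdf.to_parquet('<path>/data.parquet')

sdf = spark.read.parquet('<path>/data.parquet')  # no AnalysisException
df = sdf.toPandas()  # emits the FutureWarning quoted above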

Running pandas 1.5.3 and pyspark 3.4.1 on a Linux instance.


Solution

  • By default, pandas stores datetimes as datetime64[ns] (nanoseconds), which ends up in Parquet as the unsupported TIMESTAMP(NANOS) type. Store the datetimes as datetime64[ms] (milliseconds) instead so that PySpark can correctly load the parquet file:

    pdf.astype({'time': 'datetime64[ms]'}).to_parquet('<path>/data.parquet')
    
    # Alternatively, build the range in milliseconds from the start:
    # pd.date_range('10/1/23', '10/7/23', freq='D', unit='ms')
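    
    # Note: the 'datetime64[ms]' cast and the unit= argument both require
    # pandas >= 2.0; on pandas 1.5.x see the pyarrow-based sketch below.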
    

    Output:

    >>> sdf.show()
    +-------------------+
    |               time|
    +-------------------+
    |2023-10-01 00:00:00|
    |2023-10-02 00:00:00|
    |2023-10-03 00:00:00|
    |2023-10-04 00:00:00|
    |2023-10-05 00:00:00|
    |2023-10-06 00:00:00|
    |2023-10-07 00:00:00|
    +-------------------+
    
    >>> sdf.dtypes
    [('time', 'timestamp_ntz')]
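
  • Since the question mentions pandas 1.5.3, where non-nanosecond datetimes are not fully supported, a possible alternative (a sketch, assuming the pyarrow engine) is to coerce the timestamps at write time instead. to_parquet forwards extra keyword arguments to pyarrow.parquet.write_table, whose coerce_timestamps option casts timestamps to the requested resolution:

    pdf.to_parquet('<path>/data.parquet', engine='pyarrow',
                   coerce_timestamps='ms')

    Spark should then read the file without the Illegal Parquet type error, since the column is stored as TIMESTAMP(MILLIS).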