Search code examples
pysparkjupyteruser-defined-functionsamazon-emr

pandas udf showString error on simplistic example


I'm starting to work with pandas udf on a Pyspark Jupyter notebook running on an EMR cluster using this 'identity' pandas udf and I'm getting the following error:

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
# Input/output are both a pandas.DataFrame
def pudf(pdf):

    return pdf

df.filter(df.corp_cust=='LO').groupby('corp_cust').apply(pudf).show()

An error occurred while calling o388.showString. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 113.0 failed 4 times, most recent failure: Lost task 0.3 in stage 113.0 (TID 1666, ip-10-23-226-64.us.scottsco.com, executor 1): java.lang.IllegalArgumentException at java.nio.ByteBuffer.allocate(ByteBuffer.java:334) at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543) at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58) at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)

I can run df.filter(df.corp_cust=='LO').show() with success so this makes me think things are 'braking' somewhere in translation from pandas to pyspark dataframe.

This dataframe has a couple StringType and DecimalType columns. I've also tried encoding the string columns to 'utf-8' within the udf and get the same error.

Any suggestion on how to fix this?


Solution

  • This is apparently an issue[1] with pyarrow version 0.15 that causes pandads udf to through error. you can try to change version by installing Pyarrow 0.14.1 or lower.

      sc.install_pypi_package("pyarrow==0.14.1") 
    

    [1]https://issues.apache.org/jira/browse/SPARK-29367