
Pyspark: Read ORC files with new schema


I originally converted CSV data to ORC format with the following schema. This is a daily pull.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

MySchema = StructType([
    StructField("RetailUnit", StringType()),
    StructField("RetailUnitSysCode", IntegerType())])

About a month in, I ran into some issues with one of the columns and needed to change the type to String like so:

MySchema = StructType([
    StructField("RetailUnit", StringType()),
    StructField("RetailUnitSysCode", StringType())])

Now if I read in the whole dataset and try to show:

alloc = spark.read.orc(f"tables/orc/alloc/")

alloc.select('RetailUnitSysCode').show()

I get a NullPointerException:

An error occurred while calling o2302.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 168.0 failed 4 times, most recent failure: Lost task 0.3 in stage 168.0 (TID 29923, ip-172-31-45-122.ec2.internal, executor 565): java.lang.NullPointerException
    at org.apache.spark.sql.execution.datasources.orc.OrcColumnVector.getInt(OrcColumnVector.java:132)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)

Is there a way to read in the ORC data with the new schema and fill null values, making the dataset usable? Right now, if I check the datatypes on my dataframe, I have ('RetailUnitSysCode', 'int').
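
For reference, this is how I'm checking the datatypes (using the alloc dataframe read above):

# Inspect what Spark inferred across the mixed ORC files
print(alloc.dtypes)  # e.g. [('RetailUnit', 'string'), ('RetailUnitSysCode', 'int')]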


Solution

  • If you know the schema beforehand, why not read the ORC files into a dataframe and transform them with a simple select call?

    You can try something like this:

    from pyspark.sql.functions import col

    alloc = spark.read.orc("tables/orc/alloc/")

    # Desired (column name, target type) pairs for the output dataframe
    myschemaDef = [("RetailUnit", "string"), ("RetailUnitSysCode", "integer")]

    # Cast each column to its target type, keeping the original name
    columnExprs = [col(name).cast(dtype).alias(name) for name, dtype in myschemaDef]

    transformed_df = alloc.select(*columnExprs)
    

    And this should change your datatypes in the dataframe. They will be preserved when you write the data back to persistent storage such as HDFS, Hive, and so on.
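
    As a quick sketch of that write-back step (the output path is an assumption for illustration only):

    # Persist the re-typed dataframe as ORC; the casted types are kept in the files.
    # "tables/orc/alloc_fixed/" is a hypothetical target path.
    transformed_df.write.mode("overwrite").orc("tables/orc/alloc_fixed/")

    # Reading it back should now show one consistent schema
    spark.read.orc("tables/orc/alloc_fixed/").printSchema()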

    Note: the "*" in *columnExprs unpacks the list; it is a common Python feature for passing the elements of a list as separate positional arguments.
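
    To make that concrete, the two calls below are equivalent (assuming the two-element columnExprs built above):

    # select() receives each Column expression as its own argument
    transformed_df = alloc.select(*columnExprs)
    transformed_df = alloc.select(columnExprs[0], columnExprs[1])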