csv, apache-spark, pyspark, apache-spark-sql

Read CSV files via Spark with changing column order


Is it possible to define a schema to read CSV files via Spark where the column order changes between files?

I.e., the first file:

A,B,C
1,2,3
2,3,4

and the second file:

B,C,A
1,2,3
2,3,4

with the schema

from pyspark.sql.types import StructType, StructField, IntegerType

fSchema = StructType([
    StructField("A", IntegerType(), True),
    StructField("B", IntegerType(), True),
    StructField("C", IntegerType(), True)
])

spark.read.format('csv').option('header', 'true').schema(fSchema).load('file1.csv') returns

A B C
1 2 3
2 3 4
spark.read.format('csv').option('header', 'true').schema(fSchema).load('file2.csv') likewise returns
A B C
1 2 3
2 3 4

whereas I expected the header column order to be recognized from the schema definition and the data columns to be re-ordered accordingly:

A B C
3 1 2
4 2 3

Solution

  • By default, when you apply a schema to a CSV, Spark assigns the schema fields by column position, not by column name. If you want to match on the column names instead, you can use the function below:

    from pyspark.sql.functions import col

    def apply_schema_to_dataframe(df, schema):
        # For each field in the schema, cast the column with the same
        # name to the field's data type, matching on name rather than position.
        for field in schema.fields:
            df = df.withColumn(field.name, col(field.name).cast(field.dataType))
        return df
    

    Here's how to use it:

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, IntegerType

    spark = SparkSession.builder.master("local[*]").getOrCreate()
    
    df1 = spark.createDataFrame([(1, 2, 3), (2, 3, 4)], ["A", "B", "C"])
    df2 = spark.createDataFrame([(1, 2, 3), (2, 3, 4)], ["B", "C", "A"])
    
    fSchema = StructType([
        StructField("A", IntegerType(), True),
        StructField("B", IntegerType(), True),
        StructField("C", IntegerType(), True)
    ])
    
    apply_schema_to_dataframe(df1, fSchema).show()
    apply_schema_to_dataframe(df2, fSchema).show()
    

    The result is:

    +---+---+---+
    |  A|  B|  C|
    +---+---+---+
    |  1|  2|  3|
    |  2|  3|  4|
    +---+---+---+
    
    +---+---+---+
    |  B|  C|  A|
    +---+---+---+
    |  1|  2|  3|
    |  2|  3|  4|
    +---+---+---+