Tags: scala, apache-spark, apache-spark-sql

Spark Scala DataFrame: How can I apply custom types to an existing DataFrame?


I have a DataFrame (dataDF) that contains data like:

firstColumn;secondColumn;thirdColumn
myText;123;2010-08-12 00:00:00

In my case, all of these columns are StringType.
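
For reference, a minimal sketch of how such a DataFrame could be built (the column names and values are the ones from the example above; a SparkSession named spark, as in spark-shell, is assumed):

    import spark.implicits._

    // Every value is kept as a String, matching the all-StringType schema described above
    val dataDF = Seq(
      ("myText", "123", "2010-08-12 00:00:00")
    ).toDF("firstColumn", "secondColumn", "thirdColumn")

    dataDF.printSchema
    // root
    //  |-- firstColumn: string (nullable = true)
    //  |-- secondColumn: string (nullable = true)
    //  |-- thirdColumn: string (nullable = true)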

On the other hand, I have another DataFrame (customTypeDF), which can be modified, and which specifies a custom type for some of the columns:

columnName;customType
secondColumn;IntegerType
thirdColumn;TimestampType
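
A matching sketch for this mapping DataFrame (again only an assumption about how it is built; it lists just the columns that need a cast, together with the target Spark SQL type name):

    import spark.implicits._

    // One row per column that should be cast
    val customTypeDF = Seq(
      ("secondColumn", "IntegerType"),
      ("thirdColumn", "TimestampType")
    ).toDF("columnName", "customType")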

How can I dynamically apply these new types to my dataDF DataFrame?


Solution

  • You can collect customTypeDF to the driver as a Seq of (columnName, customType) pairs and use it to decide which columns of dataDF to cast; an alternative sketch using a lookup Map follows the output below:

    import org.apache.spark.sql.functions.col

    // Collect the (columnName, customType) pairs from customTypeDF to the driver
    val colTypes = customTypeDF.rdd.map(x => x.toSeq.asInstanceOf[Seq[String]]).collect
    
    // For each column of dataDF, cast it when a custom type is defined, otherwise keep it as-is.
    // Type names such as "IntegerType" become the cast strings "integer", "timestamp", etc.
    val result = dataDF.select(
        dataDF.columns.map(c => 
            if (colTypes.map(_(0)).contains(c)) 
            col(c).cast(colTypes.filter(_(0) == c)(0)(1).toLowerCase.replace("type","")).as(c) 
            else col(c)
        ):_*
    )
    
    result.show
    +-----------+------------+-------------------+
    |firstColumn|secondColumn|        thirdColumn|
    +-----------+------------+-------------------+
    |     myText|         123|2010-08-12 00:00:00|
    +-----------+------------+-------------------+
    
    result.printSchema
    root
     |-- firstColumn: string (nullable = true)
     |-- secondColumn: integer (nullable = true)
     |-- thirdColumn: timestamp (nullable = true)
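
  • If you prefer a single lookup structure over repeated filtering, the same result can be obtained by collecting the mapping into a Map and folding over it with withColumn. This is only a sketch, built on the same dataDF and customTypeDF as above:

    import org.apache.spark.sql.functions.col

    // Collect the mapping once into a Map: columnName -> customType
    val typeByColumn: Map[String, String] = customTypeDF
      .collect()
      .map(r => r.getString(0) -> r.getString(1))
      .toMap

    // Fold over the mapping, casting each listed column in place.
    // "IntegerType" becomes the cast string "integer", "TimestampType" becomes "timestamp".
    val result2 = typeByColumn.foldLeft(dataDF) { case (df, (name, customType)) =>
      df.withColumn(name, col(name).cast(customType.trim.toLowerCase.replace("type", "")))
    }

    result2.printSchema
    // root
    //  |-- firstColumn: string (nullable = true)
    //  |-- secondColumn: integer (nullable = true)
    //  |-- thirdColumn: timestamp (nullable = true)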