Scala Spark DataFrame: modify column with UDF return value


I have a Spark DataFrame with a timestamp field that I want to convert to the long datatype. I used a UDF, and the standalone code works fine, but when I plug it into generic logic where any timestamp needs to be converted, I am not able to get it working. The issue is: how can I assign the return value from the UDF back to the DataFrame column?

Below is the code snippet:

    val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Test3").getOrCreate();
      import org.apache.spark.sql.functions._
      val sqlContext  = spark.sqlContext
      val df2 = sqlContext.jsonRDD(spark.sparkContext.parallelize(Array(
        """{"year":2012, "make": "Tesla", "model": "S", "comment": "No Comment", "blank": "","manufacture_ts":"2017-10-16 00:00:00"}""",
        """{"year":1997, "make": "Ford", "model": "E350", "comment": "Get one", "blank": "","manufacture_ts":"2017-10-16 00:00:00"}""",
      )))

      val convertTimeStamp = udf { (manTs :java.sql.Timestamp) =>
        manTs.getTime
      }

        // Apply the UDF defined above (the original called an undefined `getTime`).
        df2.withColumn("manufacture_ts", convertTimeStamp(df2("manufacture_ts"))).show

       +-----+----------+-----+--------------+-----+----+
        |     |No Comment|Tesla| 1508126400000|    S|2012|
        |     |   Get one| Ford| 1508126400000| E350|1997|
        |     |          |Chevy| 1508126400000| Volt|2015|
        +-----+----------+-----+--------------+-----+----+ 

Now I want to invoke this from a DataFrame so that it is called on all columns which are of type long:

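    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    object Test4 extends App {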

        val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Test").getOrCreate();
        import spark.implicits._

        import scala.collection.JavaConversions._    
        val long : Long  = "1508299200000".toLong    

        val data = Seq(Row("10000020_LUX_OTC",long,"2020-02-14"))

        val schema = List( StructField("rowkey",StringType,true)
                                  ,StructField("order_receipt_dt",LongType,true)
                                  ,StructField("maturity_dt",StringType,true))

        val dataDF =  spark.createDataFrame(spark.sparkContext.parallelize(data),StructType(schema))

        // Fold over the schema, replacing each column with its transformed version.
        val modifedDf2 = schema.foldLeft(dataDF) { case (newDF, StructField(name, dataType, flag, metadata)) =>
          newDF.withColumn(name, DataTypeUtil.transformLong(newDF, name, dataType.typeName))
        }
        modifedDf2.show

      }


      val convertTimeStamp = udf { (manTs :java.sql.Timestamp) =>
        manTs.getTime
      }

      def transformLong(dataFrame: DataFrame,name:String, fieldType:String):Column = {
        import org.apache.spark.sql.functions._

        fieldType.toLowerCase match {

          case "timestamp"  => convertTimeStamp(dataFrame(name))
          case _ => dataFrame.col(name)
        }
      }

Solution

  • Maybe your UDF crashes when the timestamp is null. You can:

    • use unix_timestamp instead of a UDF, or make your UDF null-safe
    • apply the conversion only to the fields that need to be converted.
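
    The null-safe option can be sketched as follows. This is a minimal example, assuming the input column holds java.sql.Timestamp values; the names toEpochMillis and convertTimeStampSafe are illustrative and do not come from the question.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.functions.udf

// Hypothetical helper: a null timestamp becomes None instead of throwing
// a NullPointerException on getTime.
def toEpochMillis(ts: Timestamp): Option[Long] = Option(ts).map(_.getTime)

// Wrap the helper as a UDF. Spark encodes Option[Long] as a nullable
// bigint column, so None shows up as SQL NULL in the result.
val convertTimeStampSafe = udf((ts: Timestamp) => toEpochMillis(ts))
```

    It would be used like the original UDF, for example df.withColumn("manufacture_ts", convertTimeStampSafe(col("manufacture_ts"))).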

    Given the data:

    import spark.implicits._

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types.TimestampType
    import java.sql.Timestamp
    import java.time.LocalDateTime
    
    val df = Seq(
      (1L,Timestamp.valueOf(LocalDateTime.now()),Timestamp.valueOf(LocalDateTime.now()))
    ).toDF("id","ts1","ts2")
    

    you can do:

    val newDF = df.schema.fields.filter(_.dataType == TimestampType).map(_.name)
      .foldLeft(df)((df,field) => df.withColumn(field,unix_timestamp(col(field))))
    
    newDF.show()
    

    which gives the timestamps as seconds since the epoch (unlike getTime, which returns milliseconds):

    +---+----------+----------+
    | id|       ts1|       ts2|
    +---+----------+----------+
    |  1|1589109282|1589109282|
    +---+----------+----------+
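
    If milliseconds are needed to match what getTime returned in the question, the same fold can multiply the result by 1000. This is only a sketch, assuming whole-second precision is acceptable, since unix_timestamp drops the sub-second part; timestampsToMillis is a hypothetical helper name.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, unix_timestamp}
import org.apache.spark.sql.types.TimestampType

// Hypothetical helper: replaces every TimestampType column with a long
// holding epoch milliseconds (truncated to whole seconds).
def timestampsToMillis(df: DataFrame): DataFrame =
  df.schema.fields
    .filter(_.dataType == TimestampType) // only touch timestamp columns
    .map(_.name)
    .foldLeft(df) { (acc, name) =>
      // unix_timestamp yields seconds as bigint; scale to milliseconds.
      acc.withColumn(name, unix_timestamp(col(name)) * 1000L)
    }
```

    Columns of any other type pass through unchanged, so the helper can be applied to an arbitrary DataFrame.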