Tags: r, scala, apache-spark, machine-learning, forecasting

How can I calculate the correlation of my residuals? (Spark-Scala)


I need to know whether my residuals are correlated or not. I haven't found a way to do it using Spark-Scala on Databricks, so I concluded that I would have to export my project to R and use the acf function there.

Does someone know a trick to do it using Spark-Scala on Databricks?

For those who need more information: I'm currently working on sales forecasting. I used a regression forest with different features, and now I need to evaluate the quality of my forecast. I read in this paper that residuals are a good way to check whether your forecasting model is good or bad. You can always keep improving a model, but this is just to form an opinion on my forecasting model and compare it to other models.

Currently, I have a DataFrame like the one below. It's part of the testing/out-of-sample data. (I cast prediction and residuals to IntegerType, which is why the 3rd row shows 40 - 17 = 22.)


Solution

  • I am using Spark 2.1.1.

    You can find the correlation between columns using the Spark MLlib Statistics functions.

    Let's first import the required classes:

    import org.apache.spark.sql.functions.corr
    import org.apache.spark.mllib.linalg._
    import org.apache.spark.mllib.stat.Statistics
    

    Now prepare the input DataFrame:

    scala> val seqRow = Seq(
         |     ("2017-04-27",13,21),
         |     ("2017-04-26",7,16),
         |     ("2017-04-25",40,17),
         |     ("2017-04-24",17,17),
         |     ("2017-04-21",10,20),
         |     ("2017-04-20",9,19),
         |     ("2017-04-19",30,30),
         |     ("2017-04-18",18,25),
         |     ("2017-04-14",32,28),
         |     ("2017-04-13",39,18),
         |     ("2017-04-12",2,4),
         |     ("2017-04-11",8,24),
         |     ("2017-04-10",18,27),
         |     ("2017-04-07",6,17),
         |     ("2017-04-06",13,29),
         |     ("2017-04-05",10,17),
         |     ("2017-04-04",6,8),
         |     ("2017-04-03",20,32)
         | )
    seqRow: Seq[(String, Int, Int)] = List((2017-04-27,13,21), (2017-04-26,7,16), (2017-04-25,40,17), (2017-04-24,17,17), (2017-04-21,10,20), (2017-04-20,9,19), (2017-04-19,30,30), (2017-04-18,18,25), (2017-04-14,32,28), (2017-04-13,39,18), (2017-04-12,2,4), (2017-04-11,8,24), (2017-04-10,18,27), (2017-04-07,6,17), (2017-04-06,13,29), (2017-04-05,10,17), (2017-04-04,6,8), (2017-04-03,20,32))
    
    scala> val rdd = sc.parallelize(seqRow)
    rdd: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[51] at parallelize at <console>:34
    
    scala> val input_df = spark.createDataFrame(rdd).toDF("date","amount","prediction").withColumn("residuals",'amount - 'prediction)
    input_df: org.apache.spark.sql.DataFrame = [date: string, amount: int ... 2 more fields]
    
    scala> input_df.show(false)
    +----------+------+----------+---------+
    |date      |amount|prediction|residuals|
    +----------+------+----------+---------+
    |2017-04-27|13    |21        |-8       |
    |2017-04-26|7     |16        |-9       |
    |2017-04-25|40    |17        |23       |
    |2017-04-24|17    |17        |0        |
    |2017-04-21|10    |20        |-10      |
    |2017-04-20|9     |19        |-10      |
    |2017-04-19|30    |30        |0        |
    |2017-04-18|18    |25        |-7       |
    |2017-04-14|32    |28        |4        |
    |2017-04-13|39    |18        |21       |
    |2017-04-12|2     |4         |-2       |
    |2017-04-11|8     |24        |-16      |
    |2017-04-10|18    |27        |-9       |
    |2017-04-07|6     |17        |-11      |
    |2017-04-06|13    |29        |-16      |
    |2017-04-05|10    |17        |-7       |
    |2017-04-04|6     |8         |-2       |
    |2017-04-03|20    |32        |-12      |
    +----------+------+----------+---------+
    

    The residuals for rows 2017-04-14 and 2017-04-13 don't match your screenshot, because here they are computed as amount - prediction.

    Now let's proceed to calculate the correlation between all the columns. This method is useful when you have many columns and need the correlation of each column with every other column.

    First, we drop the column whose correlation we don't need to calculate:

    scala> val drop_date_df = input_df.drop('date)
    drop_date_df: org.apache.spark.sql.DataFrame = [amount: int, prediction: int ... 1 more field]
    
    scala> drop_date_df.show
    +------+----------+---------+
    |amount|prediction|residuals|
    +------+----------+---------+
    |    13|        21|       -8|
    |     7|        16|       -9|
    |    40|        17|       23|
    |    17|        17|        0|
    |    10|        20|      -10|
    |     9|        19|      -10|
    |    30|        30|        0|
    |    18|        25|       -7|
    |    32|        28|        4|
    |    39|        18|       21|
    |     2|         4|       -2|
    |     8|        24|      -16|
    |    18|        27|       -9|
    |     6|        17|      -11|
    |    13|        29|      -16|
    |    10|        17|       -7|
    |     6|         8|       -2|
    |    20|        32|      -12|
    +------+----------+---------+
    

    Since there are more than two columns, we need to compute a correlation matrix. Calculating a correlation matrix requires an RDD[Vector], as you can see in the Spark example for correlation.

    scala> val dense_rdd = drop_date_df.rdd.map{row =>
         |         val first = row.getAs[Integer]("amount")
         |         val second = row.getAs[Integer]("prediction")
         |         val third = row.getAs[Integer]("residuals")
         |         Vectors.dense(first.toDouble,second.toDouble,third.toDouble)}
    dense_rdd: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[62] at map at <console>:40
    
    scala> val correlMatrix: Matrix = Statistics.corr(dense_rdd, "pearson")
    correlMatrix: org.apache.spark.mllib.linalg.Matrix =
    1.0                  0.40467032516705076  0.782939330961529
    0.40467032516705076  1.0                  -0.2520531290688281
    0.782939330961529    -0.2520531290688281  1.0
    

    The column order remains the same, but you lose the column names. You can find good resources on the structure of a correlation matrix.
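
    If you want to keep the matrix readable, one option is to pair the entries with the column names yourself. A minimal sketch, assuming the drop_date_df and correlMatrix defined above:

    // drop_date_df.columns preserves the order used to build the vectors
    val cols = drop_date_df.columns

    // print each pair of columns once, using Matrix's (i, j) accessor
    for (i <- cols.indices; j <- cols.indices if i < j)
      println(s"corr(${cols(i)}, ${cols(j)}) = ${correlMatrix(i, j)}")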

    Since you specifically want the correlation of the residuals with the other two columns, we can explore other options.

    Hive corr UDAF

    scala> drop_date_df.createOrReplaceTempView("temp_table")
    
    scala> val corr_query_df = spark.sql("select corr(amount,residuals) as amount_residuals_corr,corr(prediction,residuals) as prediction_residual_corr from temp_table")
    corr_query_df: org.apache.spark.sql.DataFrame = [amount_residuals_corr: double, prediction_residual_corr: double]
    
    scala> corr_query_df.show
    +---------------------+------------------------+
    |amount_residuals_corr|prediction_residual_corr|
    +---------------------+------------------------+
    |   0.7829393309615287|      -0.252053129068828|
    +---------------------+------------------------+
    

    Spark's corr function (from org.apache.spark.sql.functions)

    scala> val corr_df = drop_date_df.select(
         |                 corr('amount,'residuals).as("amount_residuals_corr"),
         |                 corr('prediction,'residuals).as("prediction_residual_corr"))
    corr_df: org.apache.spark.sql.DataFrame = [amount_residuals_corr: double, prediction_residual_corr: double]
    
    scala> corr_df.show
    +---------------------+------------------------+
    |amount_residuals_corr|prediction_residual_corr|
    +---------------------+------------------------+
    |   0.7829393309615287|      -0.252053129068828|
    +---------------------+------------------------+
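
    Finally, since the question mentions R's acf function: an autocorrelation-style check on the residuals can also be done in Spark-Scala by correlating the residuals column with a lagged copy of itself. A minimal sketch, assuming the input_df defined above and that ordering by date defines your series; the single-partition window is fine for a small series like this one, though Spark will warn about it on large data. This gives the sample correlation between the series and its lag, which approximates the ACF at that lag rather than reproducing acf exactly.

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{corr, lag}

    // lag-1 copy of the residuals, ordered by date
    val w = Window.orderBy("date")
    val lag_df = input_df.withColumn("residuals_lag1", lag('residuals, 1).over(w))

    // Pearson correlation between the residuals and their lag-1 values;
    // corr skips the null pair created by the first row. Repeat with
    // other offsets in lag(...) to inspect more lags.
    lag_df.select(corr('residuals, 'residuals_lag1).as("residuals_acf_lag1")).show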