Tags: scala, apache-spark-sql, arraybuffer

Converting Scala mutable arrays to a Spark DataFrame


I have three mutable arrays defined as:

import scala.collection.mutable.ArrayBuffer
var quartile_1 = ArrayBuffer[Double]()
var quartile_3 = ArrayBuffer[Double]()
var id = ArrayBuffer[String]()

quartile_1 and quartile_3 hold per-id information, and I am currently computing them as follows:

def func1(x: org.apache.spark.sql.Row): Unit = {
  // Approximate 25th and 75th percentiles of tran_amt for this id
  val apQuantile = df_auth_for_qnt.where($"id" === x(0).toString).stat.approxQuantile("tran_amt", Array(0.25, 0.75), 0.001)
  quartile_1 += apQuantile(0)
  quartile_3 += apQuantile(1)
  id += x(0).toString
}

df_auth_for_qnt_gb.where($"tran_cnt" > 8).select("card_num_1").collect().foreach(func1)

Is there a better approach than appending to mutable arrays? My goal is to have the quantile data and ids available as a DataFrame, so that I can do further joins.


Solution

  • Mutable structures like ArrayBuffer are evil, especially in a parallelized context. Here they can be avoided quite easily.

    func1 can return a tuple of (String, Array[Double]), where the first element corresponds to the id (the former id buffer) and the second element holds the quartiles returned by approxQuantile:

    def func1(x: Row): (String, Array[Double]) = {
      val cardNum1 = x(0).toString
      val quartiles = df_auth_for_qnt.where($"id" === cardNum1).stat.approxQuantile("tran_amt", Array(0.25, 0.75), 0.001)
      (cardNum1, quartiles)
    }
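
    For instance, applying func1 to a single collected row yields one (id, quartiles) pair. A quick illustration, assuming the same DataFrames as above (the values in the comment are made up):

    val firstRow = df_auth_for_qnt_gb.where($"tran_cnt" > 8).select("card_num_1").head()
    val (cardId, quartiles) = func1(firstRow)  // e.g. ("1234", Array(12.5, 48.0))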
    

    Now, using functional chaining, we can obtain an immutable result structure.

    As a DataFrame:

    val resultDf = df_auth_for_qnt_gb.where($"tran_cnt" > 8).select("card_num_1")
      .collect()  // approxQuantile is an action, so func1 must run on the driver, not inside a Dataset transformation
      .map(func1)
      .toSeq.toDF("id", "quartiles")
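
    If separate columns are easier to work with for the joins, the quartiles array can be split out. A sketch, where the column names quartile_1 and quartile_3 simply mirror the original buffers:

    val resultFlat = resultDf.select(
      $"id",
      $"quartiles"(0).as("quartile_1"),  // 25th percentile
      $"quartiles"(1).as("quartile_3")   // 75th percentile
    )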
    

    Or as a Map[String, Array[Double]] with the same associations as the tuples returned from func1:

    val resultMap = df_auth_for_qnt_gb.where($"tran_cnt" > 8).select("card_num_1")
      .collect()
      .map(func1)
      .toMap
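
    Since the stated goal is further joins, the DataFrame form can be joined directly on id (df_other below is a hypothetical stand-in for whatever table you want to enrich, assumed to have an id column):

    val joined = df_other.join(resultDf, Seq("id"), "left")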