scala | apache-spark | stack-overflow | foldleft

Spark Scala FoldLeft resulting in StackOverflow when run in the cluster


I'm using the following code to reshape a dataframe, driving the reshaping with the dataframe's own rows.

The dataframe contains the dates on which a product changes its ID, but in order to join it with another, huge dataframe containing the transactions, I need a new column that defines the range of dates for which each ID is valid.

For instance, if product A becomes product B effective 01/01, and later becomes product C effective 03/01, I need the start date and end date in the same row so I can join it with the huge transactions dataframe, filtering by the dates during which the product is effectively B (or C), and thus rename the products to their real effective IDs (a sketch of this join follows the example tables below).

One more piece of information: df_MPC is about 800 rows and will not grow much more.

So the approach I am trying (which works when run in the development environment) is a foldLeft.

The summarized version of the MPC dataframe would be:

Product | Date      | NewProd
A       | 01/01/2018| B
B       | 03/01/2018| C

Objective:

Product | Date      | NewProd | OriginalProd | EndDate
A       | 01/01/2018| B       | A            | 03/01/2018
B       | 03/01/2018| C       | A            | 31/12/9999

(The OriginalProd column is necessary for the final join with the transactions dataframe.)
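
For context, this is roughly the downstream join I am aiming for with the transformed dataframe built below (the transactions dataframe and its column names are just placeholders here):

// Placeholder names: "transactions", "product" and "txn_date" are illustrative only.
// Each transaction is matched to the MPC row whose validity window contains its date,
// so the product can be renamed to the ID that was effective at that time.
val renamed = transactions
  .join(df_MPC_TRANSFORMED,
    transactions("product") === df_MPC_TRANSFORMED("OriginalProd") &&
      transactions("txn_date") >= df_MPC_TRANSFORMED("Date") &&
      transactions("txn_date") < df_MPC_TRANSFORMED("EndDate"),
    "left")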

And the code resulting in the StackOverflowError is as follows:

import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{col, when}

// Pull the ~800 MPC rows onto the driver
val rowList = new ListBuffer[Row]()
val it = df_MPC_SOURCE.toLocalIterator()
while (it.hasNext) { rowList += it.next() }

// For every collected row, rewrite EndDate and OriginalProd on the matching products
val df_MPC_TRANSFORMED = rowList.reverse
  .foldLeft(df_MPC_pre_edit_source: DataFrame)((acc, elem) => acc
    .withColumn("EndDate",
      when(col("N_DISTRIBUTOR_CODE") === elem.getAs("N_DISTRIBUTOR_CODE")
        && col("N_CONTRACT_CODE") === elem.getAs("N_CONTRACT_CODE")
        && col("N_PRODUCT_ID_NEW") === elem.getAs("N_PRODUCT_ID"),
        elem.getAs("D_EFFECTIVE_CHANGE"))
        .otherwise(col("EndDate")))
    .withColumn("OriginalProd",
      when(col("N_DISTRIBUTOR_CODE") === elem.getAs("N_DISTRIBUTOR_CODE")
        && col("N_CONTRACT_CODE") === elem.getAs("N_CONTRACT_CODE")
        && col("MPC_original") === elem.getAs("N_PRODUCT_ID_NEW"),
        elem.getAs("N_PRODUCT_ID"))
        .otherwise(col("OriginalProd")))
  )

This code transforms the source dataframe (example provided above) into the objective dataframe (also shown above).

It does so by iterating over all 800 rows in date order and, for each row:

  • Changing the valid end date for all the products that match that row
  • Updating the original product ID when an intermediate product is found. For instance, if a product swaps from ID "A" to "B" and later from "B" to "C", we need the column with the original product ID ("A" in this case) to be able to join our result with the original transactions table, which only contains the product ID "A".

And the error thrown when this code is used in the cluster:

Exception in thread "main" java.lang.StackOverflowError
        at scala.collection.GenSetLike$class.apply(GenSetLike.scala:44)
        at scala.collection.AbstractSet.apply(Set.scala:47)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$11.apply(TreeNode.scala:334)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:333)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)

How could I make this code work in the cluster the same way it works locally? Thank you!


Solution

  • It took me a while to figure out what you were trying to do. I think you can do the same with a simpler approach.

    This doesn't explain why your code doesn't work, but your foldLeft can be replaced with a Spark SQL query, something like this:

    df_MPC_SOURCE.registerTempTable("mpc_source")
    
    val test = sqlContext.sql(
      """select c1.N_PRODUCT_ID,c1.D_EFFECTIVE_CHANGE,c1.N_PRODUCT_ID_NEW,
        |coalesce(c2.D_EFFECTIVE_CHANGE,c1.MPC_endDate) as MPC_endDate,
        |coalesce(c3.N_PRODUCT_ID,c1.MPC_original) as MPC_original
        |from mpc_source c1
        |left join mpc_source c2 on c1.N_DISTRIBUTOR_CODE=c2.N_DISTRIBUTOR_CODE
        |and c1.N_CONTRACT_CODE=c2.N_CONTRACT_CODE
        |and c1.N_PRODUCT_ID_NEW=c2.N_PRODUCT_ID
        |left join mpc_source c3 on c1.N_DISTRIBUTOR_CODE=c3.N_DISTRIBUTOR_CODE
        |and c1.N_CONTRACT_CODE=c3.N_CONTRACT_CODE
        |and c1.MPC_original = c3.N_PRODUCT_ID_NEW
      """.stripMargin)
    

    Hope this helps you.
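
    As for why the foldLeft version blows the stack: each of the ~800 iterations stacks two more withColumn projections onto the logical plan, and Catalyst walks that plan recursively on the driver (those are the TreeNode frames in your trace), so eventually the thread stack runs out. If you do want to keep the fold, one common workaround is to truncate the lineage every few iterations. The sketch below assumes Spark 2.1+ (for Dataset.checkpoint), uses an illustrative checkpoint interval and directory, and elides the per-row when/otherwise logic that stays the same as in your question:

    // Not from the original post: keep the foldLeft but materialize the intermediate
    // result every `checkpointEvery` iterations so the plan never grows very deep.
    df_MPC_pre_edit_source.sparkSession.sparkContext.setCheckpointDir("/tmp/mpc_checkpoint") // hypothetical path

    val checkpointEvery = 50 // illustrative value

    val df_MPC_TRANSFORMED = rowList.reverse.zipWithIndex
      .foldLeft(df_MPC_pre_edit_source: DataFrame) { case (acc, (elem, i)) =>
        val updated = acc
          .withColumn("EndDate", col("EndDate"))           // same when/otherwise as in the question
          .withColumn("OriginalProd", col("OriginalProd")) // same when/otherwise as in the question
        // checkpoint() writes the data out and replaces the accumulated plan with a
        // simple scan of the checkpointed result, keeping the tree shallow.
        if ((i + 1) % checkpointEvery == 0) updated.checkpoint() else updated
      }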