I'm using the following code to reshape a DataFrame, driving the reshaping from its own rows.
The DataFrame contains the date on which a product changes its ID. In order to join it with a much larger DataFrame of transactions, I need a new column that defines the range of dates during which each ID is valid.
For instance, if product A becomes product B effective 01/01, and then becomes product C effective 03/01, I need the start date and end date in the same row. I can then join against the large transactions DataFrame, filtering by the dates during which the product is effectively B (or C), and properly rename each product to its effective real ID.
One more piece of information: df_MPC is about 800 rows and will not grow much more.
So the approach I am trying (which works when run in the development environment) is a foldLeft.
The summarized version of the MPC dataframe would be:
Product | Date       | NewProd
A       | 01/01/2018 | B
B       | 03/01/2018 | C
Objective:
Product | Date       | NewProd | OriginalProd | EndDate
A       | 01/01/2018 | B       | A            | 03/01
B       | 03/01/2018 | C       | A            | 31/12-9999
(The OriginalProd column is necessary for the final join with the transactions DataFrame.)
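To make the intended transformation concrete, here is a minimal plain-Scala sketch of the interval logic on the simplified example above. The case class and column names (product, date, newProd) and the far-future sentinel "31/12-9999" mirror the example tables, not the real df_MPC schema:

```scala
// Hypothetical, simplified model of one ID-change row.
case class Change(product: String, date: String, newProd: String)

def withRanges(changes: List[Change]): List[(Change, String, String)] =
  changes.map { c =>
    // EndDate: the Date of the row whose Product equals this row's NewProd,
    // i.e. the moment the new ID itself changes again; otherwise open-ended.
    val endDate = changes
      .find(_.product == c.newProd)
      .map(_.date)
      .getOrElse("31/12-9999")
    // OriginalProd: follow the chain backwards to the first product ID.
    def original(p: String): String =
      changes.find(_.newProd == p).map(x => original(x.product)).getOrElse(p)
    (c, original(c.product), endDate)
  }
```

Running this on the two example rows yields (A, "A", "03/01/2018") and (B, "A", "31/12-9999"), matching the objective table.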
And the code that results in the StackOverflowError is as follows:
// Collect the ~800 MPC rows on the driver.
val rowList = new ListBuffer[Row]()
val it = df_MPC_SOURCE.toLocalIterator()
while (it.hasNext) { rowList += it.next() }

// For each collected row, stack two withColumn calls onto the plan:
// one to close the previous interval (EndDate) and one to carry the
// original product ID forward (OriginalProd).
val df_MPC_TRANSFORMED = rowList.reverse
  .foldLeft(df_MPC_pre_edit_source: DataFrame) { (acc, elem) =>
    acc
      .withColumn("EndDate",
        when(col("N_DISTRIBUTOR_CODE") === elem.getAs("N_DISTRIBUTOR_CODE")
          && col("N_CONTRACT_CODE") === elem.getAs("N_CONTRACT_CODE")
          && col("N_PRODUCT_ID_NEW") === elem.getAs("N_PRODUCT_ID"),
          elem.getAs("D_EFFECTIVE_CHANGE"))
          .otherwise(col("EndDate")))
      .withColumn("OriginalProd",
        when(col("N_DISTRIBUTOR_CODE") === elem.getAs("N_DISTRIBUTOR_CODE")
          && col("N_CONTRACT_CODE") === elem.getAs("N_CONTRACT_CODE")
          && col("MPC_original") === elem.getAs("N_PRODUCT_ID_NEW"),
          elem.getAs("N_PRODUCT_ID"))
          .otherwise(col("OriginalProd")))
  }
This code transforms the source DataFrame (example above) into the objective DataFrame (also above).
It does so by iterating through all 800 rows in a sorted fashion (datewise) and, for each row, patching EndDate and OriginalProd on the matching rows of the accumulated DataFrame via the two chained withColumn calls.
And this is the error thrown when the code runs on the cluster:
Exception in thread "main" java.lang.StackOverflowError
at scala.collection.GenSetLike$class.apply(GenSetLike.scala:44)
at scala.collection.AbstractSet.apply(Set.scala:47)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$11.apply(TreeNode.scala:334)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:333)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
How can I make this code work on the cluster the same way it works locally? Thank you!
It took me a while to figure out what you were trying to do. I think you can achieve the same result with a simpler approach.
This doesn't explain why your code doesn't work, but your foldLeft can be replaced with a Spark SQL query, something like this:
// Note: registerTempTable is deprecated since Spark 2.0;
// createOrReplaceTempView is the newer equivalent.
df_MPC_SOURCE.registerTempTable("mpc_source")
val test = sqlContext.sql(
"""select c1.N_PRODUCT_ID,c1.D_EFFECTIVE_CHANGE,c1.N_PRODUCT_ID_NEW,
|coalesce(c2.D_EFFECTIVE_CHANGE,c1.MPC_endDate) as MPC_endDate,
|coalesce(c3.N_PRODUCT_ID,c1.MPC_original) as MPC_original
|from mpc_source c1
|left join mpc_source c2 on c1.N_DISTRIBUTOR_CODE=c2.N_DISTRIBUTOR_CODE
|and c1.N_CONTRACT_CODE=c2.N_CONTRACT_CODE
|and c1.N_PRODUCT_ID_NEW=c2.N_PRODUCT_ID
|left join mpc_source c3 on c1.N_DISTRIBUTOR_CODE=c3.N_DISTRIBUTOR_CODE
|and c1.N_CONTRACT_CODE=c3.N_CONTRACT_CODE
|and c1.MPC_original = c3.N_PRODUCT_ID_NEW
""".stripMargin)
Hope this helps you.