I want to calculate the total of the amounts for the secondary accounts and compare it with the amount of the primary account. In the example below, an account number that begins with "643" is a primary account, and the accounts listed after it are its secondary accounts. The next account that begins with "643" starts a new group, followed by its own secondary accounts. I want to group each primary account with its secondary accounts and compute the sum of the secondary amounts.
Input:
Account, Amount
643100, 10000   ---- primary account
234100, 4000    ---- secondary account
231300, 1000    ---- secondary account
136400, 5000    ---- secondary account
643841, 20000   ---- next group
562100, 10000
432176, 10000
643304, 40000   ---- next group
124562, 20000
234567, 5000
Output:
Account, Amount, sumofsecamounts
643100, 10000, 10000
643841, 20000, 20000
643304, 40000, 25000
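To pin down the grouping rule before bringing Spark in, here is a minimal plain-Scala sketch (a scala-cli style script, no Spark). It assumes a primary account is any account number starting with "643" and that rows arrive in the listed order. Entry, GroupTotal and groupTotals are hypothetical names, and dropping secondary rows that appear before the first primary is an assumption the question does not cover.

```scala
// Plain-Scala sketch of the grouping rule (no Spark).
// Assumption: a primary account is any account number that starts with "643".
final case class Entry(account: Int, amount: Int)
final case class GroupTotal(account: Int, amount: Int, sumOfSecAmounts: Int)

def isPrimary(e: Entry): Boolean = e.account.toString.startsWith("643")

// Walk the rows in order: a primary opens a new group, a secondary adds to the latest group.
def groupTotals(entries: Seq[Entry]): Vector[GroupTotal] =
  entries.foldLeft(Vector.empty[GroupTotal]) { (groups, e) =>
    if (isPrimary(e)) groups :+ GroupTotal(e.account, e.amount, 0)
    else if (groups.isEmpty) groups // assumption: secondary rows before the first primary are dropped
    else {
      val last = groups.last
      groups.updated(groups.size - 1, last.copy(sumOfSecAmounts = last.sumOfSecAmounts + e.amount))
    }
  }

val input = Seq(
  Entry(643100, 10000), Entry(234100, 4000), Entry(231300, 1000), Entry(136400, 5000),
  Entry(643841, 20000), Entry(562100, 10000), Entry(432176, 10000),
  Entry(643304, 40000), Entry(124562, 20000), Entry(234567, 5000)
)

groupTotals(input).foreach(g => println(s"${g.account}, ${g.amount}, ${g.sumOfSecAmounts}"))
```

The fold only ever touches the most recent group, which is why the input order matters. Spark has no such order by default, and that is the core difficulty the answer below deals with.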
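There are a few problems you need to solve. Spark rows have no guaranteed order, so you need a column to order by. You need to tell primary accounts apart from secondary ones. And you need to assign every secondary row to the primary account that precedes it, which a window function with a running sum can do.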
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// I had to add an ordering column ("seq") so the data would land in the order you listed it.
// ("order" is a reserved keyword when Spark SQL runs in ANSI mode, so it is avoided here.)
spark.sql("create table accounts (Account int, Amount int, seq int)")
spark.sql("insert into accounts values (643100, 10000, 1), (234100, 4000, 2), (231300, 1000, 3), (136400, 5000, 4), (643841, 20000, 5), (562100, 10000, 6), (432176, 10000, 7), (643304, 40000, 8), (124562, 20000, 9), (234567, 5000, 10)")

// Window functions need a column to order by. monotonically_increasing_id() gives an
// increasing, unique (but not consecutive) id that follows the sort order.
val increasing = spark.sql("select * from accounts order by seq")
  .withColumn("monotonically_increasing_id", monotonically_increasing_id())

// No partition columns: all rows form one window, ordered by the id.
val windowSpec = Window.orderBy("monotonically_increasing_id")

increasing
  .withColumn("is_Primary",
    when(col("Account").cast("string").startsWith("643"), col("Account")) // primary: number starts with "643"
      .otherwise(0))                                                       // secondary: 0
  .withColumn("partition",
    sum("is_Primary").over(windowSpec)) // running sum only changes at a primary row (secondaries add 0), so it labels each group
  .groupBy(
    col("partition"),  // groups a primary with its secondaries
    col("is_Primary")) // splits the primary total from the secondary total
  .agg(sum("Amount"))
  .show()
+---------+----------+-----------+
|partition|is_Primary|sum(Amount)|
+---------+----------+-----------+
| 643100| 643100| 10000|
| 643100| 0| 10000|
| 1286941| 643841| 20000|
| 1286941| 0| 20000|
| 1930245| 643304| 40000|
| 1930245| 0| 25000|
+---------+----------+-----------+
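To see why the "partition" values come out as 643100, 1286941 and 1930245, here is the same running-sum trick on a plain Scala list, a minimal sketch with no Spark. Secondary rows contribute 0, so the running total only steps up at a primary row, and every row in a group shares the same value. The names accounts, primaryValue and partitionIds are illustrative only.

```scala
// Account numbers in their original order.
val accounts = Seq(643100, 234100, 231300, 136400, 643841, 562100, 432176, 643304, 124562, 234567)

// Same idea as is_Primary: the account number for a primary row, 0 for a secondary row.
def primaryValue(account: Int): Long =
  if (account.toString.startsWith("643")) account.toLong else 0L

// Running sum, like the window sum in the Spark code; .tail drops scanLeft's leading 0.
val partitionIds = accounts.map(primaryValue).scanLeft(0L)(_ + _).tail

println(partitionIds.mkString(", "))
// 643100, 643100, 643100, 643100, 1286941, 1286941, 1286941, 1930245, 1930245, 1930245
```

Summing 1 instead of the account number would work equally well and give group ids 1, 2, 3. Because all values are non-negative and primaries are non-zero, the running sum never repeats across groups.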
Once you have these per-partition totals, you can self-join them on "partition" if you really want one row per primary account, as in your expected output.
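A minimal sketch of that last step, assuming a local SparkSession and building the sample data with toDF instead of a table. It deviates from the answer in two labelled ways: the hypothetical "seq" column stands in for the original row order, and the group id "grp" is a running count of primary rows (1, 2, 3) rather than a running sum of account numbers. Rather than a literal self-join of the grouped output, it joins the primary rows to the per-group secondary totals. A left join keeps a primary that has no secondaries, with 0 as its total.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("primary-secondary").getOrCreate()
import spark.implicits._

// Sample data; "seq" (hypothetical) records the original row order.
val accounts = Seq(
  (643100, 10000, 1), (234100, 4000, 2), (231300, 1000, 3), (136400, 5000, 4),
  (643841, 20000, 5), (562100, 10000, 6), (432176, 10000, 7),
  (643304, 40000, 8), (124562, 20000, 9), (234567, 5000, 10)
).toDF("Account", "Amount", "seq")

// One window over all rows, in the original order.
val byOrder = Window.orderBy("seq")

// Flag primaries and give every row the running count of primaries seen so far (1, 2, 3).
val tagged = accounts
  .withColumn("is_Primary", col("Account").cast("string").startsWith("643"))
  .withColumn("grp", sum(when(col("is_Primary"), 1).otherwise(0)).over(byOrder))

// One row per primary account.
val primaries = tagged
  .filter(col("is_Primary"))
  .select("grp", "Account", "Amount")

// Total of the secondary amounts in each group.
val secondaryTotals = tagged
  .filter(!col("is_Primary"))
  .groupBy("grp")
  .agg(sum("Amount").as("sumofsecamounts"))

// Left join so a primary with no secondary rows still appears, with 0 as its total.
val result = primaries
  .join(secondaryTotals, Seq("grp"), "left")
  .orderBy("grp")
  .select(col("Account"), col("Amount"), coalesce(col("sumofsecamounts"), lit(0L)).as("sumofsecamounts"))

result.show()
```

With the sample data, result should hold the three rows of the expected output. Any secondary rows before the first primary would get grp 0 and fall out of the join.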