scala · apache-spark · dataframe · user-defined-functions · udf

Add column index to dataframe based on another column (user in this case)


I have a dataframe as given below, where the last column represents the number of times the user has searched for that location-and-stay combination:

|  Hanks|         Rotterdam|      airbnb7|                     1|
|Sanders|         Rotterdam|      airbnb2|                     1|
|  Hanks|         Rotterdam|      airbnb2|                     3|
|  Hanks|             Tokyo|      airbnb8|                     2|
|  Larry|             Hanoi|             |                     2|
|  Mango|             Seoul|      airbnb5|                     1|
|  Larry|             Hanoi|      airbnb1|                     2|

which I want to transform as follows:

|  Hanks|         Rotterdam|      airbnb7|                     1|    1|
|Sanders|         Rotterdam|      airbnb2|                     1|    1|
|  Hanks|         Rotterdam|      airbnb2|                     3|    2|
|  Hanks|             Tokyo|      airbnb8|                     2|    3|
|  Larry|             Hanoi|             |                     2|    0|
|  Mango|             Seoul|      airbnb5|                     1|    1|
|  Larry|             Hanoi|      airbnb1|                     2|    1|

Notice that column 5 represents the index of the unique combination of options (location + stay) that the user selected, e.g. for Hanks:

|  Hanks|         Rotterdam|      airbnb7|                     1|    1|
|  Hanks|         Rotterdam|      airbnb2|                     3|    2|
|  Hanks|             Tokyo|      airbnb8|                     2|    3|

I tried to do this with groupBy/agg, calling a UDF inside the agg function:

val df2 = df1.groupBy("User", "clickedDestination", "clickedAirbnb")
  .agg(indexUserDetailsUDF(col("clickedAirbnb")) as "clickedAirbnbIndex")

And the UDF is as follows:

var cnt = 0
val airbnbClickIndex: String => String = (airbnb) => {
  if (airbnb == "") "0"            // return 0 for clickedAirbnbIndex when airbnb is empty
  else { cnt += 1; cnt.toString }  // otherwise return the incremented counter
}
val indexUserDetailsUDF = udf(airbnbClickIndex)
val indexUserDetailsUDF = udf(airbnbClickIndex)

But this is not working. Any input is much appreciated. Thanks.
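For context, a likely reason the counter-based UDF fails: Spark serializes the closure, including the mutable `cnt`, to every task, so each partition increments its own private copy of the counter and the driver-side variable is never updated. The result depends on partitioning and scheduling, and `agg` gives no ordering guarantee either. A minimal sketch of the problem, assuming the same `df1`:

    var cnt = 0
    // Each Spark task deserializes its OWN copy of this closure, so `cnt`
    // restarts from 0 per partition -- indices repeat and are not a
    // consistent per-user sequence.
    val indexUserDetailsUDF = udf((airbnb: String) =>
      if (airbnb == "") "0" else { cnt += 1; cnt.toString })

A window function, which carries no shared mutable state, is the usual way to compute such an index.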

Update 1: Daniel's suggestion of dense_rank produces the following for one user:

|Meera|         Amsterdam|     airbnb12|         1|     1|
|Meera|         Amsterdam|      airbnb2|         1|     2|
|Meera|         Amsterdam|      airbnb7|         1|     3|
|Meera|         Amsterdam|      airbnb8|         1|     4|
|Meera|         Bangalore|             |         1|     5|
|Meera|         Bangalore|     airbnb11|         1|     6|
|Meera|         Bangalore|      airbnb8|         1|     7|
|Meera|             Hanoi|      airbnb1|         2|     8|
|Meera|             Hanoi|      airbnb2|         1|     9|
|Meera|             Hanoi|      airbnb7|         1|    10|
|Meera|            Mumbai|             |         1|    11|
|Meera|              Oslo|             |         2|    12|
|Meera|              Oslo|      airbnb8|         1|    13|
|Meera|             Paris|             |         1|    14|
|Meera|             Paris|     airbnb11|         1|    15|
|Meera|             Paris|      airbnb6|         1|    16|
|Meera|             Paris|      airbnb7|         1|    17|
|Meera|             Paris|      airbnb8|         2|    18|
|Meera|         Rotterdam|      airbnb2|         1|    19|

I assumed dense_rank would push the records with empty field values (here, the empty 3rd column) to the end. Is that correct?


Solution

  • If I got it right, you probably want a windowed rank. You could try the following:

    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.expressions.Window
    
    val window = Window.partitionBy("User").orderBy("clickedDestination", "clickedAirbnb")
    
    val result = df.withColumn("clickedAirbnbIndex", dense_rank().over(window))
    

    If needed, you can find some good reading about window functions in Spark here.

    Also, the functions package API documentation is very useful.
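
    Regarding Update 1: dense_rank does not push empty values to the end — an empty string sorts before the non-empty airbnb values, so it actually receives the lowest rank within its (destination) group. If the goal is an index of 0 for rows with an empty clickedAirbnb, as in the expected output, one sketch (column names assumed from the question) is to sort the empty rows last within each user and then overwrite their rank with 0:

        import org.apache.spark.sql.functions._
        import org.apache.spark.sql.expressions.Window

        // Empty clickedAirbnb rows sort last (false < true), so the real
        // combinations receive dense ranks 1..n first.
        val windowSpec = Window
          .partitionBy("User")
          .orderBy(col("clickedAirbnb") === "", col("clickedDestination"), col("clickedAirbnb"))

        // Replace the rank of empty rows with 0, matching the desired output.
        val indexed = df.withColumn(
          "clickedAirbnbIndex",
          when(col("clickedAirbnb") === "", lit(0))
            .otherwise(dense_rank().over(windowSpec)))

    Note that with this approach the index follows the sort order of (destination, stay), not the order in which the combinations first appear in the data.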