Tags: scala, apache-spark, apache-spark-sql, rdd

Spark: map the columns of a DataFrame to the IDs of their distinct elements


I have the following DataFrame with two string columns, A and B:

val df = (
    spark
    .createDataFrame(
        Seq(
            ("a1", "b1"),
            ("a1", "b2"),
            ("a1", "b2"),
            ("a2", "b3")
        )
    )
).toDF("A", "B")

I then create a map from the distinct elements of each column to a set of integers:

val mapColA = (
    df
    .select("A")
    .distinct
    .rdd
    .zipWithIndex
    .collectAsMap
)

val mapColB = (
    df
    .select("B")
    .distinct
    .rdd
    .zipWithIndex
    .collectAsMap
)

Now I want to create new columns in the DataFrame by applying each map to its corresponding column. For a single map, this would be:

df.select("A").map(x=>mapColA.get(x)).show()

However, I don't understand how to apply each map to its corresponding column and create two new columns (e.g. with withColumn). The expected result is:

val result = (
    spark
    .createDataFrame(
        Seq(
            ("a1", "b1", 1, 1),
            ("a1", "b2", 1, 2),
            ("a1", "b2", 1, 2),
            ("a2", "b3", 2, 3)
        )
    )
).toDF("A", "B", "idA", "idB")
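Put differently, the expected IDs number the distinct values of each column starting at 1, and repeated values share an ID. The plain-Scala sketch below (no Spark involved; the names `ExpectedIds` and `idsFor` are hypothetical) reproduces the expected result on a local `Seq` of tuples. It assumes the IDs follow the sorted order of the values; first-appearance order would give the same numbers for this particular data.

```scala
// Plain-Scala sketch (no Spark): reproduces the expected idA/idB columns on a local Seq.
// Assumption: IDs come from sorting each column's distinct values and numbering them from 1.
object ExpectedIds {
  val rows: Seq[(String, String)] = Seq(
    ("a1", "b1"),
    ("a1", "b2"),
    ("a1", "b2"),
    ("a2", "b3")
  )

  // Map each distinct value to its 1-based position in sorted order (hypothetical helper).
  def idsFor(values: Seq[String]): Map[String, Int] =
    values.distinct.sorted.zipWithIndex.map { case (v, i) => v -> (i + 1) }.toMap

  val idA: Map[String, Int] = idsFor(rows.map(_._1))
  val idB: Map[String, Int] = idsFor(rows.map(_._2))

  // Apply each map to its own column, keeping the original columns alongside the IDs.
  val result: Seq[(String, String, Int, Int)] =
    rows.map { case (a, b) => (a, b, idA(a), idB(b)) }
}
```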

Could you help me?


Solution

  • If I understood correctly, this can be achieved using dense_rank:

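    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.dense_rank
    
    // dense_rank() numbers the distinct values of a column from 1 in sorted order; equal values share an ID.
    // Note: a window with orderBy but no partitionBy moves all rows into a single partition.
    val df2 = df.withColumn("idA", dense_rank().over(Window.orderBy("A")))
                .withColumn("idB", dense_rank().over(Window.orderBy("B")))
    
    df2.show()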
    +---+---+---+---+
    |  A|  B|idA|idB|
    +---+---+---+---+
    | a1| b1|  1|  1|
    | a1| b2|  1|  2|
    | a1| b2|  1|  2|
    | a2| b3|  2|  3|
    +---+---+---+---+
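The reason `dense_rank` fits here is that equal values receive the same rank and the ranks have no gaps, so `b2` gets 2 on both of its rows and `b3` gets 3; `rank` would instead give `b3` a 4 because it skips ahead by the size of the tie. The plain-Scala sketch below (no Spark; `RankSemantics`, `denseRank` and `rank` are hypothetical names) only mimics the two ranking rules on a local `Seq` ordered ascending; it is not Spark's implementation.

```scala
// Plain-Scala sketch (no Spark) of the values dense_rank() and rank() assign over an ascending orderBy.
object RankSemantics {
  // dense_rank: equal values share a rank, and the next distinct value gets rank + 1 (no gaps).
  def denseRank(values: Seq[String]): Seq[(String, Int)] = {
    val ids = values.distinct.sorted.zipWithIndex.map { case (v, i) => v -> (i + 1) }.toMap
    values.sorted.map(v => v -> ids(v))
  }

  // rank: equal values share a rank, but the next value's rank is its 1-based row position,
  // so a tie of size k leaves a gap of k - 1.
  def rank(values: Seq[String]): Seq[(String, Int)] = {
    val sorted = values.sorted
    sorted.map(v => v -> (sorted.indexOf(v) + 1))
  }
}
```

For column B of the question, `denseRank` yields IDs 1, 2, 2, 3 (matching `idB` above), while `rank` yields 1, 2, 2, 4.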
    

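    If you want to stick with your original approach, the key change is to extract the String value from each Row before calling zipWithIndex, so that the maps are keyed by String rather than by Row: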

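    import spark.implicits._ // encoders for df.map and toDF (already imported in spark-shell)
    
    // Extract the String from each Row so the maps are keyed by String instead of Row.
    val mapColA = df.select("A").distinct().rdd.map(r => r.getAs[String](0)).zipWithIndex.collectAsMap
    val mapColB = df.select("B").distinct().rdd.map(r => r.getAs[String](0)).zipWithIndex.collectAsMap
    
    // Look up each row's A and B values; Map.get returns Option[Long], stored as a nullable column.
    val df2 = df.map(r => (r.getAs[String](0), r.getAs[String](1),
      mapColA.get(r.getAs[String](0)), mapColB.get(r.getAs[String](1)))).toDF("A", "B", "idA", "idB")
    
    df2.show()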
    +---+---+---+---+
    |  A|  B|idA|idB|
    +---+---+---+---+
    | a1| b1|  1|  2|
    | a1| b2|  1|  0|
    | a1| b2|  1|  0|
    | a2| b3|  0|  1|
    +---+---+---+---+
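These IDs differ from the expected ones for two reasons: `zipWithIndex` starts counting at 0, and it numbers elements in whatever order the rows come out of `distinct()`, which depends on partitioning and is not guaranteed. One possible driver-side fix, sketched below in plain Scala, is to sort the distinct values before numbering them and add 1. The sketch assumes the distinct values of one column have already been collected to the driver; the names `StableIds`, `unstableIds`, `stableIds` and `collected` are hypothetical.

```scala
// Plain-Scala sketch of a driver-side fix for the zipWithIndex approach.
// Assumption: `collected` stands in for the distinct values already collected from one column.
object StableIds {
  // Mirrors the behaviour above: 0-based IDs in whatever order the values arrive.
  def unstableIds(collected: Seq[String]): Map[String, Long] =
    collected.zipWithIndex.map { case (v, i) => v -> i.toLong }.toMap

  // Sorting first makes the IDs independent of arrival order; adding 1 makes them 1-based.
  def stableIds(collected: Seq[String]): Map[String, Long] =
    collected.sorted.zipWithIndex.map { case (v, i) => v -> (i + 1L) }.toMap
}
```

In Spark, `collected` could be `df.select("A").distinct().as[String].collect().toSeq` (with `import spark.implicits._`). Because the resulting map is keyed by `String`, `mapColA(value)` returns a `Long` directly (throwing for a missing key), whereas `mapColA.get(value)` returns an `Option[Long]` that Spark stores as a nullable column.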